前言

整理文档的时候发现还缺了有关操作Group ID的内容,这部分其实和ACL的操作是一起的,但是篇幅有限单独拿出来说,AdminClient对象里关于Group ID的操作还是挺多内容可以说说的,下面我们正式开始。更多内容请点击【Apache Kafka API AdminClient 目录】

查询所有Group ID

首先我们先说下如何查询当前服务器上所有使用过的Group ID。这里要特别强调使用过,因为这个方法是无法查询到没有连接的Group ID的。假设我们新创建了一个Group ID: AAA。当AAA没有被Consumer的账户使用过时,是无法被这个方法查询出来的。查询所有使用过的Gourp ID的方法叫做listConsumerGroups()

Modifier and TypeMethodDescription
ListConsumerGroupsResultlistConsumerGroups()List the consumer groups available in the cluster with the default options.
abstract ListConsumerGroupsResultlistConsumerGroups(ListConsumerGroupsOptions options)List the consumer groups available in the cluster.

官方文档中提供了两个方法,一个正常用的,一个抽象的用于扩展。我们只需要用到第一个即可,方法的返回值ListGroupsResult一如既往,也没有什么参数,直接上Sample。

Sample

public void listAllGroupId() throws ExecutionException, InterruptedException {
    ListConsumerGroupsResult listResult = adminClient.listConsumerGroups();
    Collection<ConsumerGroupListing> listResultCollection = listResult.all().get();
    for (ConsumerGroupListing consumerGroupListing : listResultCollection) {
        System.out.println(consumerGroupListing.groupId());
    }
}

输出内容,只是简单的把所有当前使用的Group ID的名字输出了出来:
group1
group2
group3
group4
java_test
……

查询指定Group ID

上面的方法只能简单的查询出来Group ID的名字是什么,显然这么简单的结果基本上没啥作用,因此指定查询做的就比上面的好的多,能够显示更多的信息,首先还是看官网给了什么方法。

Modifier and TypeMethodDescription
default DescribeConsumerGroupsResultdescribeConsumerGroups(Collection<String> groupIds)Describe some group IDs in the cluster, with the default options.
DescribeConsumerGroupsResultdescribeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)Describe some group IDs in the cluster.

主要方法就是describeConsumerGroups(),参数也是简单明了,让我们传递一个Group ID的集合进去,可以是各种List或者Set等等。而且说明这个查询是支持多个Group ID查询的,传递两个Group ID进去就可以查出来两个的信息,不多说直接上Sample。

Sample

public void describeSingleGroupID(ArrayList<String> groupIds) throws ExecutionException, InterruptedException {
    DescribeConsumerGroupsResult groupIdResult = adminClient.describeConsumerGroups(groupIds);
    Map<String, ConsumerGroupDescription> groupIdResultCollection = groupIdResult.all().get();
	//如果传入的是多个Group ID就能循环拿到多个输出
    for (String key: groupIdResultCollection.keySet()) {
        System.out.println("Group ID: "+groupIdResultCollection.get(key));
    }
}
输出Group ID没有被使用过:
Group ID: (groupId=javaint, isSimpleConsumerGroup=true, members=, partitionAssignor=, state=Dead, coordinator=10.87.149.65:9092 (id: 5 rack: null), authorizedOperations=[])

输出Group ID被使用过或者正在被使用:
Group ID: (groupId=javaint, isSimpleConsumerGroup=false, 
members=(memberId=consumer-4-01505d82-46af-4f0d-9386-81c4ea3b3114, groupInstanceId=null, clientId=consumer-4, host=10.111.33.168/10.111.33.168, assignment=(topicPartitions=topic1-1,topic1-0)),
	(memberId=consumer-2-7a794a20-02fe-4699-83c6-7e55dbbe361d, groupInstanceId=null, clientId=consumer-2, host=10.111.33.168/10.111.33.168, assignment=(topicPartitions=topic2-3,topic2-2)),
	(memberId=consumer-4-bc385da9-68ba-450e-80dc-00681f15aa18, groupInstanceId=null, clientId=consumer-4, host=10.111.33.167/10.111.33.167, assignment=(topicPartitions=topic1-3,topic1-2)),
	(memberId=consumer-2-24f31ad6-ca6f-4fe9-8f32-ce8a0eac83ee, groupInstanceId=null, clientId=consumer-2, host=10.111.33.167/10.111.33.167, assignment=(topicPartitions=topic2-1,topic2-0)), 
	partitionAssignor=range, state=Stable, coordinator=10.87.149.65:9092 (id: 1 rack: null), authorizedOperations=[])

可以看到这个指定Group ID的方法能够拿出来更多的信息,即便这个Group ID从来没有被使用过,那么我们就可以针对这些信息通过Group ID对消费者进行一个监控。比如这里可以知道javaint有多少个consumer现成,分别从哪个host上发起的消费,状态是什么,partition分配规则是什么,在哪个broker上存得等等内容都可以拿到。

创建Group ID

创建Group ID也是属于Kafka 权限一系列的操作,因此我们还是使用的还是adminClient.createAcls()这个方法,方法的详细请参考【Apache Kafka API AdminClient 账号对Topic权限赋予与移除】, 就不额外占用篇幅了,直接上Sample吧,唯一要说的就是Group ID也是和账号绑定的,因此一般会把账号名和Group ID同时作为参数传递进来。

Sample

public void createGroup(String groupId) throws ExecutionException, InterruptedException {
	//创建Group对象
    ResourcePattern resourcePattern = new ResourcePattern(ResourceType.GROUP, groupId, PatternType.LITERAL);
    //创建账户对象
    AccessControlEntry accessControlEntry=new AccessControlEntry("User:kaf_java_int","*", AclOperation.ALL, AclPermissionType.ALLOW);
    //绑定对象和Group对象
    AclBinding aclBinding=new AclBinding(resourcePattern,accessControlEntry);
    Collection<AclBinding> aclBindingCollection= new ArrayList<>();
    //添加到列表
    aclBindingCollection.add(aclBinding);
    //给账户绑定上这个Group
    CreateAclsResult aclResult = adminClient.createAcls(aclBindingCollection);
    KafkaFuture<Void> result = aclResult.all();
    result.get();
    if (result.isDone()){
        System.out.println("createGroup:"+result.toString());
    }
}

删除Group ID

说完查询和创建,那就剩一个删除了。删除操作同样要使用ACL的相关方法adminClient.deleteAcls()去做,同样上面的连接有详细的说明,直接上Sample。

Sample

public void removeGroupId(String groupId) throws ExecutionException, InterruptedException {
	//创建Group对象
    ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(ResourceType.GROUP, groupId, PatternType.LITERAL);
    //创建账户对象
    AccessControlEntryFilter accessControlEntry = new AccessControlEntryFilter("User:kaf_java_int", "*", AclOperation.ALL, AclPermissionType.ALLOW);
    //绑定对象和Group对象
    AclBindingFilter aclBindingFilter = new AclBindingFilter(resourcePatternFilter, accessControlEntry);
    Collection<AclBindingFilter> aclBindingCollection = new ArrayList<>();
    //添加到列表
    aclBindingCollection.add(aclBindingFilter);
    //从账户上移除Group ID
    DeleteAclsResult result = adminClient.deleteAcls(aclBindingCollection);
    result.all().get();
    if (result.all().isDone()){
        System.out.println("done");
    }
}

但是要注意的是,上面你给Group ID绑定了什么权限,这里最好移除同样的权限,就是AclOperation.ALL这里的配置,笔者上下都是配置的ALL,因为Group ID其实并不能做什么,配置ALL权限可以在使用中减少很多的麻烦。由于这是一个emun类型,因此你可以指定任何想要配置的权限,比如可以任意指定下列13种权限中的任意一种:

public enum AclOperation {
	UNKNOWN((byte) 0), 
	ANY((byte) 1), 
	ALL((byte) 2), 
	READ((byte) 3), 
	WRITE((byte) 4), 
	CREATE((byte) 5), 
	DELETE((byte) 6), 
	ALTER((byte) 7), 
	DESCRIBE((byte) 8), 
	CLUSTER_ACTION((byte) 9), 
	DESCRIBE_CONFIGS((byte) 10), 
	ALTER_CONFIGS((byte) 11),
	IDEMPOTENT_WRITE((byte) 12);
...code...
}

另一个删除方法

笔者在研读官网的时候还发现了另一个删除Group ID的方法deleteConsumerGroups(),从官网直接的描述来看,似乎这个才是删除Group ID的正途,但是这个方法的表现却很奇怪,先把官网介绍贴出来。

Modifier and TypeMethodDescription
DeleteConsumerGroupsResultdeleteConsumerGroups(Collection<String> groupIds)Delete consumer groups from the cluster with the default options.
abstract DeleteConsumerGroupsResultdeleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options)Delete consumer groups from the cluster.

和Kafka API一贯的方法类似,这个方法返回也是一个Result类型的,而且传递参数也是简简单单的一个Group ID的字符串集合。官网对于这个方法的描述也十分符合要求Delete consumer groups from the cluster with the default options,使用默认的选像从集群删除消费组,这个方法的Sample如下。

Sample

public void deleteGroupId(ArrayList<String> groupIds) throws ExecutionException, InterruptedException {
    DeleteConsumerGroupsResult result = adminClient.deleteConsumerGroups(groupIds);
    Map<String, KafkaFuture<Void>> resultMap = result.deletedGroups();
    for (String key: resultMap.keySet()) {
		//输出要删除的Group ID
        System.out.println("NeededDeleteGroupId:"+key); 
    }
    KafkaFuture<Void> future = result.all();
    future.get();
    if(future.isDone()){
        System.out.println("delete group id: "+result.toString());
    }
}
输出:
NeededDeleteGroupId:bob
后面要删除的时候,抛出异常GroupNotEmptyException

问题

这个方法表现很奇怪,因为无论是新创建的Group ID,还是正在使用的Group ID,亦或者消费者已经停止使用的Group ID这个方法都无法正常执行下去,上述三种情况都会报如下异常:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at com.company.MyGroupOperation.deleteGroupId(MyGroupOperation.java:76)
	at com.company.Main.main(Main.java:67)
Caused by: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

至于为什么会这样,笔者猜测:Group ID是建在Zookeeper上的如果Zookeeper下有数据,就不能删除Group ID。因此会报GroupNotEmptyException: The group is not empty.这个异常。因为哪怕是从来没有使用过的新建的Group ID 也会报这种错误,所以有这样的一个猜测。但是具体这个异常是怎么发生的,还需要去Kafka源码Scala中一步一步的去看,因此如果有知道的读者还请不吝赐教。目前只能说等待官网更新API看看是否能修复这个奇怪的地方。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐