Kafka——创建无消费者组的消费者消费订阅主题
前景最近在编写了一个websocket+kafka的推送订阅组件,类似之前的一篇wesocket+redis的推送订阅组件。在实现完功能后,发现一个问题就是从websocket接收到订阅信息到websocket推送接收到的kafka数据这期间花了3s多,排查了一个,这3s用在了这里:也就是Request joining group due to: need to re-join with the
前景
最近在编写了一个websocket+kafka的推送订阅组件,类似之前的一篇wesocket+redis的推送订阅组件。
期间遇到了两个问题。
不停地组平衡
由于每次订阅不同的主题都会去生成一个消费者,且同一个服务里博主配置的消费者组是同一个。相当于:
每次订阅不同的主题
= 消费者组会加入一个新的消费者
=> 消费者组发生组平衡
=> 组内所有消费者都会暂停手头的工作去等待消费者组进行组平衡
这样显然是不合适的。
后来就决定在每次创建消费者的时候都给他指定一个新的消费者组。这样就不会影响到其他消费者消费数据了。
订阅时间过长
在实现完功能后,发现一个问题就是从websocket接收到订阅信息
到websocket推送接收到的kafka数据
这期间花了3s多,这样会导致使用体验很差。排查了一个,这3s用在了这里:
也就是Request joining group due to: need to re-join with the given member-id
。当我创建一个带消费者组的消费者后,它在加入消费者组之前需要消费者组给它分配一个member-id。
提问
为什么消费者组给消费者分配member-id需要3s的时间? 影响这个时间的因素又有哪些?
在我查阅了一番资料以后,并没有得出什么结论。希望知道答案的大佬能给小弟解惑
。
既然这条路对于我来说走不通,那就换条路走吧~
在《Kafka权威指南》第53页第二行中写道“创建不属于任何一个群组的消费者也是可以的”
既然加入消费者组需要那么长的时间,那我可不可以不加入消费者组?当个自由人不香嘛~
当然我们得明确自己的需求,是不是可以不要消费者组,这里就需要了解一下消费者组的作用了,博主所知道的作用有:
1.确保组内消费者不会消费到同一个主题的同一个分组(保证消费的数据不重复)。
2. 同一个消费者组的成员可以一起消费一个主题(不同分区)里面的数据,比单个消费者消费整个主题的数据压力来的要轻很多。
3.当组内的成员“死亡”的时候,它所负责的分区会交给组内的其他成员。
对比了一下这三点,发现博主好像并不是很需要消费者组。
随即,博主就将创建消费者组的那一块代码中,删去了关于消费者组的一些配置。然后就是启动项目,开始测试。测试中发现会报一个错误:To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration
,那是因为博主用的KafkaConsumer的subscribe方法订阅的数据,该方法会调用maybeThrowInvalidGroupIdException方法去检测消费者的消费者组存不存在,不存在则会报上面说的那个错误。我们可以换一种方式订阅:KafkaConsumer的assign方法。
核心代码
websocket相关部分的代码和之前《websocket+redis动态订阅和动态取消订阅》中的几乎是一摸一样的,这里就不贴出来了。
KafkaPubsub
@Component
@Slf4j
public class KafkaPubSub {
private KafkaClient kafkaClient = GetBeanUtil.getBean(KafkaClient.class);
private boolean pollFlag = true;
private KafkaConsumer<String, String> consumer = null;
private final ObjectMapper o = new ObjectMapper();
private String message = "{\n" +
" \"id\": \"%s\",\n" +
" \"msg\": %s,\n" +
" \"type\": 0\n" +
"}";
public void subAndPoll(String topic){
consumer = kafkaClient.noGroupSubscribe(topic);
while (pollFlag) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
WebSocketServer.publish(String.format(message, topic, record.value()), topic);
}
}
consumer.unsubscribe();
}
public void unsubscribe(){
this.pollFlag = false;
}
}
KafkaClient
@Slf4j
public class KafkaClient {
private Properties noGroupproperties;
public KafkaClient(MessageBrokerProperties messageBrokerProperties) {
noGroupproperties = new Properties();
noGroupproperties.put("bootstrap.servers", messageBrokerProperties.getBootstrapServers());
noGroupproperties.put("enable.auto.commit", false);
noGroupproperties.put("key.deserializer", messageBrokerProperties.getKeyDeserializer());
noGroupproperties.put("value.deserializer", messageBrokerProperties.getValueDeserializer());
}
private KafkaConsumer<String, String> getNoGroupComsumer(){
return new KafkaConsumer(noGroupproperties);
}
public KafkaConsumer<String, String> noGroupSubscribe(String topic){
KafkaConsumer<String, String> consumer = getNoGroupComsumer();
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
return consumer;
}
}
更多推荐
所有评论(0)