kafka动态指定主题Topic与GroupId,解决集群环境下只有一个实例能消费队列的情况。
kafka动态指定主题与分组ID原因近期项目新做了一个环境,采购了阿里的CLB进行四层代理的负载均衡,每个服务都搭建了两个实例,后来测试过程中,遇到了一个问题,推送服务有时没数据问题描述两个相同实例,都作为kafka监听者,存在只有一个实例消费到数据,之前设置了GroupId,但是集群环境下还是会存在组内消费竞争的问题。注意请注意自己的业务场景!请注意自己的业务场景!请注意自己的业务场景!解决方案
·
kafka动态指定主题与分组ID
原因
近期项目新做了一个环境,采购了阿里的CLB进行四层代理的负载均衡,每个服务都搭建了两个实例,后来测试过程中,遇到了一个问题,推送服务有时没数据
问题描述
两个相同实例,都作为kafka监听者,存在只有一个实例消费到数据,之前设置了GroupId,但是集群环境下还是会存在组内消费竞争的问题。
注意
请注意自己的业务场景!
请注意自己的业务场景!
请注意自己的业务场景!
解决方案
1.配置kafka工厂
注意:此时应将groupid写入yml或properties配置文件
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String brokers;
@Value("${group.android}")
private String tcpAndroid;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory4WS() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory4WS());
factory.setConcurrency(2);
factory.getContainerProperties().setPollTimeout(4000);
return factory;
}
public Map<String, Object> getCommonPropertis() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, tcpAndroid);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return properties;
}
public ConsumerFactory<String, String> consumerFactory4WS() {
Map<String, Object> properties = getCommonPropertis();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, tcpAndroid);
return new DefaultKafkaConsumerFactory<>(properties);
}
}
2.kafka监听者配置
@KafkaListener(topics = {"${topic.msg}"}, containerFactory = "kafkaListenerContainerFactory4WS")
此时,kafka可以指定工厂,名字在第1步已经配置好了
如果你的业务场景需要每个实例都需要数据,可以采用这个方式。 因为我们是推送业务,必须保证每个实例都能消费到数据,所以,你懂的。
完成
更多推荐
已为社区贡献1条内容
所有评论(0)