kafka动态指定主题与分组ID

原因

近期项目新做了一个环境,采购了阿里的CLB进行四层代理的负载均衡,每个服务都搭建了两个实例,后来测试过程中,遇到了一个问题,推送服务有时没数据

问题描述

两个相同实例,都作为kafka监听者,存在只有一个实例消费到数据,之前设置了GroupId,但是集群环境下还是会存在组内消费竞争的问题。

注意

请注意自己的业务场景!
请注意自己的业务场景!
请注意自己的业务场景!

解决方案

1.配置kafka工厂

注意:此时应将groupid写入yml或properties配置文件

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    @Value("${group.android}")
    private String tcpAndroid;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory4WS() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory4WS());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }

    public Map<String, Object> getCommonPropertis() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, tcpAndroid);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    }

    public ConsumerFactory<String, String> consumerFactory4WS() {
        Map<String, Object> properties = getCommonPropertis();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, tcpAndroid);
        return new DefaultKafkaConsumerFactory<>(properties);
    }
}

2.kafka监听者配置

@KafkaListener(topics = {"${topic.msg}"}, containerFactory = "kafkaListenerContainerFactory4WS")

此时,kafka可以指定工厂,名字在第1步已经配置好了

如果你的业务场景需要每个实例都需要数据,可以采用这个方式。 因为我们是推送业务,必须保证每个实例都能消费到数据,所以,你懂的。

完成

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐