spring boot 动态添加监听kafka哪些Topic
spring boot 动态添加监听kafka哪些Topic代码配置代码//设置kafka信息@Bean("ackContainerFactory")public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){ConcurrentKafkaListenerCo
·
代码
//设置kafka信息
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
/*
设置默认不监听kafka设置的
(此功能是可以动态设置是否启动kafka监听,如果不需要可以
设置为true,甚至不用写这个方法;动态监听方法见下放)
*/
factory.setAutoStartup(false);
return factory;
}
public static final String KAFKA_LISTENER_ID = "KAFKA_ID";
//监听配置文件中的receiveTopics的值使用,分割
//会监听下方配置中的test、test1、test2;三个topic
@KafkaListener(id = KAFKA_LISTENER_ID,topics = {"#{'${receiveTopics}'.split(',')}"},containerFactory = "ackContainerFactory")
public void message(ConsumerRecord<String, String> record) {
//获取topic
String topic = record.topic();
//获取键值(有可能乱码)
String flieName = new String(record.key().getBytes("UTF-8"),"UTF-8");
//获取内容
String valueTopic = new String(record.value().getBytes("UTF-8"),"UTF-8");
…………处理操作
}
配置
receiveTopics: test,test1,test2
其他步骤
更多推荐
已为社区贡献11条内容
所有评论(0)