spring boot 动态启动关闭kafka监听、设置默认不监听kafka
spring boot 动态启动关闭kafka监听//添加注解@Component@Slf4jpublic class KafkaReceiver {@Autowiredprivate KafkaListenerEndpointRegistry registry;public static final String KAFKA_LISTENER_ID = "KAFKA_ID";@Bean("ack
·
spring boot 动态启动关闭kafka监听、设置默认不监听kafka、动态设置kafka监听哪些topic
//添加注解
@Component
@Slf4j
public class KafkaReceiver {
@Autowired
private KafkaListenerEndpointRegistry registry;
public static final String KAFKA_LISTENER_ID = "KAFKA_ID";
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
/*
设置默认不监听kafka设置的,如果默认监听的话,还没有等到
程序关闭监听kafka服务时就会有一些kafka上的数据被拉下来了
*/
factory.setAutoStartup(false);
return factory;
}
//动态设置监听哪些kafka
@KafkaListener(id = KAFKA_LISTENER_ID,topics = {"#{'${receiveTopics}'.split(',')}"},containerFactory = "ackContainerFactory")
public void message(ConsumerRecord<String, String> record) {
//获取topic
String topic = record.topic();
//获取键值(有可能乱码)
String flieName = new String(record.key().getBytes("UTF-8"),"UTF-8");
//获取内容
String valueTopic = new String(record.value().getBytes("UTF-8"),"UTF-8");
…………处理操作
}
//关闭kafka监听方法
public void stop(){
log.info("kafkalistener stop...");
registry.getListenerContainer(KAFKA_LISTENER_ID).stop();
}
//启动kafka监听方法
public void start(){
log.info("kafkalistener start...");
//判断监听容器是否启动
if (!registry.getListenerContainer(KAFKA_LISTENER_ID).isRunning()){
registry.getListenerContainer(KAFKA_LISTENER_ID).start();
}
//项目启动的时候监听容器是未启动状态,resume是恢复的意思不是启动的意思
registry.getListenerContainer(KAFKA_LISTENER_ID).resume();
}
}
//启动项目
import com.phy.bcs.common.util.spring.SpringContextHolder;
@SpringBootApplication(/*exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class}*/)
@Slf4j
public class Test{
public static KafkaProperties kafkaProperties;
public static void main(String[] args) throws UnknownHostException {
KafkaReceiver kafkaReceiver = SpringContextHolder.getBean(KafkaReceiver.class);
//获取配置文件
kafkaProperties = SpringContextHolder.getBean(KafkaProperties.class);
//根据配置动态设置是否监听kafka
if(kafkaProperties.getStartOrNot() == 1 || kafkaProperties.getStartOrNot() == 3){
kafkaReceiver.stop();
log.info("关闭监听kafka");
} else {
kafkaReceiver.start();
log.info("打开监听kafka");
}
}
}
其他步骤
更多推荐
已为社区贡献12条内容
所有评论(0)