根据ip和端口获取kafka列表、判断kafka中是否存在某个topic、获取kafka消息队列上有哪些topic

/*
判断topic是否在此消息队列上存在
IP和端口可以通过参数或属性动态获取
*/
public static Boolean topicExistence(String topic) {
	Boolean ok = false;
	if (topic == null || topic.length() <=0){
		return ok;
	}
	Properties properties = new Properties();
	// key反序列化方式
	properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
	// value反系列化方式
	properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
	// 提交方式
	properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
	// 指定broker地址多个中间用,拆分,来找到group的coordinator
	properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093");
	// 指定用户组
	properties.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST");
	//topic列表
	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
	//循环判断是否存在
	for (String e :consumer.listTopics().keySet()){
		System.out.println(e);
		if (topic.equals(e)){
			ok = true;
		}
	}
	return ok;
}

其他步骤

  1. kafka服务自带zookeeper下载与启动
  2. Spring boot配置kafka服务
  3. kafka生产者发送消息成功回调
  4. kafka根据ip端口获取消息队列上的topic
  5. kafka动态设置监听哪些topic
  6. 动态启动关闭kafka监听、设置默认不监听kafka
  7. kafka设置:1只接受消息、不发送消息;2只发送消息不接受消息;3既接受消息也发送消息;4既不接收消息也不发送消息
  8. kafka会把历史数据都获取下来
  9. Spring boot kafka执行多次多次消费
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐