根据ip和端口获取kafka列表、判断kafka中是否存在某个topic、获取kafka消息队列上有哪些topic
根据ip和端口获取kafka列表、判断kafka中是否存在某个topic、获取kafka消息队列上有哪些topic/*判断topic是否在此消息队列上存在IP和端口可以通过参数或属性动态获取*/public static Boolean topicExistence(String topic) {Boolean ok = false;if (topic == null || topic.lengt
·
根据ip和端口获取kafka列表、判断kafka中是否存在某个topic、获取kafka消息队列上有哪些topic
/*
判断topic是否在此消息队列上存在
IP和端口可以通过参数或属性动态获取
*/
public static Boolean topicExistence(String topic) {
Boolean ok = false;
if (topic == null || topic.length() <=0){
return ok;
}
Properties properties = new Properties();
// key反序列化方式
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// value反系列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// 提交方式
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 指定broker地址多个中间用,拆分,来找到group的coordinator
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092,127.0.0.1:9093");
// 指定用户组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST");
//topic列表
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//循环判断是否存在
for (String e :consumer.listTopics().keySet()){
System.out.println(e);
if (topic.equals(e)){
ok = true;
}
}
return ok;
}
其他步骤
更多推荐
已为社区贡献12条内容
所有评论(0)