Java 连接 Kafka 查询指定 topic 的消费信息
摘要：首先创建 Maven 项目并导入依赖坐标（groupId：org.apache.kafka，artifactId：kafka_2.11，version：0.10.2.1），接下来直接上代码。
·
首先需要创建 Maven 项目，并导入以下依赖坐标：
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
接下来,直接上代码
package com.example.demo;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Demo test that connects to a Kafka broker with the new consumer API, prints the
 * partition metadata of one topic and the number of records returned by a single poll.
 *
 * <p>The broker address and topic name are placeholders that must be replaced before
 * the test can run against a real cluster.
 */
public class KafkaTest2 {

    /** Placeholder topic name; replace with the topic to inspect. */
    private static final String TOPIC = "对应的topic";

    /**
     * Connects to Kafka, prints the partition info of {@link #TOPIC} and the number of
     * records fetched from partition 0 by one {@code poll} call.
     *
     * @throws ExecutionException   kept for signature compatibility with existing callers
     * @throws InterruptedException kept for signature compatibility with existing callers
     */
    @Test
    public void DemoTest() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Host information: see advertised.host.name in the kafka/config directory.
        properties.put("bootstrap.servers", "对应的主机ip:端口号");
        // Existing group ids can be listed from the Kafka directory with:
        // bin/kafka-consumer-groups.sh --zookeeper <zookeeper-ip>:2181 --list
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        // If a consumer sends no heartbeat to the group coordinator within this time it is
        // considered dead and the coordinator rebalances its partitions to other consumers.
        // NOTE: with the manual assign() used below, group rebalancing is not involved.
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer's network resources are released
        // even when one of the calls below throws.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties)) {
            // Manually assign partition 0 of the topic to this consumer.
            kafkaConsumer.assign(Collections.singletonList(new TopicPartition(TOPIC, 0)));

            // Alternative: subscribe to one or more topics (a regex pattern is also supported).
            // subscribe() and assign() are mutually exclusive on the same consumer.
            // kafkaConsumer.subscribe(Arrays.asList("mt1"));

            Map<String, List<PartitionInfo>> partitionsByTopic = kafkaConsumer.listTopics();
            List<PartitionInfo> topicPartitions = partitionsByTopic.get(TOPIC);
            if (topicPartitions == null) {
                // listTopics() has no entry for a topic the cluster does not know about.
                System.out.println("topic not found: " + TOPIC);
            } else {
                // Print the list itself; printing list.stream() only shows the stream object's identity.
                System.out.println(topicPartitions);
            }

            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            // Number of records returned by this poll; a single short poll may return none.
            System.out.println("record count = " + records.count());

            // To print the content of each record:
            // for (ConsumerRecord<String, String> record : records) {
            //     System.out.printf("offset = %d, value = %s", record.offset(), record.value());
            //     System.out.println();
            // }

            // To keep receiving records in real time, poll in a loop:
            // while (true) {
            //     ConsumerRecords<String, String> polled = kafkaConsumer.poll(100);
            //     for (ConsumerRecord<String, String> record : polled) {
            //         System.out.printf("offset = %d, value = %s", record.offset(), record.value());
            //         System.out.println();
            //     }
            // }
        }
    }
}
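另外，查询所有的 topic 可以在 Kafka 目录下执行 bin/kafka-topics.sh --zookeeper 对应的zookeeperip:2181 --list
也可以使用如下 Java 代码（需要导入 kafka.utils.ZkUtils、kafka.admin.AdminUtils、org.apache.kafka.common.security.JaasUtils 和 scala.collection.JavaConverters）：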
// List every topic together with its topic-level configuration, read via ZooKeeper.
// The two 30000 arguments are the session and connection timeouts passed to ZkUtils.apply.
ZkUtils zkUtils = ZkUtils.apply("对应的zookeeperip:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
try {
    // fetchAllTopicConfigs returns a Scala map; convert it to a java.util.Map for iteration.
    Map<String, Properties> topics =
            JavaConverters.mapAsJavaMapConverter(AdminUtils.fetchAllTopicConfigs(zkUtils)).asJava();
    for (Map.Entry<String, Properties> entry : topics.entrySet()) {
        System.out.println(entry.getKey() + ":" + entry.getValue());
    }
} finally {
    // Always release the ZooKeeper connection, even if fetching or printing fails.
    zkUtils.close();
}
更多推荐
已为社区贡献1条内容
所有评论(0)