First, create a Maven project and import the dependency coordinates:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
</dependency>

Next, the code:

package com.example.demo;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;


public class KafkaTest2 {
    @Test
    public void demoTest() throws ExecutionException, InterruptedException {

        Properties properties = new Properties();
        // Broker address: see advertised.host.name in the corresponding config file under kafka/config
        properties.put("bootstrap.servers", "your-broker-ip:port");
        // Consumer group id: existing groups can be listed with
        // bin/kafka-consumer-groups.sh --zookeeper <zookeeper-ip>:2181 --list in the Kafka directory
        properties.put("group.id", "test");
        
        properties.put("enable.auto.commit", "true");
        
        properties.put("auto.commit.interval.ms", "1000");
        
        properties.put("auto.offset.reset", "earliest");
        /**
         * If the consumer does not send a heartbeat to the group coordinator within
         * this interval, it is considered dead and the coordinator triggers a
         * rebalance, assigning its partitions to other consumers.
         */
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // Manually assign partition 0 of the topic (no group rebalancing when using assign)
        kafkaConsumer.assign(Arrays.asList(new TopicPartition("your-topic", 0)));
        /**
         * Alternatively, subscribe to one or more topics instead of assigning
         * partitions manually; a regular expression can also be used
         * (see the sketch after this class).
         */
//        kafkaConsumer.subscribe(Arrays.asList("mt1"));
        // List all topics visible to this consumer and print the partitions of our topic
        Map<String, List<PartitionInfo>> topicPartitions = kafkaConsumer.listTopics();
        System.out.println(topicPartitions.get("your-topic"));

        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        // Number of records fetched by this poll
        System.out.println(records.count());
//        Print the fetched records:
//        for (ConsumerRecord<String, String> record : records) {
//            //Thread.sleep(1000);
//            System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//            System.out.println();
//        }
//        To consume continuously, poll inside an endless loop instead:
//        while (true) {
//            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
//            for (ConsumerRecord<String, String> record : records) {
//                //Thread.sleep(1000);
//                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//                System.out.println();
//            }
//        }
        kafkaConsumer.close();
    }

}
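The subscribe comment above mentions that a regular expression can be passed instead of a fixed topic list. Below is a minimal sketch of that variant, not part of the original example: the pattern demo\..* is a hypothetical topic-name prefix, kafkaConsumer is a consumer created with the same properties as above (subscribe and assign are mutually exclusive, so don't call both on the same instance), and the extra imports java.util.regex.Pattern and org.apache.kafka.clients.consumer.ConsumerRebalanceListener are needed. The 0.10.x client's Pattern overload requires a rebalance listener, so an empty one is passed here.

        // Subscribe to every topic whose name matches the pattern;
        // matching topics created later are picked up automatically.
        kafkaConsumer.subscribe(Pattern.compile("demo\\..*"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });

        // Poll in a loop, as in the commented-out while loop above
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, offset = %d, value = %s%n",
                        record.topic(), record.offset(), record.value());
            }
        }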

In addition, all topics can be listed by running bin/kafka-topics.sh --zookeeper <zookeeper-ip>:2181 --list in the Kafka directory,
or with Java code:

        // Additional imports needed for this snippet: kafka.admin.AdminUtils,
        // kafka.utils.ZkUtils, org.apache.kafka.common.security.JaasUtils,
        // scala.collection.JavaConverters
        ZkUtils zkUtils = ZkUtils.apply("your-zookeeper-ip:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
        // Fetch every topic together with its per-topic configuration
        Map<String, Properties> topics = JavaConverters.mapAsJavaMapConverter(AdminUtils.fetchAllTopicConfigs(zkUtils))
                .asJava();
        for (Map.Entry<String, Properties> entry : topics.entrySet()) {
            String key = entry.getKey();
            Properties value = entry.getValue();
            System.out.println(key + ":" + value);
        }
        zkUtils.close();
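If you'd rather not query ZooKeeper directly, the consumer API used earlier exposes the same topic list. A minimal sketch, assuming kafkaConsumer is a consumer created with the properties shown above:

        // List every topic (and its partition count) visible to the consumer,
        // without talking to ZooKeeper
        Map<String, List<PartitionInfo>> allTopics = kafkaConsumer.listTopics();
        for (Map.Entry<String, List<PartitionInfo>> entry : allTopics.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " partition(s)");
        }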