前情提要:我这里只是读取kafka里面的数据,生产者已经配置好且会自动监控数据库的变化来推入kafka中,所以这里不对生产者做过多的解释。

一、死循环无限拉取kafka数据

1.1 整体框架剖析

1、要想从Kafka中读取数据,就要先对消费者信息进行配置

//1、创建消费者配置信息
        Properties properties = new Properties();
        //2、给配置信息赋值
        //2.1 kafka集群信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
        //2.2 开启自动提交offset 提交以后每次offset都在消费的最新位置
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //2.3 自动提交offset延时 1秒钟提交一次
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //2.4 key value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //2.5 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");

2、消费者基本配置信息完成以后,创建消费者、订阅主题、为了后面的消费

 //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));

3、订阅主题,就相当于已经订阅了kafka中的消息,下一步就是消费。而kafka消费消息的方式是poll拉取,我们这里对kafka中的数据进行消费,上面我们选择了自动提交offset,那么每次offset就是在上一次消费完成以后的最新位置,所以我们接下来的每次消费得到的都是最新未消费的数据!

while (true) {
           //获取数据
           ConsumerRecords<String, String> poll = consumer.poll(100);

           //解析并打印
           for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
               System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());

           }
       }

1.2 测试

方法一:

1、创建MyConsumer1类,根据上面整体结构的剖析,添加如下代码,并进行测试。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * @author potential
 */
public class MyConsumer1 {


    public static void main(String[] args) {
        //1、创建消费者配置信息
        Properties properties = new Properties();
        //2、给配置信息赋值
        //2.1 kafka集群信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
        //2.2 开启自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //2.3 自动提交offset延时 1秒钟提交一次
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //2.4 key value的反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //2.5 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));

       while (true) {
           //获取数据
           ConsumerRecords<String, String> poll = consumer.poll(100);

           //解析并打印
           for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
               System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());

           }
       }
//        //关闭连接
//        consumer.close();

    }
}

方法二:
2、创建MyConsumer2类,根据上面整体结构的剖析,添加如下代码,并进行测试。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;


/**
 * @author potential
 */
public class MyConsumer2 {


    public static void main(String[] args) {
        //配置必要的参数
        //准备一个map集合放置参数
        Map<String, Object> config = new HashMap<String, Object>();
        //bootserverscon
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
        //开启自动提交offset
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //自动提交offset延时 1秒钟提交一次
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        //valuedeserilizer
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class);
        //groupid
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "base_db_app_210325");

        //如果找不到偏移量,设置earliest,则从最新消费开始,也就是消费者一开始最新消费的时候
        //一定要注意顺序,读取时候的顺序会影响
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//        //此处是把消费者的偏移量重置到生产者最顶端
//        Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
//        hashMaps.put(new TopicPartition("ticdc-paperfree-monitor", 0), new OffsetAndMetadata(129));
        //消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(config);
//        //放置刚刚设置的偏移量
//        consumer.commitSync(hashMaps);
        //先订阅后消费
        consumer.subscribe(Arrays.asList("ticdc-paperfree-monitor"));

//        // 批量从主题的分区拉取消息
//        //final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
//        ConsumerRecords<String, String> consumerRecords = consumer.poll(3000);

        while (true) {
            //获取数据
            ConsumerRecords<String, String> poll = consumer.poll(100);

            //解析并打印
            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());

            }
        }



//
//        //遍历本次从主题的分区拉取的批量消息 这里是将整个分区中的全部数据都拉出来了
//        consumerRecords.forEach(new java.util.function.Consumer<ConsumerRecord<Integer, String>>() {
//            @Override
//            public void accept(ConsumerRecord<Integer, String> consumerRecord) {
//                System.out.println(
//                        consumerRecord.topic() +"\t"
//                                +consumerRecord.offset() + "\t"
//                                +consumerRecord.key() +"\t"
//                                +consumerRecord.value()+"\t"
//                );
//            }
//        });
//        consumer.close();

    }
}

注意:
方式一、方式二只是写法上的不同,整体架构都是一样的,任选其一来写即可。
在这里插入图片描述至此,从kafka中读取最新数据的流程就全部结束了。

二、@KafkaListener注解 实现监听kafka数据

1、导入依赖

【我这里SpringBoot版本是2.2.13】

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.7.RELEASE</version>
        </dependency>

注意:
1、springboot +2、kafka-clients +3、spring-kafka(在下图中体现为Sprig for Apache Kafka Version) 这三个 要注意版本对应。具体对应情况如下图所示:
在这里插入图片描述2、配置文件
application.yml文件中添加如下内容:

spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092 #集群信息
      producer: #生产者
        retries: 0 #设置大于0的值,则客户端将发送失败的记录重新发送
        batch-size: 16384 #批量大小
        buffer-momory: 33554432 #生产端缓冲区大小
        acks: 1 #应答级别
        #指定消息key和消息体的解编码方式  序列化与反序列化
        key- key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializ
      consumer:
        group-id: base_db_app_210325
        enable-auto-comnit: true #是否自动提交offset
        auto-offset-reset: latest #重置为分区中最新的offset(消费者分区中新产生的数据)
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        topic-name: ticdc-paperfree-monitor #主题
      listener:
        ack-mode: manual_immediate

3、创建MyConsumer类,添加如下内容:

 @KafkaListener(id = "test1", topics = "ticdc-paperfree-monitor")//这里id是随意起的,我这里叫test1,我这里主题直接写死,取ticdc-paperfree-monitor这个主题下的数据,也可以${},动态获取主题名称,group_id
    public void listen(ConsumerRecord<String, String> record) {
        //从Kafka中读取到的数据
        System.out.println("topic:" + record.topic());
        System.out.println("value:" + record.value());
        }

4、测试
运行主启动类,会自动进行监听且在程序运行的过程中将数据输出。
在这里插入图片描述

三、参考资料

https://blog.csdn.net/m0_67391270/article/details/126505944
https://blog.csdn.net/weixin_46271129/article/details/119800649

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐