【Kafka】从kafka中读取最新数据
【Kafka】从kafka中读取最新数据
【Kafka】从kafka中读取最新数据
前情提要:我这里只是读取kafka里面的数据,生产者已经配置好且会自动监控数据库的变化来推入kafka中,所以这里不对生产者做过多的解释。
一、死循环无限拉取kafka数据
1.1 整体框架剖析
1、要想从Kafka中读取数据,就要先对消费者信息进行配置
//1、创建消费者配置信息
Properties properties = new Properties();
//2、给配置信息赋值
//2.1 kafka集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
//2.2 开启自动提交offset 提交以后每次offset都在消费的最新位置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//2.3 自动提交offset延时 1秒钟提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//2.4 key value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//2.5 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");
2、消费者基本配置信息完成以后,创建消费者、订阅主题、为了后面的消费
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));
3、订阅主题,就相当于已经订阅了kafka中的消息,下一步就是消费
。而kafka消费消息的方式是poll拉取,我们这里对kafka中的数据进行消费,上面我们选择了自动提交offset,那么每次offset就是在上一次消费完成以后的最新位置,所以我们接下来的每次消费得到的都是最新未消费的数据!
while (true) {
//获取数据
ConsumerRecords<String, String> poll = consumer.poll(100);
//解析并打印
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());
}
}
1.2 测试
方法一:
1、创建MyConsumer1类,根据上面整体结构的剖析,添加如下代码,并进行测试。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
* @author potential
*/
public class MyConsumer1 {
public static void main(String[] args) {
//1、创建消费者配置信息
Properties properties = new Properties();
//2、给配置信息赋值
//2.1 kafka集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
//2.2 开启自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//2.3 自动提交offset延时 1秒钟提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//2.4 key value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//2.5 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));
while (true) {
//获取数据
ConsumerRecords<String, String> poll = consumer.poll(100);
//解析并打印
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());
}
}
// //关闭连接
// consumer.close();
}
}
方法二:
2、创建MyConsumer2类,根据上面整体结构的剖析,添加如下代码,并进行测试。
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
/**
* @author potential
*/
public class MyConsumer2 {
public static void main(String[] args) {
//配置必要的参数
//准备一个map集合放置参数
Map<String, Object> config = new HashMap<String, Object>();
//bootserverscon
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
//开启自动提交offset
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//自动提交offset延时 1秒钟提交一次
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//valuedeserilizer
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class);
//groupid
config.put(ConsumerConfig.GROUP_ID_CONFIG, "base_db_app_210325");
//如果找不到偏移量,设置earliest,则从最新消费开始,也就是消费者一开始最新消费的时候
//一定要注意顺序,读取时候的顺序会影响
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// //此处是把消费者的偏移量重置到生产者最顶端
// Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
// hashMaps.put(new TopicPartition("ticdc-paperfree-monitor", 0), new OffsetAndMetadata(129));
//消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(config);
// //放置刚刚设置的偏移量
// consumer.commitSync(hashMaps);
//先订阅后消费
consumer.subscribe(Arrays.asList("ticdc-paperfree-monitor"));
// // 批量从主题的分区拉取消息
// //final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
// ConsumerRecords<String, String> consumerRecords = consumer.poll(3000);
while (true) {
//获取数据
ConsumerRecords<String, String> poll = consumer.poll(100);
//解析并打印
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());
}
}
//
// //遍历本次从主题的分区拉取的批量消息 这里是将整个分区中的全部数据都拉出来了
// consumerRecords.forEach(new java.util.function.Consumer<ConsumerRecord<Integer, String>>() {
// @Override
// public void accept(ConsumerRecord<Integer, String> consumerRecord) {
// System.out.println(
// consumerRecord.topic() +"\t"
// +consumerRecord.offset() + "\t"
// +consumerRecord.key() +"\t"
// +consumerRecord.value()+"\t"
// );
// }
// });
// consumer.close();
}
}
注意:
方式一、方式二只是写法上的不同,整体架构都是一样的,任选其一来写即可。
至此,从kafka中读取最新数据的流程就全部结束了。
二、@KafkaListener注解 实现监听kafka数据
1、导入依赖
【我这里SpringBoot版本是2.2.13】
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>
注意:
1、springboot +2、kafka-clients +3、spring-kafka(在下图中体现为Sprig for Apache Kafka Version) 这三个 要注意版本对应
。具体对应情况如下图所示:
2、配置文件
在application.yml
文件中添加如下内容:
spring:
kafka:
consumer:
bootstrap-servers: 192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092 #集群信息
producer: #生产者
retries: 0 #设置大于0的值,则客户端将发送失败的记录重新发送
batch-size: 16384 #批量大小
buffer-momory: 33554432 #生产端缓冲区大小
acks: 1 #应答级别
#指定消息key和消息体的解编码方式 序列化与反序列化
key- key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializ
consumer:
group-id: base_db_app_210325
enable-auto-comnit: true #是否自动提交offset
auto-offset-reset: latest #重置为分区中最新的offset(消费者分区中新产生的数据)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
topic-name: ticdc-paperfree-monitor #主题
listener:
ack-mode: manual_immediate
3、创建MyConsumer类,添加如下内容:
@KafkaListener(id = "test1", topics = "ticdc-paperfree-monitor")//这里id是随意起的,我这里叫test1,我这里主题直接写死,取ticdc-paperfree-monitor这个主题下的数据,也可以${},动态获取主题名称,group_id
public void listen(ConsumerRecord<String, String> record) {
//从Kafka中读取到的数据
System.out.println("topic:" + record.topic());
System.out.println("value:" + record.value());
}
4、测试
运行主启动类
,会自动进行监听且在程序运行的过程中将数据输出。
三、参考资料
https://blog.csdn.net/m0_67391270/article/details/126505944
https://blog.csdn.net/weixin_46271129/article/details/119800649
更多推荐
所有评论(0)