kafka的offset存储位置以及offset的提交方式
一 offset的存储位置1.1 存储位置1.从0.9版本开始,consumer默认将offset保存在Kafka 一个内置的topic中,该topic为__consumer_offsets2.Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.i
·
一 offset的存储位置
1.1 存储位置
1.从
0.9
版本开始,
consumer
默认将
offset
保存在
Kafka 一个内置的topic
中,该
topic
为
__consumer_offsets
2.
Kafka0.9
版本之前,
consumer
默认将
offset
保存在
Zookeeper
中。
分区号
,
value
就是当前
offset
的值。每隔一段时间,
kafka
内部会对这个
topic
进行
compact
,也就是每个
group.id+topic+分区号就保留最新数据
。
1.2 消费offset案例
1.首先在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,
默认是
true
,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为
false。
2.
采用命令行方式,创建一个新的
topic
。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic
atguigu
--partitions 2 --
replication-factor 2
3.启动生产者往
atguigu
生产数据
bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
4.消费数据
bin/kafka-console-consumer.sh -- bootstrap-server hadoop102:9092 --topic atguigu
--group test
5.
查看消费者消费主题
__consumer_offsets
二 offset的提交方式
2.1 自动提交方式
为了使我们能够专注于自己的业务逻辑,
Kafka提供了自动提交offset的功能。
自动提交
offset
的相关参数:
⚫
enable.auto.commit
:
是否开启自动提交
offset
功能,默认是
true
⚫
auto.commit.interval.ms
:
自动提交
offset
的时间间隔,默认是
5s
2.1.1 代码部分设置
//
是否自动提交
offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
//
提交
offset
的时间周期
1000ms
,默认
5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);
2.2 手动提交方式
虽然自动提交
offset
十分简单便利,但由于其是基于时间提交的,开发人员难以把握
offset
提交的时机。因 此Kafka
还提供了手动提交
offset
的
API
。
手动提交
offset
的方法有两种:分别是
commitSync
(同步提交)
和
commitAsync
(异步提交)
。两者的相 同点是,都会将
本次提交的一批数据最高的偏移量提交
;不同点是,
同步提交阻塞当前线程
,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而
异步提交则没有失败重试机制,故 有可能提交失败。
•
commitSync
(同步提交):必须等待
offset提交完毕,再去消费下一批数据
。并且会自动失败重试
•
commitAsync
(异步提交) :发送完提交
offset请求后,就开始消费下一批数据了。没有失败重试机制
2.2.1 同步提交
由于同步提交
offset
有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。以下为同步提交
offset
的示例
2.3.2 异步提交
虽然同步提交
offset
更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此
吞吐量会受到很大的影响。
因此更多的情况下,会选用异步提交 offset 的方式。
更多推荐
已为社区贡献18条内容
所有评论(0)