kafka消费偏移量维护的方式有两种:

  • kafka自身维护
  • zookeeper维护

1.kafka维护消费偏移

1.1 查看kafka维护的消费组列表

./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

$ ./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
The [new-consumer] option is deprecated and will be removed in a future major release.The new consumer is used by default if the [bootstrap-server] option is provided.
Note: This will not show information about old Zookeeper-based consumers.
creditpayment_db_t_partner_pre_apply_group
fund_trade_center_db_t_oc_task_complete_apply_group
cardloan_user_count_group_test
console-consumer-55818
p2p_db_t_user_info_group_test
p2p_db_t_user_invest_group_test
apply_db_group

……

1.2 查看 kafka维护的某个消费组

./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group apply_db_group --describe

$ ./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group apply_db_group --describe
The [new-consumer] option is deprecated and will be removed in a future major release.The new consumer is used by default if the [bootstrap-server] option is provided.
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                      HOST            CLIENT-ID
sdc2loan_cardloan-apply_s_binlog_pro 0          11605539        11605554        15              consumer-97-1acf945d-a2d3-4be8-8660-410c036e289d /10.2.0.45      consumer-97

1.3 kafka维护的消费偏移

如果是用kafka默认的api【org.apache.kafka.clients.consumer.KafkaConsumer】来消费,其消费者的offset会更新到一个kafka自带的topic下:__consumer_offsets

  • __consumer_offsets默认有50个分区

[zk: localhost:2181(CONNECTED) 4] ls /kafka/brokers/topics/__consumer_offsets/partitions
[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]

  • 查看__consumer_offsets主题所有信息

./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

  •  查看__consumer_offsets主题下某个分区所有信息

 bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

  • 计算指定group在__consumer_offsets的哪个分区

    Math.abs(groupID.hashCode()) % numPartitions

    所以在本例中,对应的分区=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息。

2.zookeeper维护消费偏移

2.1 创建生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>1
>2
>3
>1

2.2 创建消费组

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test --consumer-property group.id=my_group

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test --consumer-property group.id=my_group
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
3
1
2
1

2.3 查看zookeeper维护的消费组列表

./bin/kafka-consumer-groups.sh --zookeeper localhost:2181/kafka --list 

$ ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181/kafka --list
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).
console-consumer-7101
console-consumer-82464
my_group
console-consumer-2010

2.3 查看zookeeper维护的某个消费组

./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group1 --describe

$ ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181/kafka --group my_group1 --describe
Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     
test            0          15              17              2               -               
test            1          16              18              2               -               
test            2          14              16              2               -      

2.4 zookeeper维护的消费偏移

如果是用javaapi【kafka.javaapi.consumer.ConsumerConnector】来消费,消费者的offset会更新到zookeeper目录下:/kafka/consumers/{group}/offsets/{topic}/{partition}

get /kafka/consumers/my_group1/offsets/test/0

[zk: localhost:2181(CONNECTED) 5] get /kafka/consumers/my_group1/offsets/test/0
15
cZxid = 0x328a8e539
ctime = Tue Apr 14 19:01:13 CST 2020
mZxid = 0x328b43eac
mtime = Tue Apr 14 23:51:14 CST 2020
pZxid = 0x328a8e539
cversion = 0
dataVersion = 4
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0

2.5 查看topic的offset的范围

查看offset的最小值

 $ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic test --time -2     
test:2:0
test:1:0
test:0:0

查看offset的最大值

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic test --time -1
test:2:16
test:1:18
test:0:17

3. 消费偏移的更新方式

无论是kafka默认api,还是java的api,offset的更新方式都有两种:自动提交和手动提交

3.1 自动提交(默认方式)

Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation:Math.abs(groupID.hashCode()) % numPartitions。

这种方式也被称为at most once,fetch到消息后就可以更新offset,无论是否消费成功。

3.2 手动提交

鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。

对于手动提交offset主要有3种方式:1.同步提交  2.异步提交  3.异步+同步 组合的方式提交

3.2.1 同步提交

  • 提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束
  • 同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。

3.2.1 异步提交

  • 异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在broker做出响应的时候记录错误信息。
  • 对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。

3.2.1 异步+同步

  • 针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。
  • 通过finally在最后不管是否异常都会触发consumer.commit()来同步补救一次,确保偏移量不会丢失

参考

Kafka提交offset机制 - HarvardFly - 博客园

Kafka常用命令合集 - 请叫我头头哥 - 博客园

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐