近期生产出现了因代码异常造成的数据消费错误问题

# kafka

问题整理如下:

  • 1、因代码问题导致消费者在使用过程中逻辑不是幂等的,造成数据库插入数据,单kafka不提交offset,当再次启动kafka时,存在重复消费的情况,造成数据库流入脏数据
  • 2、在修复数据的过程中,因kafka的offset错误重置,导致部分历史消息进行错误消费。
  • 3、存在消费者因数据库链接等问题,造成在kafka允许时间内不进行提交,kafka进行重新分配的过程。

引出有关于kafka的相关问题

一、kafka的消息是如何生产的
二、kafka的消息是如何存储
三、kafka的消息是如何消费的

四、重置及查询过程中可能涉及到的一些命令


一、kafka的消息是如何生产的

首先生产者在生产消息的时候会存在topic分区的概念,一个topic存在多个partion,那么生产的消息进入具体哪个partion遵循四种策略方式,目前生产上基本基于以下按照key的分配方式

按照key来定义分区,存在一个问题,就是数据倾斜的问题,比如我们使用公司id来作为key进行消息发送,因为key值相同,那么会出现一个公司数据量很大,一个公司数据量很小,量大的公司会分配进入一个分区,该分区的消息数量要远超过其他分区,从而给该节点服务器会造成巨大的压力和数据的阻塞。
二、kafka的消息是如何存储
  • 已知消息已经通过生产者生产到对应的partition中。 参考的链接:[1]有关kafka存储的参考文献
  • 一个partion中,存在多个segment 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。

每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

写message
消息从java堆转入page cache(即物理内存)。
由异步线程刷盘,消息从page cache刷入磁盘。

segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

通过offset定位到一条信息的过程
  1. 第一步通过offset=368776的message,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
  2. 第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
kafka这样使用的优点:
  1. Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  2. 通过索引信息可以快速定位message和确定response的最大大小。
  3. 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  4. 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
三、kafka的消息是如何消费的

topic partition offset 这三个唯一确定一条消息。
消息的消费模型有两种,推送模型(push)和拉取模型(pull)。

  • 基于推送模型(push)的消息系统,消息代理在将消息推送到消费者后,标记这条消息已经消费,但这种方式无法很好地保证消费被处理。
  • 缺点:标记为消费后,其他消费者则不可以再消费了,不可取。
  • 用pull拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息。由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。

读message
消息直接从page cache转入socket发送出去。
当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去

生产上可能存在几种情况
1、消费者消费的速度跟不上生产者生产消息的速度
2、生产者生产的速度跟不上消费的速度
3、topic的某一个partion挂掉了

以上几种情况可能会导致当前的消费者无法去消费信息了,必然会影响业务的使用
kafka构建了rebalance机制,也就是在均衡的机制

发生rebalance的情况有以下几种:
1、消费者组的consumer个数发生了变化
2、订阅的topic个数发生了变化
3、分区发生了变化
等等

**rebalance过程中,所有的消费者将不再进行消费,直到rebalance过程完成

消费者分区分配策略默认使用Range范围分配策略
计算公式为:
n = 分区数量/消费者数量
m = 分区数量%消费者数量

前m个消费者消费n+1个,剩余消费者消费n个

按照这个公式来计算,3个消费者,7个分区

本身还存在RoundRobin策略和Stricky粘性分配策略,本次先不赘述
可参考:[2]

四、kafka常用命令

本次记录offset重置命令

./kafka-consumer-groups.sh --execute --bootstrap-server 172.17.195.178:9092 --group testaaa --reset-offsets --topic thirdparty_clue_callback:0  --to-offset 470
//将testaaa消费者组中 topic为thirdparty_clue_callback,分区为0的offset重置为470

./kafka-consumer-groups.sh --describe --bootstrap-server 172.17.195.178:9092 --group testaaa
//查询消费者组数据

./kafka-topics.sh --zookeeper localhost:2181 --describe --topic tanma_shop_hupan_topic_trade
//查询topic数据

//创建外部订单写入topic
./kafka-topics.sh --zookeeper localhost:2181 --create --topic tanma_mall_order_topic --partitions 12  --replication-factor 2

还有一些借鉴的其他命令[3]

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐