Kafka写入流程:

1.producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
文件组织
kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。

在目录/KaTeX parse error: Expected ‘}’, got ‘EOF’ at end of input: {topicName}-{partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。

每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:

为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。

baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。
position:在segment中的绝对位置。
接下来弄清楚segment具体细节之后再说offset:

offset
个人认为发送的核心是围绕着offset来的,offset是一个连续的用于定位被追加到分区的每一个消息的序列号。一个消费组消费partition,需要保存offset记录消费到哪,这样保证一个partition内部消费的顺序性,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

确定consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式:

__consumers_offsets partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
1
那么对于分区中的一个offset例如等于345552怎么去查找相应的message呢?

先找到该message所在的segment文件,通过二分查找的方式寻找小于等于345552的offset,假如叫S的segment符合要求,如果S等于345552则S上一个segment的最后一个message即为所求;如果S小于345552则依次遍历当前segment即可找到。实际上offset的存储采用了稀疏索引,这样对于稠密索引来说节省了存储空间,但代价是查找费点时间。

这里稍稍总结一下:由于kafka需要保证一个partition顺序消费,但是内部又分为很多个segment,那怎么保证顺序的呢,核心就是offset是不断递增的,而segment又是根据offset排序,所以整体是有序的。

kafka支持3种消息投递语义
At most once 消息至多会被发送一次,但如果产生网络延迟等原因消息就会有丢失。
At least once 消息至少会被发送一次,上面既然有消息会丢失,那么给它加一个消息确认机制即可解决,但是消息确认阶段也还会出现同样问题,这样消息就有可能被发送两次。
Exactly once 消息只会被发送一次,这是我们想要的效果。
那么kafka是怎么解决的呢?

生产幂等性:思路是这样的,为每个producer分配一个pid,作为该producer的唯一标识。producer会为每一个<topic,partition>维护一个单调递增的seq。类似的,broker也会为每个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。因为:

消息的seq比broker的seq大超过时,说明中间有数据还没写入,即乱序了。
消息的seq不比broker的seq小,那么说明该消息已被保存。
事务性/原子性广播:引入tid(transaction id),和pid不同,这个id是应用程序提供的,用于标识事务,和producer是谁并没关系。就是任何producer都可以使用这个tid去做事务,这样进行到一半就死掉的事务,可以由另一个producer去恢复。同时为了记录事务的状态,类似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每个tid对应唯一一个transaction coordinator。
注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。

做事务时,先标记开启事务,写入数据,全部成功就在transaction log中记录为prepare commit状态,否则写入prepare abort的状态。之后再去给每个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message可以被读取或已经废弃。成功后在transaction log记录下commit/abort状态,至此事务结束。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐