Kafka版本:2.2.1

环境:CDH

日志分段(segment)

格式

在kafka数据存储的目录下,进入topic文件目录,可以看到多个文件,如下,从文件名可以看出,.log.index.timeindex文件一一对应:

-rw-r--r--. 1 kafka kafka    245432 Feb 25 13:01 00000000000000000000.index
-rw-r--r--. 1 kafka kafka 909769306 Feb 25 11:31 00000000000000000000.log
-rw-r--r--. 1 kafka kafka    343392 Feb 25 13:01 00000000000000000000.timeindex
-rw-r--r--. 1 kafka kafka  10485760 Mar  1 08:56 00000000000001778276.index
-rw-r--r--. 1 kafka kafka 265569655 Mar  1 08:56 00000000000001778276.log
-rw-r--r--. 1 kafka kafka        10 Feb 25 13:01 00000000000001778276.snapshot
-rw-r--r--. 1 kafka kafka  10485756 Mar  1 08:56 00000000000001778276.timeindex

文件说明

文件类别 作用
.index 消息的物理地址的偏移量索引文件
.timeindex 映射时间戳和相对offset的时间戳索引文件
log 日志文件(消息存储文件)
.snapshot 对幂等型或者事务型producer所生成的快照文件
leader-epoch-checkpoint 保存了每一任leader开始写入消息时的offset, 会定时更新

参数配置

  • .log文件可以通过server.properties文件进行配置

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824(1G)
    
  • 文件的存储目录可以通过server.properties进行配置

    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    

log文件内容

输出批消息日志

命令

kafka-dump-log --files 00000000000001778276.log

输出内容

baseOffset: 1778276 lastOffset: 1778362 count: 87 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1645765269680 size: 44169 magic: 2 compresscodec: NONE crc: 1917853349 isvalid: true
baseOffset: 1778363 lastOffset: 1778431 count: 69 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 44169 CreateTime: 1645765272680 size: 34959 magic: 2 compresscodec: NONE crc: 1371007139 isvalid: true
baseOffset: 1778432 lastOffset: 1778446 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 79128 CreateTime: 1645765272681 size: 7797 magic: 2 compresscodec: NONE crc: 464128466 isvalid: true
baseOffset: 1778447 lastOffset: 1778511 count: 65 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 86925 CreateTime: 1645765275680 size: 33145 magic: 2 compresscodec: NONE crc: 992223742 isvalid: true
baseOffset: 1778512 lastOffset: 1778529 count: 18 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 120070 CreateTime: 1645765275681 size: 9309 magic: 2 compresscodec: NONE crc: 1123370590 isvalid: true

输出具体数据

命令

kafka-dump-log --files 00000000000001778276.log --print-data-log

输出内容

baseOffset: 1778276 lastOffset: 1778362 count: 87 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1645765269680 size: 44169 magic: 2 compresscodec: NONE crc: 1917853349 isvalid: true
| offset: 1778276 CreateTime: 1645765269675 keysize: -1 valuesize: 501 sequence: -1 headerKeys: [] payload: 20220225    13:01:09
| offset: 1778277 CreateTime: 1645765269675 keysize: -1 valuesize: 510 sequence: -1 headerKeys: [] payload: 20220225    13:01:09
……
| offset: 1778361 CreateTime: 1645765269680 keysize: -1 valuesize: 500 sequence: -1 headerKeys: [] payload: 20220225    13:01:09
| offset: 1778362 CreateTime: 1645765269680 keysize: -1 valuesize: 458 sequence: -1 headerKeys: [] payload: 20220225    13:01:09
baseOffset: 1778363 lastOffset: 1778431 count: 69 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 44169 CreateTime: 1645765272680 size: 34959 magic: 2 compresscodec: NONE crc: 1371007139 isvalid: true
| offset: 1778363 CreateTime: 1645765272675 keysize: -1 valuesize: 501 sequence: -1 headerKeys: [] payload: 20220225    13:01:12
……
| offset: 1778431 CreateTime: 1645765272680 keysize: -1 valuesize: 517 sequence: -1 headerKeys: [] payload: 20220225    13:01:12
baseOffset: 1778432 lastOffset: 1778446 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 79128 CreateTime: 1645765272681 size: 7797 magic: 2 compresscodec: NONE crc: 464128466 isvalid: true
| offset: 1778432 CreateTime: 1645765272680 keysize: -1 valuesize: 519 sequence: -1 headerKeys: [] payload: 20220225    13:01:12
……

分析

  1. 一条批消息(Record Batch)日志记录,包含多条的实际数据
  2. lastOffset - baseOffset + 1 = countbaseOffsetlastOffset为批消息中第一条记录和最后一条记录的偏移量,在整个topic分区中的偏移量,count为批消息中包含的记录数
  3. position为批消息的起始位置(这里应该是物理地址),每个log文件中,position 都是从0开始的
  4. CreateTime在批消息中为批消息中最后一条记录的创建时间,在记录中为记录的创建时间
  5. size为批消息的大小,从内容可以看出,前一条批消息的position,加上前一条消息的size,结果为后一条消息的position
  6. offset为记录的偏移量,整个topic分区中的偏移量
  7. valuesize为一条记录的大小
  8. 其他字段有些没有用到,所以显示-1,有些暂时也不清楚

偏移量索引

.index文件内容

命令:

kafka-dump-log --files 00000000000001778276.index

输出内容:

Dumping 00000000000001778276.index
offset: 1778431 position: 44169
offset: 1778446 position: 79128
offset: 1778511 position: 86925
offset: 1778529 position: 120070
offset: 1778592 position: 129379
offset: 1778608 position: 161537
offset: 1778672 position: 169791
offset: 1778694 position: 202354
offset: 1778760 position: 213640
……

分析

个人参考资料分析,不具有权威

结合**.index.log**文件内容可以看出:

  1. .index文件中的offset对应log文件中的lastOffset
  2. .index文件中的position对应log文件中的position

Kafka的数据都是按序插入的,offset也是按序增长的,因此很适合用二分查找定位指定偏移量的数据(网上也都是说用的二分查找):

  1. 根据二分查找,定位数据所在的logindex文件
  2. 通过二分查找,定位最大不大于指定偏移量的offset
  3. 根据查找到的offset,定位到批消息位置
  4. 在批消息中通过position定位到指定数

时间戳索引

.timeindex文件内容

命令:

kafka-dump-log --files 00000000000001778276.timeindex

输出内容:

Dumping 00000000000001778276.timeindex
timestamp: 1645765272680 offset: 1778427
timestamp: 1645765272681 offset: 1778442
timestamp: 1645765275680 offset: 1778507
timestamp: 1645765275681 offset: 1778521
timestamp: 1645765278680 offset: 1778583
timestamp: 1645765278681 offset: 1778597
timestamp: 1645765281680 offset: 1778667

分析

个人参考资料分析,不具权威

在**.timeindex**文件中:

  1. 一条记录中的timestampoffset,表示创建时间为timestamp的第一条记录的offset,即创建时间为1645765272680的多条记录中,最小的偏移量为1778427
  2. 网上的资料都说,.timeindexindex文件中的offset是一致的,这与我实际查询的不符

根据时间戳定位指定数据:

  1. 查找该时间戳应该在哪个日志分段中。将指定的时间戳和每个日志分段中最大时间戳largestTimeStamp逐一对比,直到找到最小不小于指定时间戳所对应的日志分段

    日志分段中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改 时间

  2. 找到日志分段后,在相应的timeindex文件中,通过二分查找,找到最大不大于指定时间戳的offset

  3. log文件中,通过offset进行顺序查找

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐