思想

Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。清理策略针对的是过期的segment文件,而不是某条过期的数据。可以单独针对某topic配置,也可针对kafka集群配置(config/server.properties)。策略分三种:基于时间,基于日志文件大小,基于日志文件起始偏移量。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

topic级别修改命令

修改单个topic清理策略:bin/kafka-configs.sh --zookeeper ZK_IP:2181 --describe --entity-type topics --entity-name test

删除topic:bin/kafka-topics.sh --delete --topic test --zookeeper ZK_IP:2181

保留topic,删除数据,通过修改数据保留时间实现(如下:保留10s):bin/kafka-configs.sh --zookeeper ZK_IP:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000

查看topic清理策略:bin/kafka-configs.sh --zookeeper ZK_IP:2181 --describe --entity-type topics --entity-name test

删除策略:bin/kafka-configs.sh --zookeeper ZK_IP:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms

配置文件级别删除

通过log.cleanup.policy,分为删除和压缩。

删除策略

当日志达到log.segment.bytes大小,会创建新的segment。当segment超过log.segment.bytes或保留时长达到log.retention.hours,就会被清理掉。

属性名含义默认值
log.cleanup.polict日志清理保存策略,delete/compactdelete
log.retention.hours日志保存时间,可以选择hours,minutes和ms168(7day)
log.retention.bytes删除前日志文件允许保存的最大值-1
log.segment.delete.delay.ms日志文件被真正删除前的保留时间60000(1min)
log.cleanup.interval.mins清理工作执行时间间隔10
log.retention.check.interval.ms清理工作执行时间间隔(新版本)300000

达到清理条件的日志文件,进行“delete”标注,文件无法被索引到。log.segment.delete.delay.ms这个时间后,文件才会被真正的从文件系统中删除。

配置server.properties,添加删除策略:

# 默认delete策略
# 日志保留时间
log.retention.hours=168

# 超过此大小的segment被删除,默认1G
#log.retention.bytes=1073741824

# 达到此大小,创建一个新的segment,默认1G
log.segment.bytes=1073741824

# 清理工作执行的间隔
log.retention.check.interval.ms=300000
刷新策略
属性名含义默认值
log.flush.interval.messages消息达到该条时,将数据写入到日志文件10000
log.flush.interval.ms当达到该时间间隔时,强制执行一次flushnull
log.flush.scheduler.interval.ms周期性检查,是否需要将消息flush-
分段策略属性
属性名含义默认值
log.roll.{hours,ms}日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment168(7day)
log.segment.bytes每个segment的最大容量。到达指定容量时,将强制生成一个新的segment1G(-1表示不限制)
log.retention.check.interval.ms日志片段文件检查的周期时间60000

压缩

将数据压缩,只保留每个key最后一个版本的数据,offset可能不连续。

#启用cleaner,默认是关闭的
log.cleaner.enable=true
log.cleanup.policy=compact

这种策略只适合特殊场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。

据起始偏移量删除

日志配置

修改日志级别为INFO,防止长时间产生过多日志。将config/log4j.properties中必要选项改为INFO。
修改日志(server.log, controller.log, state-change.log等)存放位置,放置长时间产生日志占满磁盘。可以将日志目录配置到更大空间的分区盘。可通过更改kafka-run-class.sh中的LOG_DIR变量值实现。

参考

https://blog.csdn.net/shykevin/article/details/90103364.
http://www.openskill.cn/article/550.
https://blog.csdn.net/chenyixin121738/article/details/109590355.
https://blog.csdn.net/u013256816/article/details/80418297.

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐