简介

Kafka在运行的过程中,存储在磁盘上的数据会逐渐扩大,甚至会撑爆系统盘,在线上环境我们通常会把kafka的数据存储目录和日志存储目录迁移到磁盘中,或者扩容kafka的存储磁盘。本文将一站式解决kafka的磁盘存储或扩容问题。

一.kafka的存储目录

kafka配置文件默认的存储路径是tmp/kafka-logs,如果你修改了kafka的配置文件,那么kafka的数据存储路径,在配置文件中的配置项是:

log.dirs=/mnt/kafka/data

除了数据要存储,还有kafka的运行日志文件,日积月累也会变大,所以我们最好也把他迁移到大的磁盘中。kafka的日志文件,默认存储在kafka安装目录的logs文件夹中,在v2.7.1及一下的版本中,都无法在配置文件中直接定义修改,只能修改其运行文件/kafka/bin/kafka-run-class.sh(下文再讲如何改)。

二.迁移流程

本文环境及迁移目录
条目内容
运行环境centos 6.5
kafka版本v2.7.1
kafka安装目录usr/local/kafka
kafka数据目录usr/local/kafka/data
kafka日志目录usr/local/kafka/logs
kafka数据迁移后目录/mnt/kafka/data
kafka日志迁移后目录/mnt/kafka/log

1.关闭kafka

执行命令关闭kafka,如果是集群需要在每个节点上都执行下方命令关闭kafka。

cd kafka/bin 
./kafka-server-stop.sh 

2.迁移数据

再每个节点服务器上都执行下方命令:

①.创建数据目录和日志目录,并授权读写
mkdir -p /mnt/kafka/data
chmod -R o+r+w /mnt/kafka/data
chmod -R o+r+w /mnt/kafka/data/

mkdir -p /mnt/kafka/logs
chmod -R o+r+w /mnt/kafka/logs
chmod -R o+r+w /mnt/kafka/logs/
②.将数据迁移到新目录

我们在此先选择复制,万一集群出现问题可以立即恢复,等迁移后运行几天发现没有异常后,再删除旧数据(参见步骤6)。

迁移数据:
cp -r   /usr/local/kafka/data/*  /mnt/kafka/data

迁移日志:
cp -r   /usr/local/kafka/logs/*  /mnt/kafka/logs

3.修改kafka日志存储目录

①.修改kafka-run-class.sh文件

/usr/local/kafka/bin/kafka-run-class.sh ,默认文件中,日志存储目录定义如下:

# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi

我们需要在这个脚本中,增加自己的日志存储路径:

增加一行脚本(LOG_DIR=/mnt/kafka/logs):

# Log directory to use
LOG_DIR=/mnt/kafka/logs
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi
②.如果是将/usr/local/kafka/bin/kafka-run-class.sh上传到服务器,需要授权
chmod +x kafka-run-class.sh

上传后的.sh文件不授权,将不能执行。

四.修改kafka数据存储目录

打开kafka安装目录下,server.propertities配置文件,修改

①.修改数据保存路径:
log.dirs=/mnt/kafka/data

log.dirs既然是复数形式,那它其实是支持配置多个路径的,而且我也推荐这么做,因为kafka可以“负载均匀”的将数据存储到多个目录下,当然必须是挂载到多个磁盘,而不是同一个磁盘的多个目录。挂载多个磁盘,多个磁头可以同时进行写操作,将极大提升kafka的吞吐量。
kafka负载均匀是相对于kafka的分区数来说的,而不是根据磁盘空间来负载均衡的。

如,可以指定两个不同磁盘(mnt1,mnt2),作为数据存储目录,配置文件修改如下:

log.dirs=/mnt1/kafka/data,/mnt2/kafka/data
②.修改保留策略

可以根据自己的磁盘大小,决定保留数据的大小。

#保留小时数(720h=30天)
log.retention.hours=720
#总保留字节(200G)
log.retention.bytes=214748364800

log.retention.{hours|minutes|ms},这组参数控制消息存留时间。从时间维度控制数据保留时间。

尽量单独使用,如果3个同时设置,比如:

log.retention.hours=720(不生效)
log.retention.minutes=60(不生效)
log.retention.ms=60000(生效)

那么上述设置优先生效的是ms,即只保存60000毫秒内的数据。其优先级ms>minutes>hours

kafka的默认存储时间是7天,超过7天的数据将自动删除。kafka根据数据中的时间戳来实现这一保留策略的。

log.retention.bytes,这个参数控制kafka集群为每个消息日志保存多大的数据,若超过此参数的分区日志,kafka将自动删除该日志文件。从空间维度控制kafka数据的保留时间。

默认值是-1,表示kafka不会根据文件大小删除日志。

5.启动kafak

所有集群节点,都要启动。

注:重启kafka集群前,最好先重启下zookeeper集群,否则可能会出现连接超时的情况。

无日志启动:

cd /usr/local/kafka/bin
nohup ./kafka-server-start.sh ../config/server.properties  >>/dev/null 2>&1 &

查看启动日志(日志已保存到新目录):

tail -1000f /mnt/kafka/logs/server.log

6.确认kafka集群正常运行后,删除掉旧数据

一定让kafka多跑一会,发现各个topic可以正常写入数据后,说明数据迁移正常,并且看看迁移后的目录,其磁盘目录存储大小在递增,说明迁移成功了,此时,再考虑删除旧数据。

# 删除旧数据文件夹
rm -rf /usr/local/kafka/data

# 删除旧日志文件夹
rm -rf /usr/local/kafka/logs

三.总结

本文以处理生产环境的标准,来做迁移kafka数据这件事,如有忽略的地方可以在评论中指出。希望本文可以帮你安全的迁移生产的kafka数据,喜欢本文可以点赞收藏。

kafka文章集锦:

《Centors下搭建Kafka集群教程(v2.7.1)》

《Centors下搭建Zookeeper v3.5.9集群(支持kafka2.7.1集群)教程》

《新版kafka Manager(CMAK)安装部署教程》

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐