Delete records
Kafka 将主题记录存储在磁盘上,即使消费者已经读取了这些数据,它也会保留这些数据。然而,记录不是存储在一个大文件中,而是按分区分为多个分段( segments),其中偏移量的顺序在同一主题分区的各个分段之间是连续的。由于服务器存储不可能无限大,因此 Kafka 提供了一些设置,用于根据时间和大小来控制保留多少数据:

控制数据保留的时间配置为 log.retention.hours ,默认为168小时(一周);
log.retention.bytes 参数控制 segments 在删除之前可以增加到多少。
但是,log.retention.bytes 的默认设置是-1,这使日志段的大小不受限制。如果您不小心并且没有配置保留大小以及保留时间,则可能会出现磁盘空间用完的情况。请记住,您永远不要进入文件系统并手动删除文件。相反,我们希望有一种受控且受支持的方式从主题中删除记录,以释放空间。幸运的是,Kafka 附带了一个工具,可以根据需要删除数据。

kafka-delete-records 有两个主要的参数:

–bootstrap-server:需要连接的 brokers 地址;
–offset-json-file:包含删除配置的 Json 文件。
下面是 JSON 文件的示例:

{
   "partitions": [
                  {"topic": "example", "partition": 0, "offset": -1}
                 ],
                 "version":1
 }

正如你所看到的,JSON 文件的格式非常简单,它其实是一个 JSON 对象数组。每个 JSON 对象有以下三个属性:

Topic:需要删除数据的主题;
Partition:需要删除的分区;
Offset:需要从什么偏移量删除数据,将会把小于这个偏移量的数据删除。
上面示例中,我删除 example 主题的分区 0 的数据。example 主题只包含10条记录,所以您可以很容易地计算启动删除过程的起始偏移量。但在实践中,你很可能不知道应该使用哪种偏移量。还要记住 offset != message number,因此不能直接从“message 42”中删除。如果您将这个值设置为 -1,这意味着您将删除主题中所有数据。可以使用下面命令来删除主题的数据:

kafka-delete-records --bootstrap-server <broker-host:port> \
                     --offset-json-file offsets.json

运行完这个命令之后,你可以在控制台看到以下的输出:

Executing records delete operation
Records delete operation completed:
partition: example-0  low_watermark: 10

命令的结果表明,Kafka 从主题分区 example-0 删除了所有记录。 low_watermark=10表示可供消费者使用的最低偏移量。 因为示例主题中只有10条记录,所以我们知道偏移量的范围是0到9,并且没有消费者可以再次读取这些记录。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐