Kafka 消费特定时间戳之后的消息
kafka 根据时间戳来消费消息
·
前言
我们都知道 kafka 可以根据制定的分区和偏移量来消费。但是最近碰到一个需求,需要把之前一周的消息都拉出来做分析,那么就要根据时间戳来进行消费。
代码实现
public void seekBeforeTimestamp() {
// 初始化 kafka
KafkaConsumer<String, String> consumer = init();
Set<TopicPartition> assignment = new HashSet<>();
// 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
// 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(1000));
// assignment()方法是用来获取消费者所分配到的分区消息的
assignment = consumer.assignment();
}
System.out.println("assignment.size() = " + assignment.size());
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
// 设置查询分区时间戳的条件:获取当前时间前周之后的消息
timestampToSearch.put(tp, System.currentTimeMillis() - 7 * 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
// 获取该分区的offset以及timestamp
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
// 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
}
上面代码中的 init() 方法为初始化 kafka 配置的方法,此处略,大家根据自己的需求配置初始化参数即可。这里就说明一下,如果要订阅 Topic 中的全部分区的实现方法。
// 订阅全部分区
List<TopicPartition> partitions = Lists.newArrayList();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(KAFKA_TOPIC);
System.out.println("partitionInfos.size() = " + partitionInfos.size());
if (partitionInfos.size() > 0) {
for (PartitionInfo info : partitionInfos) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
consumer.assign(partitions);
更多推荐
已为社区贡献5条内容
所有评论(0)