文章目录

前言

我们都知道 kafka 可以根据制定的分区和偏移量来消费。但是最近碰到一个需求,需要把之前一周的消息都拉出来做分析,那么就要根据时间戳来进行消费。

代码实现

    public void seekBeforeTimestamp() {
    	// 初始化 kafka
        KafkaConsumer<String, String> consumer = init();
        
        Set<TopicPartition> assignment = new HashSet<>();
        // 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
        // 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(1000));
            // assignment()方法是用来获取消费者所分配到的分区消息的
            assignment = consumer.assignment();
        }

        System.out.println("assignment.size() = " + assignment.size());

        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
        for (TopicPartition tp : assignment) {
            // 设置查询分区时间戳的条件:获取当前时间前周之后的消息
            timestampToSearch.put(tp, System.currentTimeMillis() - 7 * 24 * 3600 * 1000);
        }
        
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);

        for (TopicPartition tp : assignment) {
            // 获取该分区的offset以及timestamp
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
            // 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            }
        }
    }

上面代码中的 init() 方法为初始化 kafka 配置的方法,此处略,大家根据自己的需求配置初始化参数即可。这里就说明一下,如果要订阅 Topic 中的全部分区的实现方法。

        // 订阅全部分区
        List<TopicPartition> partitions = Lists.newArrayList();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(KAFKA_TOPIC);
        System.out.println("partitionInfos.size() = " + partitionInfos.size());
        if (partitionInfos.size() > 0) {
            for (PartitionInfo info : partitionInfos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions);
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐