/**
 * Small command-line helper that replays the records of a single Kafka
 * topic partition whose timestamps fall inside a given time window.
 *
 * <p>Date-time strings are parsed with the pattern {@code yyyy-MM-dd HH:mm:ss}
 * in the JVM's default time zone.
 */
public class JavaConsumerTool {
    /** Upper bound for a single broker-side offset lookup. */
    private static final Duration LOOKUP_TIMEOUT = Duration.ofSeconds(10);

    /** How long one {@code poll} call may block waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    /** Give up after this many consecutive polls that return no records. */
    private static final int MAX_EMPTY_POLLS = 10;

    /**
     * Creates a String/String consumer connected to the local broker.
     *
     * <p>Auto-commit is disabled: no {@code group.id} is configured and the
     * tool always positions itself with {@code seek}, so committed offsets are
     * never read back. Recent client versions also refuse to start with
     * {@code enable.auto.commit=true} when no group id is set.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    /**
     * Looks up the earliest offset whose record timestamp is at or after the
     * given date-time.
     *
     * @param consumer    consumer used to query the broker
     * @param topic       topic name
     * @param partition   partition number
     * @param datetimeStr message time, formatted as {@code yyyy-MM-dd HH:mm:ss}
     * @return the matching offset, or {@code null} when the partition has no
     *         record at or after that time or when the lookup itself failed
     *         (the failure is printed to standard error)
     * @throws ParseException if {@code datetimeStr} does not match the pattern
     */
    public static Long getOffsetByDateTime(KafkaConsumer<?, ?> consumer, String topic, int partition, String datetimeStr) throws ParseException {
        long timestamp = parseTimestamp(datetimeStr);
        try {
            return lookupOffset(consumer, new TopicPartition(topic, partition), timestamp);
        } catch (RuntimeException e) {
            // Keep the historical contract: report the failure and signal it with null.
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Prints every record of one partition whose offset lies in the half-open
     * range derived from {@code [startTime, endTime)}, then closes the consumer.
     *
     * <p>The consumer is always closed before this method returns, including
     * when an exception is thrown. If either time string cannot be parsed the
     * error is printed and nothing is consumed.
     *
     * @param consumer  consumer to use; it is closed by this method
     * @param topic     topic name
     * @param partition partition number
     * @param startTime start of the window (inclusive)
     * @param endTime   end of the window (exclusive)
     */
    public static void consumerOnTimeBatch(KafkaConsumer<String, String> consumer, String topic, int partition, String startTime, String endTime) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        try {
            long startTimestamp;
            long endTimestamp;
            try {
                startTimestamp = parseTimestamp(startTime);
                endTimestamp = parseTimestamp(endTime);
            } catch (ParseException e) {
                // Without a valid window there is nothing sensible to replay.
                e.printStackTrace();
                return;
            }

            // Manually assign the partition; no consumer group is involved.
            consumer.assign(Arrays.asList(topicPartition));

            Long startOffset = lookupOffset(consumer, topicPartition, startTimestamp);
            if (startOffset == null) {
                // No record at or after startTime: the window is empty.
                return;
            }

            Long endOffset = lookupOffset(consumer, topicPartition, endTimestamp);
            if (endOffset == null) {
                // endTime lies beyond the newest record: read up to the current log end.
                endOffset = consumer.endOffsets(Arrays.asList(topicPartition), LOOKUP_TIMEOUT).get(topicPartition);
                if (endOffset == null) {
                    return;
                }
            }

            consumer.seek(topicPartition, startOffset);
            int emptyPolls = 0;
            // position() also advances over offsets that are never delivered
            // (e.g. compacted-away records), so the loop cannot wait for a
            // record that does not exist.
            while (consumer.position(topicPartition) < endOffset) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                if (records.isEmpty()) {
                    emptyPolls++;
                    if (emptyPolls >= MAX_EMPTY_POLLS) {
                        // Broker unreachable or data gone: stop instead of spinning forever.
                        break;
                    }
                    continue;
                }
                emptyPolls = 0;
                for (ConsumerRecord<String, String> record : records) {
                    if (record.offset() >= endOffset) {
                        // Records of a single partition arrive in offset order,
                        // so everything after this one is outside the window too.
                        break;
                    }
                    System.out.println("时间:"+new Date(record.timestamp())+",偏移量:"+record.offset()+",消息体:"+record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Entry point: replays a hard-coded time window of partition 0 of topic
     * {@code test}.
     */
    public static void main(String[] args) throws Exception {
        KafkaConsumer<String, String> consumer = getConsumer();
        String topic = "test";
        int partition = 0;
        String startTime = "1997-01-17 00:00:00";
        String endTime = "1997-01-18 00:00:00";
        // Consume the records of the topic that fall inside the time window.
        consumerOnTimeBatch(consumer,topic,partition,startTime,endTime);
    }

    /** Parses a {@code yyyy-MM-dd HH:mm:ss} string into epoch milliseconds. */
    private static long parseTimestamp(String datetimeStr) throws ParseException {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return df.parse(datetimeStr).getTime();
    }

    /**
     * Asks the broker for the earliest offset at or after {@code timestamp};
     * returns {@code null} when there is none and lets client exceptions propagate.
     */
    private static Long lookupOffset(KafkaConsumer<?, ?> consumer, TopicPartition topicPartition, long timestamp) {
        Map<TopicPartition, Long> query = new HashMap<>();
        query.put(topicPartition, timestamp);
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query, LOOKUP_TIMEOUT);
        if (found == null) {
            return null;
        }
        // The map holds a null value for a partition with no matching record.
        OffsetAndTimestamp match = found.get(topicPartition);
        if (match == null) {
            return null;
        }
        return match.offset();
    }

}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐