kafka指定时间范围消费一批topic数据
/**
 * Utility for consuming a batch of messages from one Kafka topic partition whose
 * timestamps fall within a caller-supplied [startTime, endTime) range. The time
 * range is translated to offsets via {@code KafkaConsumer#offsetsForTimes}.
 */
public class JavaConsumerTool {
/**
 * Creates a String/String consumer connected to a local broker
 * (127.0.0.1:9092) with auto-commit enabled.
 *
 * @return a new {@link KafkaConsumer} instance; the caller owns it and must close it
 */
public static KafkaConsumer<String, String> getConsumer(){
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
/**
 * Resolves the earliest offset whose message timestamp is at or after the
 * given wall-clock time.
 *
 * @param consumer    consumer used for the metadata lookup
 * @param topic       topic name
 * @param partition   partition number
 * @param datetimeStr message time, formatted as {@code yyyy-MM-dd HH:mm:ss}
 * @return the resolved offset, or {@code null} if the lookup failed or no
 *         message at/after that time exists in the partition
 * @throws ParseException if {@code datetimeStr} does not match the expected pattern
 */
public static Long getOffsetByDateTime(KafkaConsumer consumer, String topic,int partition,String datetimeStr) throws ParseException {
// SimpleDateFormat is not thread-safe; a fresh instance per call keeps this method safe.
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = df.parse(datetimeStr).getTime();
TopicPartition topicPartition = new TopicPartition(topic, partition);
Map<TopicPartition, Long> query = new HashMap<>();
query.put(topicPartition, timestamp);
Map<TopicPartition, OffsetAndTimestamp> result;
try {
result = consumer.offsetsForTimes(query, Duration.ofSeconds(10));
} catch (Exception e) {
// Best-effort lookup: report and signal "no offset" instead of propagating.
e.printStackTrace();
return null;
}
// offsetsForTimes maps the partition to null when no message has a
// timestamp >= the requested one — guard against the NPE the original
// code had on result.get(topicPartition).offset().
if (result == null) {
return null;
}
OffsetAndTimestamp offsetAndTimestamp = result.get(topicPartition);
return offsetAndTimestamp == null ? null : offsetAndTimestamp.offset();
}
/**
 * Consumes and prints all messages of one partition whose offsets lie in
 * [startOffset, endOffset), where both bounds are resolved from the given
 * times. Stops early when the partition has no more data (empty poll), and
 * always closes the consumer.
 *
 * @param consumer  consumer to use; closed by this method
 * @param topic     topic name
 * @param partition partition number
 * @param startTime consume-from time, {@code yyyy-MM-dd HH:mm:ss} (inclusive)
 * @param endTime   consume-until time, {@code yyyy-MM-dd HH:mm:ss} (exclusive)
 */
public static void consumerOnTimeBatch(KafkaConsumer<String, String> consumer, String topic, int partition, String startTime,String endTime){
TopicPartition topicPartition = new TopicPartition(topic,partition);
// Pin the consumer to the single topic partition (no group rebalancing).
consumer.assign(Arrays.asList(topicPartition));
Long startOffset = null;
Long endOffset = null;
try {
startOffset = getOffsetByDateTime(consumer,topic,partition,startTime);
endOffset = getOffsetByDateTime(consumer,topic,partition,endTime);
} catch (ParseException e) {
e.printStackTrace();
}
// Either bound may be null (bad time string, lookup failure, or no data in
// range); the original code unboxed these into long and NPE'd here.
if (startOffset == null || endOffset == null) {
consumer.close();
return;
}
consumer.seek(topicPartition, startOffset);
try {
boolean done = false;
while (!done) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
// Caught up with the end of the partition — stop instead of
// spinning forever (the original loop never terminated here).
break;
}
for (ConsumerRecord<String, String> record : records){
if (record.offset() >= endOffset) {
// Reached the first message at/after endTime; the original
// printed records past the bound before noticing.
done = true;
break;
}
System.out.println("时间:"+new Date(record.timestamp())+",偏移量:"+record.offset()+",消息体:"+record.value());
}
}
} finally {
// Close even if poll/printing throws, so sockets are released.
consumer.close();
}
}
/**
 * Entry point: consumes one batch of messages from topic "test", partition 0,
 * between the two hard-coded times.
 */
public static void main(String[] args) throws Exception {
KafkaConsumer<String, String> consumer = getConsumer();
String topic = "test";
int partition = 0;
String startTime = "1997-01-17 00:00:00";
String endTime = "1997-01-18 00:00:00";
consumerOnTimeBatch(consumer,topic,partition,startTime,endTime);
}
}
更多推荐
已为社区贡献4条内容
所有评论(0)