一、过期时间TTL

Kafka 和 RabbitMQ 不同,Kafka的消息并没有TTL这一概念,因此想要实现消息的过期功能,需要作额外的处理

这里提供一种实现方案:将消息的 TTL 的设定值以键值对的形式保存在消息的 header 字段中,在消费者端配置拦截器,消费者在消费消息时判断此条消息是否超时

1.消息的header消息头

ProducerRecord 提供了两个构造方法,可以为消息设置消息头添加需要的额外信息:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)

通过该方法为消息设置TTL:

new ProducerRecord<>(topic, 0, System.currentTimeMillis(),
                null, "msg", new RecordHeaders().add(new RecordHeader("ttl", ByteUtils.longToBytes(20))));

需要注意 RecordHeader 的值是 byte[]类型,因此需要进行转换

2.TTL拦截器实现

实现 ConsumerInterceptor 接口的 onConsumer()方法:

@Override
public ConsumerRecords<String, String> onConsume(org.apache.kafka.clients.consumer.ConsumerRecords records) {
     long now = System.currentTimeMillis();
     Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
     for (TopicPartition tp : records.partitions()) {
           List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
           List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
           for (ConsumerRecord<String, String> record : tpRecords) {
                long ttl = -1;
                for (Header header : record.headers()) {
                     if (header.key().equals("ttl")){
                           ttl = ByteUtils.byteToLong(header.value());
                     }
                }
               // 判断消息是否超时
               if (ttl < 0 || (now - record.timestamp() < ttl * 1000)){
                     newTpRecords.add(record);
               }
               if (!newTpRecords.isEmpty()){
                     newRecords.put(tp, newTpRecords);
               }
            }
     }
     return new ConsumerRecords<String, String>(newRecords);
}

通过这种方式实现TTL需要注意:在一次消息拉取的批次中,由于含有最大偏移量的消息可能会被过滤,因此可能会提交错误的位移信息。解决方式,提前记录这一批消息的最大偏移量,或在过滤之后的消息集中的头部或尾部设置一个状态消息,专门用来存放这一批消息的最大偏移量。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐