需求

出于成本考虑,我们只使用了两个topic,测试环境一个,线上一个,然后大家的消息都发到同一个topic里,所以在消费时,就需要实现消息过滤。kafka是没有tag等功能的,所以过滤只能在消费端实现。下面直接上代码

代码

配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # kafka连接接地址
    #    client-id: # 发送请求时传给服务器的id
    consumer:
      topic: TEST_XXX
      bootstrap-servers: 127.0.0.1:9092 # 会覆盖spring.kafka.bootstrap-servers 配置
      group-id: TEST_GROUP # 消费者所属消息组
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer # 反序列化key的类
      value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer # 反序列化value的类

消费过滤代码:

    /**
     * 消息过滤
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //设置可以丢弃消息  配合RecordFilterStrategy使用
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                String data = (String) consumerRecord.value();
                log.info("filterContainerFactory filter : "+data);
                if (data.indexOf("test") != -1) {
                    return false;
                }
                //返回true将会被丢弃
                return true;
            }
        });
        return factory;
    }

实际消费代码:

 @KafkaListener(topics = "${spring.kafka.consumer.topic}",groupId = "${spring.kafka.consumer.group-id}",containerFactory = "filterContainerFactory")
    public void consumerDatabase(String data, Acknowledgment ack){
    log.info("group1接收到消息时间:{}",dateTime);
   //业务处理代码
    //手动提交偏移量
    ack.acknowledge();
}

原理:消息过滤器

消息过滤器可以在消息抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据再交由KafkaListener处理。

配置消息只需要为监听容器工厂配置一个RecordFilterStrategy(消息过滤策略),

返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐