六、消费者1:指定 topic、partition、offset

1,使用 topics 指定 topic
(1)监听器主要是使用 @KafkaListenter 注解即可,而通过 topics 参数设置监听的 topic(可监听多个,用逗号隔开):
其他参数介绍:id(消费者 ID)、 groupId(消费组 ID)

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"topic1","topic2"})
    public void listen1(String data) {
        System.out.println(data);
    }
}

(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边都能够收到:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", 0, "key1", "message1");
        kafkaTemplate.send("topic1", 1, "key2", "message2");
        kafkaTemplate.send("topic2", 0, "key3", "message3");
        kafkaTemplate.send("topic2", 1, "key4", "message4");
    }
}

2,使用 topicPartitions 指定 topic、parition、offset

(1)topicPartitions 可配置更加详细的监听信息,比如下面代码同样是同时监听 topic1 和 topic2,不同在于这次:
监听 topic1 的 0 号分区
监听 topic2 的 0 号和 1 号分区(其中 1 号分区的初始偏移量为 100)

注意:topics 和 topicPartitions 不能同时使用。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "consumer1",groupId = "my-group1",topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0" }),
            @TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
    public void listen1(String data) {
        System.out.println(data);
    }
}

(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边只会收到指定的消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", 0, "key1", "message1");
        kafkaTemplate.send("topic1", 1, "key2", "message2");
        kafkaTemplate.send("topic2", 0, "key3", "message3");
        kafkaTemplate.send("topic2", 1, "key4", "message4");
    }
}

之前的样例中消费者这边都直接获取消息内容并使用,如果我们还想要获取分区信息、消息头等其他内容的话,有如下两种方式。

七、消费者2:获取消息头和消息体

1,使用 ConsumerRecord 类方式

(1)使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(ConsumerRecord<String, Object> record) {
        //把ConsumerRecord里面所包含的内容打印到控制台中
        System.out.println(record);
    }
}

(2)这里我们简单的发送一条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
    }
}

(3)可以看到控制台输出的内容如下:

在这里插入图片描述

2,使用注解方式获取

(1)如果我们监听方法需要获取该消息非常多的字段时,也可以通过如下这种注解的方式:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen2(@Payload String data,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        System.out.println(data);
        System.out.println(topic);
        System.out.println(partition);
        System.out.println(key);
        System.out.println(ts);
    }
}

(2)这里我们简单的发送一条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
    }
}

由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。

八、消费者3:并发、批量消费

1,批量消费

(1)首先我们在项目 application.properties 文件中添加如下配置,一个设置启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

注意:这里设置 max-poll-records 是 5,并不是说如果没有达到 5 条消息,我们就一直等待。而是说一次 poll 最多返回的记录数为 5。

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=5

(2)接着对消费者监听这边代码稍作修改,改成使用 List 来接收:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<String> data) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
    }
}
  • 如果使用 ConsumerRecord 类接收,则也是使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<ConsumerRecord<String, Object>> records) {
        System.out.println("收到"+ records.size() + "条消息:");
        System.out.println(records);
    }
  • 如果使用注解方式获取消息头、消息体,则也是使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen2(@Payload List<String> data,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
        System.out.println(topics);
        System.out.println(partitions);
        System.out.println(keys);
        System.out.println(tss);
    }
}

(3)我们一次性发送的 23 条数据测试一下:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        for (int i = 0; i < 23; i++) {
            kafkaTemplate.send("topic3", "message-" + i);
        }
    }
}

2,并发消费

(1)为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3:
注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。

# 并发数设为3
spring.kafka.listener.concurrency=3

(2)配置完毕后,消费者监听这边不需要修改:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(String data) {
        System.out.println(data);
    }
}
  • 并发消费和批量消费可以结合同时使用的,消费者监听使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<String> data) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
    }
}

(3)上面我们设置 concurrency 为 3,也就是将会启动 3 条线程进行监听。而由于我们创建的 topic 有 4 个 partition(分区),意味着将有 2 条线程都是分配到 1 个 partition,还有 1 条线程分配到 2 个 partition。我们可以通过日志看到每条线程分配到的 partition。

在这里插入图片描述

通常来说 KafkaListener 要做的事只是监听 Topic 中的数据并消费,如果在 KafkaListener 中还需要对异常进行 try catch 捕获并处理的话,则会显得代码块非常臃肿不利于维护。
​ 好在 spring-kafka 为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理 consumer 在消费时发生的异常。

九、消费者4:异常处理器

1,注册异常处理器

(1)注册一个异常处理器就是新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,使用 @Bean 注入(BeanName 默认就是方法名):

@Configuration
public class KafkaInitialConfiguration {
 
    //异常处理器
    @Bean
    public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                                      Consumer<?, ?> consumer) {
                System.out.println("--- 发生消费异常 ---");
                System.out.println(message.getPayload());
                System.out.println(exception);
                return null;
            }
        };
    }
}

(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:

@Configuration
public class KafkaInitialConfiguration {
 
    //异常处理器
    @Bean
    public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("--- 发生消费异常 ---");
            System.out.println(message.getPayload());
            System.out.println(exception);
            return null;
        };
    }
}

2,使用异常处理器

​ 异常处理器注册好之后,将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"}, errorHandler = "myConsumerAwareErrorHandler")
    public void listen1(String data) throws Exception {
        throw new Exception("模拟一个异常");
    }
}

3,开始测试

(1)编写测试方法,发送一条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", "hangge.com");
    }
}

(2)可以看到异常处理器已经能正常使用了:

附:批量消费异常处理器

​ 批量消费异常处理器和之前单消息消费异常处理器差不多,上面异常处理器代码可以完全不用改动直接使用,只不过传递过来的数据都是 List 集合方式:

消息过滤器可以让消息在抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据交由 KafkaListener 处理,不需要的消息则会过滤掉。

十、消费者5:消息过滤器

1,配置消息过滤器

​ 配置消息过滤器十分简单,只需要为监听容器工厂配置一个 RecordFilterStrategy(消息过滤策略),返回 true 的时候消息将会被抛弃,返回 false 时,消息则能正常抵达监听容器。

@Configuration
public class KafkaInitialConfiguration {
 
    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;
 
    // 配置一个消息过滤策略
    @Bean
    public ConcurrentKafkaListenerContainerFactory myFilterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略(将消息转换为long类型,判断是奇数还是偶数,把所有奇数过滤,监听器只接收偶数)
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                long data = Long.parseLong((String) consumerRecord.value());
                if (data % 2 == 0) {
                    return false;
                }
                //返回true将会被丢弃
                return true;
            }
        });
        return factory;
    }
}

2,使用消息过滤器

​ 带有消息过滤策略的容器工厂注册好之后,将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样消息在抵达监听器之前,会先进行过滤。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"}, containerFactory = "myFilterContainerFactory")
    public void listen1(String data) {
        System.out.println(data);
    }
}

3,开始测试
(1)编写测试方法,连续发送 5 条消息(消息内容分别是 0 到 4 这个 5 个数字):

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        for (int i = 0; i < 5; i++) {
            kafkaTemplate.send("topic3", String.valueOf(i));
        }
    }
}

在实际开发中,我们常常需要使用转发功能实现业务解耦。比如:应用 A 从 topic1 获取到消息,经过处理后转发到 topic2。应用 B 监听 topic2 获取消息再次进行处理。

十一、消费者6:消息转发

(1)Spring-Kafka 只需要通过一个 @SendTo 注解即可以实现消息的转发,被注解方法的 return 值即转发的消息内容:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic1"})
    @SendTo("topic2")
    public String listen1(String data) {
        System.out.println("业务A收到消息:" + data);
        return data + "(已处理)";
    }
 
    // 消费监听
    @KafkaListener(topics = {"topic2"})
    public void listen2(String data) {
        System.out.println("业务B收到消息:" + data);
    }
}

(2)编写测试方法,发送 1 条消息:

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic1", "1条测试消息");
    }
}
默认情况下,当项目启动时,监听器就开始工作(监听消费发送到指定 topic 的消息)。如果我们不想让监听器立即工作,想在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry 实现,下面通过样例进行演示。

十二、消费者7:动态启动、停止监听器

1,动态地开启、关闭监听器

(1)消费者这边代码没有什么特别的,主要是设置了个消费者 ID(监听器 ID),后面要根据这个 ID 来开启、关闭监听:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"})
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

(2)然后我定义两个 controller 接口分别通过 KafkaListenerEndpointRegistry 来控制监听器的开启、关闭:

注意:
KafkaListenerEndpointRegistry 在 SpringIO 中已经被注册为 Bean,直接注入使用即可。
还需要注意一下启动监听容器的方法,resume 是恢复的意思不是启动的意思。所以我们需要判断容器是否运行,如果运行则调用 resume 方法,否则调用 start 方法。
@RestController
public class KafkaProducer {
 
    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // 发送消息
    @GetMapping("/test")
    public void test() {
        System.out.println("监听器发送消息!");
        kafkaTemplate.send("topic1", "1条测试消息");
    }
 
    // 开启监听
    @GetMapping("/start")
    public void start() {
        System.out.println("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("myListener1").isRunning()) {
            registry.getListenerContainer("myListener1").start();
        }
        registry.getListenerContainer("myListener1").resume();
    }
 
    // 关闭监听
    @GetMapping("/stop")
    public void stop() {
        System.out.println("关闭监听");
        //判断监听容器是否启动,未启动则将其启动
        registry.getListenerContainer("myListener1").pause();
    }
}

(3)开始测试一下:

  • 启动项目,调用 /test 接口发送一条消息,监听器成功接收到消息。

  • 调用 /stop 接口关闭监听,再次发送一条消息,监听器不会收到消息。

  • 调用 /start 接口开启监听,监听器收到之前发送的消息。

  • 由于监听已经开启,再次发送一条消息,监听器成功接收到消息。

2,禁止监听器自启动

(1)默认情况下,当项目启动的时候,监听器就开始工作。如果想要禁止监听器自启动,首先我们定义一个不自动启动的监听容器工厂:

@Configuration
public class KafkaInitialConfiguration {
 
    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;
 
    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container =
                new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }
}

(2)然后将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样该监听器就不会自动启动了。

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"},
            containerFactory = "delayContainerFactory")
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

附:定时启动、停止监听器

​ 有时我们需要监听器在我们指定的时间点开始工作,或者在我们指定的时间点停止工作。比如:项目需要利用 Kafka 做数据持久化的功能,由于用户活跃的时间为早上 10 点至晚上 12 点,那在这个时间段做一个大数据量的持久化可能会影响数据库性能导致用户体验降低,我们可以选择在用户活跃度低的时间段去做持久化的操作,也就是晚上 12 点后到第二条的早上 10 点前。这个可以结合定时任务来实现。

(1)首先在项目启动类上添加 @EnableScheduling 注解开启定时任务:

@SpringBootApplication
@EnableScheduling
public class KtestApplication {
    public static void main(String[] args) {
        SpringApplication.run(KtestApplication.class, args);
    }
}

(2)下面代码我们创建两个定时任务,一个用来在指定时间点启动 myListener1 这个监听器,另一个在指定时间点停止 myListener1 这个监听器:

@Component
public class MyKafkaSchedule {
    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    //定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        System.out.println("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("myListener1").isRunning()) {
            registry.getListenerContainer("myListener1").start();
        }
        registry.getListenerContainer("myListener1").resume();
    }
 
    //定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        System.out.println("关闭监听");
        registry.getListenerContainer("myListener1").pause();
    }
}

(3)最后记得禁止这个监听器自启动(关于禁止自动启动的监听容器工厂见文章第 2 点内容):

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(id = "myListener1", topics = {"topic1"},
            containerFactory = "delayContainerFactory")
    public void listen1(String data) {
        System.out.println("监听器收到消息:" + data);
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐