首先我们先模拟生产者代码


    /**
     * 多个消费者同时消费一条消息
     */
    @GetMapping("/kafka/many_consumer")
    public void manyConsumer() {
        for (int i = 0; i <1; i++) {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Long orderId = new SnowflakeGenerator().next();
            //key值取hash值对分区数量取模
            final Integer partition =Math.abs(orderId.hashCode())%3;
            final Order order = new Order();
            order.setOrderNo(orderId+"");
            order.setCreateTime(new Date());
            order.setPhone('1' + RandomUtil.randomNumbers(10));
            log.info("kafka 发送消息"+orderId + "分区====="+partition);
            kafkaService.sendMsg("topic.many.consumer",partition,orderId+"", JSON.toJSONString(order));
        }
    }
}

然后实现多个消费者同时消费topic中的消息

 @KafkaListener(groupId="many-consumer-id-1", topics = {"topic.many.consumer"})
    public void consumer1(ConsumerRecord<Integer, String> record) {
        log.info("分组:many-consumer-id-1 "+" 主题:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }
    
    @KafkaListener(groupId="many-consumer-id-2", topics = {"topic.many.consumer"})
    public void consumer2(ConsumerRecord<Integer, String> record) {
        log.info("分组:many-consumer-id-2 "+" 主题:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }
    @KafkaListener(groupId="many-consumer-id-3", topics = {"topic.many.consumer"})
    public void consumer3(ConsumerRecord<Integer, String> record) {
        log.info("分组:many-consumer-id-3 "+" 主题:" + record.topic() + "-" + record.partition() + "-" + record.value());
    }

其实实现这个功能的重点在于@KafkaListener这个注解中的groupId是不同的

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐