一、需求介绍

有一个Topic:hw_data 有3个分区 3个副本
组:hw-data-group
将这个主题的消息分发给两个(或者多个)消费者消费,(不能消费相同的消息)

1.图解

在这里插入图片描述

2.关键注解@kafkaListener

在这里插入图片描述

@Target({ 
     ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }) 
 
@Retention(RetentionPolicy.RUNTIME) 
 
@MessageMapping 
 
@Documented 
 
@Repeatable(KafkaListeners.class) 
 
public @interface KafkaListener { 
     
 
   /** 
    * 消费者的id,当GroupId没有被配置的时候,默认id为GroupId 
    */ 
   String id() default ""; 
 
   /** 
    * 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置containerFactory      属性 
    */ 
   String containerFactory() default ""; 
 
   /** 
    * 需要监听的Topic,可监听多个,和 topicPattern 属性互斥 
    */ 
   String[] topics() default { 
    }; 
 
   /** 
    * 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥 
    */ 
   String topicPattern() default ""; 
 
 
   /** 
    * 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥 
    */ 
   TopicPartition[] topicPartitions() default { 
    }; 
 
   /** 
    *侦听器容器组  
    */ 
   String containerGroup() default ""; 
 
   /** 
    * 监听异常处理器,配置BeanName 
    */ 
   String errorHandler() default ""; 
 
 
   /** 
    * 消费组ID  
    */ 
   String groupId() default ""; 
 
   /** 
    * id是否为GroupId 
    */ 
   boolean idIsGroup() default true; 
 
   /** 
    * 消费者Id前缀 
    */ 
   String clientIdPrefix() default ""; 
    
   /** 
    * 真实监听容器的BeanName,需要在 BeanName前加 "__" 
    */ 
   String beanRef() default "__listener"; 
} 

二、代码实现

1)第一个消费者

package com.dataWarehouseOss.consumer; 
 
import lombok.extern.slf4j.Slf4j; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.kafka.annotation.TopicPartition; 
import org.springframework.stereotype.Component; 
 
/** 
 * @author :LiuShihao 
 * @date :Created in 2020/9/16 4:15 下午 
 * @desc : 
 * containerGroup:侦听器容器组 
 * topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥 
 */ 
@Slf4j 
@Component 
public class Consumer1 { 
     
    @KafkaListener(containerGroup="first-group",topicPartitions = { 
    @TopicPartition(topic = "first",partitions = { 
    "0","1"})}) 
    public void m1(ConsumerRecord<String, String> record){ 
     
        log.info("分区0,1 :"+record.topic()+" : "+record.value()); 
    } 
} 

2)第二个消费者

package com.dataWarehouseOss.consumer; 
 
import lombok.extern.slf4j.Slf4j; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.kafka.annotation.TopicPartition; 
import org.springframework.stereotype.Component; 
 
/** 
 * @author :LiuShihao 
 * @date :Created in 2020/9/16 4:15 下午 
 * @desc : 
 * containerGroup:侦听器容器组 
 * topicPartitions:可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥 
 */ 
@Slf4j 
@Component 
public class Consumer2 { 
     
    @KafkaListener(containerGroup="first-group",topicPartitions = { 
    @TopicPartition(topic = "first",partitions = { 
    "2"})}) 
    public void m1(ConsumerRecord<String, String> record){ 
     
        log.info("分区2 :"+record.topic()+" : "+record.value()); 
    } 
} 

3)生产者

@Component 
@Slf4j 
public class SendKafkaToFirst { 
     
    @Autowired 
    KafkaTemplate kafkaTemplate; 
    public static final String  TOPIC = "first"; 
    @Scheduled(cron = "0 */2 * * * ?") 
    public void sendKafka(){ 
     
        log.info("---====定时任务执行了:向first发送10条数据====---"); 
        for (int i = 1; i <=10 ; i++) { 
     
            kafkaTemplate.send(TOPIC,i+""); 
            log.info("---==="+i+"===---"); 
        } 
    } 
} 

三、测试

创建first主题 三个分区 三个副本
向first主题中发送10条数据
在这里插入图片描述
可以看到,我们发送的数据并没有被重复消费

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐