基本使用-简单的生产消费

项目的基本构建

新建一个 maven 项目,引入 kafka 依赖,pom 文件内容如下

	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.3</version>
        <relativePath/>
    </parent>

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>

       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <version>1.18.16</version>
       </dependency>

       <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
       </dependency>
   </dependencies>

编写配置文件 application.yml,进行 kafka 的基本配置

spring:
  # Kafka 基本配置
  kafka:
    # ===============Kafka 服务器配置===============
    bootstrap-servers: localhost:9092
    listener:
      # 消费者监听的topic不存在时,项目会报错,设置为false
      missing-topics-fatal: false
      # 设置批量消费消息(不批量处理时需要注释掉)
      #type: batch
    # ===============消费者配置===============
    consumer:
      # 是否自动提交偏移量offset
      enable-auto-commit: true
      # 提交offset延时
      auto:
        commit:
          intervals:
            ms: 1000
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # (1) earliest:重置为分区中最小的offset;
      # (2) latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # (3) none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      # 消息的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 设置批量消费的消息数
      max-poll-records: 10
      properties:
        # 消费会话超时时间
        session:
          timeout:
            ms: 120000
        # 消费请求超时时间
        request:
          timeout:
            ms: 180000
        # 默认的消费组id
        group:
          id: defaultConsumerGroup
    # ===============生产者配置===============
    producer:
      # 重试次数
      retries: 1
      # 应答级别:多少个分区副本完成备份后向生产者发送应答消息(可选0、1、all/-1)
      acks: -1
      # 批量大小
      batch-size: 16384
      # 生产端缓冲区大小
      buffer-memory: 33554432
      # 消息的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 设置生成事务的前缀(使用事务时开启,不使用则注释掉)
      #transaction-id-prefix: transaction_
      properties:
        # 提交延时
        # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
        # linger.ms为0表示每接收到一条消息就提交给kafka,此时batch-size失效
        linger:
          ms: 0
        # 自定义分区器
        #partitioner:
          #class: com.zqf.config.MyPartitioner # 配置自定义分区策略后所有消息都在 partition 0

创建启动类

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class);
    }

}

可以编写一个配置类,创建一个名为 testTopic 的主题,并将其分区数设置为 8(当我们发送消息到主题时,Kafka 会自动创建该主题,但此时仅存在一个分区)

@Configuration
public class KafkaInitialConfiguration {

    // 创建一个名为 testTopic 的 topic 并设置分区数为 8,分区副本数为 2
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("testTopic", 8, (short) 2);
    }

    // 如果要修改分区数,只需修改配置值重启项目即可
    // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("testTopic", 10, (short) 2);
    }

}

编写生产者 Controller 控制类模拟消息的生产,引入 KafkaTemplate

@RestController
@RequestMapping("/kafka/producer")
public class ProducerController {

    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;
    
}

简单的生产消费

在 ProducerController 中模拟消息的生产

	/**
     * 简单生成消息
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */
    @GetMapping("/simpleSend/{topic}/{msg}")
    public String simpleSend(@PathVariable("topic") String topic,
                             @PathVariable("msg") String msg) {
        kafkaTemplate.send(topic, msg);
        return "Kafka 生产消息成功~";
    }

创建 ConsumerService 作为消费者进行消息的消费,@KafkaListenertopics 属性指定了监听的主题,可同时监听多个,用逗号隔开即可

	/**
     * 简单监听
     * @param record 消息
     */
    @KafkaListener(topics = {"testTopic"})
    public void simpleGetMsg(ConsumerRecord<Object, Object> record) {
        // 方法内定义消息的处理逻辑...
        System.out.println("topic:" + record.topic());
        System.out.println("partition:" + record.partition());
        System.out.println("msg:" + (String) record.value());
    }

使用 postman 测试结果如下
在这里插入图片描述
消费结果
在这里插入图片描述

进阶使用-生产者

带回调的生产者

	/**
     * 带回调的发送消息
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */
    @GetMapping("/callbackSend/{topic}/{msg}")
    public String callbackSend(@PathVariable("topic") String topic,
                               @PathVariable("msg") String msg) {
        try {
            // 发送消息并设置成功和失败的逻辑
            kafkaTemplate.send(topic, msg).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
                // 发送失败处理逻辑
                @Override
                public void onFailure(Throwable e) {
                    System.out.println("消息发送失败..." + e.getMessage());
                }
                // 发送成功处理逻辑
                @Override
                public void onSuccess(SendResult<Object, Object> result) {
                    System.out.println("消息发送成功..." + result.getRecordMetadata().topic() + "-" +
                            result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
                }
            });
        } catch (Throwable throwable) {
            return "kafka 发送消息失败~";
        }
        return "kafka 发送消息成功~";
    }

测试结果
在这里插入图片描述
发送消息成功回调打印信息
在这里插入图片描述

事务提交消息

	/**
     * kafka 事务发送消息
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */
    @GetMapping("/transactionSend/{topic}/{msg}")
    public String transactionSend(@PathVariable("topic") String topic,
                                  @PathVariable("msg") String msg) {
        final Boolean[] result = {true};
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<Object, Object> kafkaOperations) {
                try {
                    if ("HelloKafka_transaction".equals(msg)) {
                        throw new RuntimeException("消息异常,启动 Kafka 事务,不生产对应消息~");
                    }
                    try {
                        kafkaTemplate.send(topic, msg).get();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    return "生产消息无异常,生产成功~";
                } catch (Exception e) {
                    e.printStackTrace();
                    result[0] = false;
                    return "生产消息有异常,生产失败~";
                }
            }
        });
        if (!result[0]) {
            return "kafka 发送消息失败~";
        }
        return "kafka 发送消息成功~";
    }

使用事务发送消息时,需要在配置文件中配置transaction-id-prefix属性,即事务前缀,详见开篇的配置文件,打开相应的注释即可
在这里插入图片描述
生产异常处理,当有异常产生时,消息不会被发送,故消费者没有监听到信息
在这里插入图片描述

自定义分区器

Kafka 中 每一个 Topic 都可以划分成多个分区,而消息将被 append 到哪一个分区,则有对应的分区策略:

  • 若我们在发送消息时制定了分区策略,则将消息按照策略 append 到相应分区;
  • 若我们在发送消息时没有指定分区,但消息携带了 key,此时 kafka 将根据 key 将消息划分到对应分区,该策略将保证 key 相同的消息被 append 到同一分区;
  • 若分区和 key 都没有指定,则使用 kafka 的默认分区策略,轮询得到一个分区;

编写自定义分区器

public class MyPartitioner implements Partitioner {
    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        Partitioner.super.onNewBatch(topic, cluster, prevPartition);
    }

    // 在此处自定义分区策略,目前该策略默认使用 partition 0
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

配置好自定义的分区逻辑后,需要在配置文件中配置自定义分区策略
spring.kafka.producer.properties.partitioner.class=com.zqf.config.MyPartitioner
详见开篇配置文件末尾,将注释去掉即可

测试发现,所有消息都会被分发进入分区 0
在这里插入图片描述

进阶使用-消费者

指定消费者监听主题、分区、偏移量

配置@KafkaListener的属性可指定消费者监听的主题、分区以及偏移量

	/**
     * 监听特定主题、特定分区
     * @param record 消息
     */
    @KafkaListener(id = "consumer01", groupId = "testGroup", topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = "0"),
            @TopicPartition(topic = "testTopic", partitions = {"1", "3", "5", "7", "9"})
    })
    public void targetGetMsg1(ConsumerRecord<Object, Object> record) {
        // 方法内定义消息的处理逻辑...
        System.out.println("=======consumer01收到消息=======");
        System.out.println("topic:" + record.topic());
        System.out.println("partition:" + record.partition());
        System.out.println("msg:" + (String) record.value());
    }

    /**
     * 监听特定主题、特定分区
     * @param record 消息
     */
    @KafkaListener(id = "consumer02", groupId = "testGroup", topicPartitions = {
            @TopicPartition(topic = "topic2", partitions = "0"),
            @TopicPartition(topic = "testTopic", partitions = {"0", "2", "4", "6", "8"})
    })
    public void targetGetMsg2(ConsumerRecord<Object, Object> record) {
        // 方法内定义消息的处理逻辑...
        System.out.println("=======consumer02收到消息=======");
        System.out.println("topic:" + record.topic());
        System.out.println("partition:" + record.partition());
        System.out.println("msg:" + (String) record.value());
    }

测试结果
在这里插入图片描述

消费者批量消费

修改配置文件,开启批量消费模式,并指定每一次消费的消息数量

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

详见开篇配置文件,将对应属性的注释去掉即可

编写生产者方法批量生产数据并发送

	/**
     * 批量消息处理
     * @param topic 主题
     * @param msg 消息
     * @return 结果
     */
    @GetMapping("/multiAccept/{topic}/{msg}")
    public String multiAccept(@PathVariable("topic") String topic,
                              @PathVariable("msg") String msg) {
        for (int i = 1; i <= 30; i++) {
            kafkaTemplate.send(topic, msg + i);
        }
        return "Kafka 发送消息成功~";
    }

消费者批量进行消息的消费

    /**
     * 批量消息处理
     * @param records 消息
     */
    @KafkaListener(topics = {"multiAcceptTopic"})
    public void multiAccept(List<ConsumerRecord<Object, Object>> records) {
        System.out.println("======= Kafka 开始批量消费 =======");
        System.out.println("消息数量 >>> " + records.size());
        records.forEach(consumer -> {
            System.out.println("msg:" + (String) consumer.value());
        });
    }

测试结果
在这里插入图片描述

消费者异常处理

消费者异常处理需要编写异常处理器,并配置到监听方法中

/**
 * 消费异常处理器
 */
@Configuration
public class MyConsumerAwareErrorHandler {

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                // 编写消费者异常处理逻辑
                System.out.println("消费发生异常...");
                System.out.println("异常信息:" + e.getMessage());
                return null;
            }
        };
    }

}

模拟消费者消费异常

    /**
     * 消费异常处理
     * @param record 消息
     */
    @KafkaListener(topics = {"errorHandler"}, errorHandler = "consumerAwareListenerErrorHandler")
    public void errorHandlerListener(ConsumerRecord<Object, Object> record) {
        throw new RuntimeException("模拟接收消息异常...");
    }

测试结果
在这里插入图片描述

消费者消息过滤

消费者进行消息的过滤需要编写过滤器

/**
 * 消息过滤器
 */
@Component
public class MyKafkaFilter {

    @Autowired
    ConsumerFactory<Object, Object> consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 设置被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 设置消息过滤的策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (consumerRecord.value().toString().length() < 5) {
                // 返回 false 则消息不被过滤
                return false;
            }
            // 返回 true 则消息被过滤
            System.out.println("消息不合格~被过滤丢弃~");
            return true;
        });
        return factory;
    }

}

编写消费者消费方法

    /**
     * 消费者消息过滤
     * @param record 消息
     */
    @KafkaListener(topics = {"msgFilter"}, containerFactory = "listenerContainerFactory")
    public void msgFilterListener(ConsumerRecord<Object, Object> record) {
        // 方法内定义消息的处理逻辑...
        System.out.println("topic:" + record.topic());
        System.out.println("partition:" + record.partition());
        System.out.println("msg:" + (String) record.value());
    }

测试结果
根据过滤逻辑,未被过滤的消息,正常被消费
在这里插入图片描述
被过滤掉的消息,执行过滤逻辑,消息未被消费
在这里插入图片描述

消费者消息转发

消费者可以在监听到消息后,接受消息并对消息进行处理,并将处理后的消息转发到另外的主题,只需要添加注解@sendTo即可

    /**
     * 消费者消息转发
     * @param record 消息
     */
    @KafkaListener(topics = {"sendTo"})
    @SendTo("testTopic")
    public String sendToListener(ConsumerRecord<Object, Object> record) {
        // 此处编写消息处理逻辑,处理完成后将处理后的消息转发至目标主题
        return record.value()+"--我被处理了~";
    }

进行测试,发送消息内容为:“Hi~Kafka~”
在这里插入图片描述
消费者进行监听,接收到消息后进行处理并转发到主题testTopic,并被对应的消费者处理
在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐