1. 开发环境说明

本文主要介绍SpringBoot整合Kafka的完整过程,需要提前安装配置好开发环境。
SpringBoot:2.3
JDK:1.8
Maven:3.6.3
Zookeeper:3.6.3
Kafka:2.12-3
有不清楚如何安装Kafka的小伙伴,可以参阅阿里云服务器CentOS8安装Kafka,Kafka可以使用自带的Zookeeper,也可以自己安装,有需要了解如何安装Zookeeper的小伙伴,可以参阅Zookeeper3.6搭建单机版和集群版

2. 项目搭建

本文采用生产端和消费端分别新建一个项目整合Kafka

2.1. 搭建生产端项目

新建一个SpringBoot项目,在pom.xml文件中引入以下核心依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2. 搭建消费端项目

新建项目和pom.xml依赖同上

3. 生产端代码开发

3.1. yml配置文件

spring:
  kafka:
    bootstrap-servers: xx.xx.xx.xx:9092
    producer:
      batch-size: 16384 #批次大小,默认16k
      acks: -1 #ACK应答级别,指定分区中必须要有多少个副本收到消息之后才会认为消息成功写入,默认为1只要分区的leader副本成功写入消息;0表示不需要等待任何服务端响应;-1或all需要等待ISR中所有副本都成功写入消息
      retries: 3 #重试次数
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 #缓冲区大小,默认32M
      client-id: kafka.producer.client.id #客户端ID
      compression-type: none #消息压缩方式,默认为none,另外有gzip、snappy、lz4
      properties:
        retry.backoff.ms: 100 #重试时间间隔,默认100
        linger.ms: 0 #默认为0,表示批量发送消息之前等待更多消息加入batch的时间
        max.request.size: 1048576 #默认1MB,表示发送消息最大值
        connections.max.idle.ms: 540000 #默认9分钟,表示多久后关闭限制的连接
        receive.buffer.bytes: 32768 #默认32KB,表示socket接收消息缓冲区的大小,为-1时使用操作系统默认值
        send.buffer.bytes: 131072 #默认128KB,表示socket发送消息缓冲区大小,为-1时使用操作系统默认值
        request.timeout.ms: 30000 #默认30000ms,表示等待请求响应的最长时间
    topic-name: kafka-topic

以上配置中有3个时必须的
bootstrap-servers、key-serializer、value-serializer

3.2. 消息发送业务接口

public interface ProducerService {

    /**
     * 发送同步消息
     * @param topic
     * @param data
     * @throws ExecutionException
     * @throws InterruptedException
     */
    void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;

    /**
     * 发送普通消息
     * @param topic
     * @param data
     */
    void sendMessage(String topic, String data);

    /**
     * 发送带附加信息的消息
     * @param record
     */
    void sendMessage(ProducerRecord<String, String> record);

    /**
     * 发送Message消息
     * @param message
     */
    void sendMessage(Message<String> message);

    /**
     * 发送带key的消息
     * @param topic
     * @param key
     * @param data
     */
    void sendMessage(String topic, String key, String data);

    /**
     * 发送带key和分区的消息
     * @param topic
     * @param partition
     * @param key
     * @param data
     */
    void sendMessage(String topic, Integer partition, String key, String data);

    /**
     * 发送有分区,当前时间,key的消息
     * @param topic
     * @param partition
     * @param timestamp
     * @param key
     * @param data
     */
    void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}

3.3. 消息发送业务接口实现

@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);

    @Override
    public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
    }

    @Override
    public void sendMessage(String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }

    @Override
    public void sendMessage(ProducerRecord<String, String> record) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(String topic, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
        future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }

    @Override
    public void sendMessage(String topic, Integer partition, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }
}

4. 消费端代码开发

4.1. yml配置文件

spring:
  kafka:
    bootstrap-servers: xx.xx.xx.xx:9092
    consumer:
      auto-commit-interval: 5000 #自动提交消费位移时间隔时间
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500 #批量消费每次最多消费多少条消息
      enable-auto-commit: true #开启自动提交消费位移
      auto-offset-reset: latest #其他earliest、none
      group-id: kafka.consumer.group #消费者组名称
      client-id: kafka.consumer.client.id #消费者客户端ID
      fetch-max-wait: 400 #最大等待时间
      fetch-min-size: 1 #最小消费字节数
      heartbeat-interval: 3000 #分组管理时心跳到消费者协调器之间的预计时间
      isolation-level: read_committed
    topic-name: kafka-topic

以上配置有4个参数时必须的
bootstrap-servers、group-id、key-deserializer、value-deserializer

4.2. 消费端监听类

@Component
public class ConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaListener(topics = {"${spring.kafka.topic-name}"})
    public void listenerMessage(ConsumerRecord<String, String> record) {
        logger.info("接收到kafka消息键为:{},消息值为:{},消息头为:{},消息分区为:{},消息主题为:{}", record.key(), record.value(), record.headers(), record.partition(), record.topic());
    }
}

5. 测试

@SpringBootTest
public class KafkaProducerApplicationTest {

    @Autowired
    private ProducerService producerService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Value("${spring.kafka.topic-name}")
    private String topicName;

    @Test
    void cotextLoads() {
        producerService.sendMessage(topicName, "测试");
    }

    @Test
    void sendMessage1() {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, 0, System.currentTimeMillis(), "topic-key", "测试");
        producerRecord.headers().add("user", "zhangsan".getBytes());
        producerService.sendMessage(producerRecord);
    }

    @Test
    void sendMessage2() {
        String event = "测试";
        Map<String, Object> map = new HashMap<>();
        map.put("user", "zhangsan");
        MessageHeaders headers = new MessageHeaders(map);
        Message<String> message = MessageBuilder.createMessage(event, headers);
        kafkaTemplate.setDefaultTopic(topicName);
        producerService.sendMessage(message);
    }
}

测试结果如下:

2021-10-27 15:40:29.246  INFO 55088 --- [ntainer#0-0-C-1] com.xlhj.boot.listener.ConsumerListener  : 接收到kafka消息键为:null,消息值为:测试,消息头为:RecordHeaders(headers = [RecordHeader(key = user, value = [122, 104, 97, 110, 103, 115, 97, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 117, 115, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false),消息分区为:0,消息主题为:kafka-topic
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐