一、前期准备 POM文件引入依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

二、自动配置

1 前言(了解)

自动配置实现在 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
配置类为:

@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
 

}

2 、配置文件 application.yml配置文件(在项目里面配置文件配置)

spring:
  kafka:
    # kafka集群信息
    bootstrap-servers: 192.168.153.162:9092
    # 生产者配置
    producer: 
      # 设置大于0的值,则客户端会将发送失败的记录重新发送
      retries: 3 
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
      batch-size: 16384
      linger: 1
      # 设置生产者内存缓冲区的大小。#32M
      buffer-memory: 33554432
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
      # 指定消息key和消息体的编解码方式 值的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 消费者组
      group-id: test 
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      auto-offset-reset: earliest
      # 自动提交的时间间隔  刷新间隔时间,负值失败时候刷新,0每次发送后刷新
      auto-commit-interval: 100
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: true
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
      session.timeout.ms: 600000			
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、启动项目完成基础

三、自定义配置

1、前言

配置类org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig和org.apache.kafka.clients.consumer.ConsumerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Poducer和comsumer。

2 、配置文件 application.yml配置文件(在项目里面配置文件配置)

spring:
  kafka:
    # kafka集群信息
    bootstrap-servers: 192.168.153.162:9092
    # 生产者配置
    producer: 
      # 设置大于0的值,则客户端会将发送失败的记录重新发送
      retries: 3 
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
      batch-size: 16384
      linger: 1
      # 设置生产者内存缓冲区的大小。#32M
      buffer-memory: 33554432
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
      # 指定消息key和消息体的编解码方式 值的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 消费者组
      group-id: test 
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      auto-offset-reset: earliest
      # 自动提交的时间间隔  刷新间隔时间,负值失败时候刷新,0每次发送后刷新
      auto-commit-interval: 100
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: true
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
      session.timeout.ms: 600000			
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、生产端自定义配置例子:

@EnableKafka:这个注解用来启用kafka相关注解配置功能

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers:192.168.153.162:9092}")
    private String servers;
    @Value("${spring.kafka.producer.retries:3}")
    private int retries;
    @Value("${spring.kafka.producer.batch-size:16384}")
    private int batchSize;
    @Value("${spring.kafka.producer.linger:1}")
    private int linger;
    @Value("${spring.kafka.producer.buffer-memory:33554432}")
    private int bufferMemory;
    // 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //设置重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        //达到batchSize大小的时候会发送消息
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        //缓冲区的值
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.ztuo.bitrade.kafka.kafkaPartitioner");
        //序列化手段
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);


//        //producer端的消息确认机制,-1和all都表示消息不仅要写入本地的leader中还要写入对应的副本中
//        props.put(ProducerConfig.ACKS_CONFIG, -1);
//        //单条消息的最大值以字节为单位,默认值为1048576
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 10485760);
//        //设置broker响应时间,如果broker在60秒之内还是没有返回给producer确认消息,则认为发送失败
//        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
//        //指定拦截器(value为对应的class)
//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.te.handler.KafkaProducerInterceptor");
//        //设置压缩算法(默认是木有压缩算法的)
//        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "LZ4");


        return props;
    }

    /**
     * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
     * @return
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

}

4、消费端自定义配置例子:


import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
//这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers:192.168.153.162:9092}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit:true}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-commit-interval:100}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.group-id:test}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset:earliest}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.session.timeout.ms:120000}")
    private String sessionTimeout;
    @Value("${spring.kafka.consumer.concurrency:5}")
    private int concurrency;
    //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        //kafka集群信息
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        //# 消费者组
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        //两次Poll之间的最大允许间隔。
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 单位毫秒
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        return propsMap;
    }
    /**
     * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
     * @return
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 它具有并发属性。例如, container.setConcurrency(3) 创建了三个KafkaMessageListenerContainer实例。     *
     * 如果您提供了六个TopicPartition实例并且并发数为 3;每个容器有两个分区。对于五个 TopicPartition 实例,
     * 两个容器获得两个分区,第三个获得一个分区。如果并发数大于 TopicPartitions 的数量,则向下调整并发性,使每个容器获得一个分区。
     * 配置批处理侦听器
     *
     * 从版本1.1开始,可以将@KafkaListener方法配置为接收从消费者调查接收的整批消费者记录。配置监听器容器工厂创建一批听众,
     * 设置的的batchListener属性ConcurrentKafkaListenerContainerFactory来true。
     *
     * 我们可以选择BatchErrorHandler使用ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler()
     * 并提供批处理错误处理程序来创建一个。
     *
     * 我们可以通过将Spring Kafka设置为ConsumerConfig.MAX_POLL_RECORDS_CONFIG适合您的值来配置Spring Kafka来设置批量大小的上限。默认情况下,
     * 动态计算每批中接收的记录数。在以下示例中,我们将上限配置为5。
     *
     *
     *
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        //设置批量消费
        factory.setBatchListener(true);
        factory.setMissingTopicsFatal(false);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true);
        return factory;
    }

}

四、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);



    /**
     * 因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List<ConsumerRecord<String, String>>。
     *
     * 从版本1.1开始,可以将@KafkaListener方法配置为接收从消费者调查接收的整批消费者记录。配置监听器容器工厂创建一批听众,
     * 设置的的batchListener属性ConcurrentKafkaListenerContainerFactory来true。
     *
     * 我们可以选择BatchErrorHandler使用ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler()
     * 并提供批处理错误处理程序来创建一个。
     *
     * 我们可以通过将Spring Kafka设置为ConsumerConfig.MAX_POLL_RECORDS_CONFIG适合您的值来配置Spring Kafka来设置批量大小的上限。默认情况下,
     * 动态计算每批中接收的记录数。在以下示例中,我们将上限配置为5。
     * topic1,topic2
     * "#{'${spring.kafka.topics:test}'.split(',')}"
     * "#{'${kafka.consumer.topics}'.split(',')}"
     *(topics ="#{'${kafka.consumer.topics}'.split(',')}")
     */
//    @KafkaListener(topics = {"${spring.kafka.topic:test}"})
//    public void listen(List<ConsumerRecord<String, String>> recordList) {
//        for (ConsumerRecord<String,String> record : recordList) {
//            // 打印消息的分区以及偏移量
//            logger.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
//            //获取topic
//            String topic = record.topic();
//            String value = record.value();
//            if(StringUtils.isNotBlank(value)){
//                logger.info("value = " + value);
//                // 处理业务逻辑 ...
//            }
//
//        }
//    }

    @KafkaListener(topics = "#{'${spring.kafka.topics:test,test1}'.split(',')}")
    public void msgListen(List<ConsumerRecord<String, String>> recordList) {
        for (ConsumerRecord<String,String> record : recordList) {
            // 打印消息的分区以及偏移量
            logger.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
            //获取topic
            String topic = record.topic();
            String value = record.value();
            if(StringUtils.isNotBlank(value)){
                logger.info("value = " + value);
                // 处理业务逻辑 ...
            }

        }
    }


//    // kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup"
//    @KafkaListener(topics = "test")
//    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
//        String value = record.value();
//        logger.info("value = " + value);
//        logger.info("record = " + record);
//        //手动提交offset
//        ack.acknowledge();
//    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐