网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:【弄nèng - Kafka】应用篇(三) —— Springboot整合Kafka(批量消费)_司马缸砸缸了-CSDN博客_kafka springboot批量消费https://blog.csdn.net/yy756127197/article/details/103895413),但是感觉对代码的改动比较大。因此花了点时间研究了一下如何简单的集成。

maven依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>


    <!-- 自动引入kafka版本2.2.5.RELEASE -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

配置文件

#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092
#spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=mygroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=latest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.type=batch
spring.kafka.listener.concurrency=1
spring.kafka.consumer.max-poll-records=1000

最后几个批量设置的关键配置:

  • spring.kafka.listener.poll-timeout=1000  #看了说明也没搞懂这个参数什么作用,无论设置大小对后续的测试没有任何影响,有知道的大神还望点拨下
  • spring.kafka.listener.type=batch             #指定监听的模式,好多文章里没有此项
  • spring.kafka.listener.concurrency=1       #同时处理线程数,应设置与brocker数量一致,由于测试服务器没有多个brocker,因此不知道影响
  • spring.kafka.consumer.max-poll-records=1000 #每批最大条数,默认500

消费者代码:仅观察接收到的集合大小

    @KafkaListener(topics = "MY_TOPIC")
    public void myListener(List<ConsumerRecord<Integer, JSONObject>> list){
        System.out.print(list.size()+",");
        /* list.forEach( it -> {
            System.out.println(it.value());
        });*/
    }

    //参数可以使用泛型 List<ConsumerRecord<?, ?>>

生产者测试

public static void myTest(){
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

        Random random = new Random();
        for(int i=0;i<20000;i++){
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("productName", random.nextInt(10));
            jsonObject.put("couponCount", random.nextInt(50));
            jsonObject.put("siteId",random.nextInt(5));
            ProducerRecord<String, String> recordTopic5 =  new ProducerRecord<String, String>("MY_TOPIC", jsonObject.toString());
            kafkaProducer.send(recordTopic5);
            /*try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
        }
        kafkaProducer.close();
    }

测试结果

模拟生产者发消息时的不连续性,对消费者批量接受消息数量的影响

没有sleep时:

sleep=1时:

sleep=10时:

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐