Kafka consumer

consumer概览

消费者组

消费者组定义:消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上

  • 一个consumer group可能有若干个consumer实例
  • 对于同一个group而言,topic的每条消息只能被发送到group下的一个consumer实例上
  • topic消息可以被发送到多个group中

Kafka同时支持基于队列和基于发布/订阅的两种消息引擎模型,事实上Kafka是通过consumer group实现对这两种模型的支持

  • 所有consumer实例都属于相同group—实现基于队列的模型,每条消息只会被一个consumer实例处理
  • consumer实例都属于不同group—实现基于发布/订阅的模型,极端的情况是每个consumer实例都设置完全不同都group,这样kafka消息就会被广播到所有consumre实例

在这里插入图片描述

消费者组A、消费者组B同时订阅相同topic,topic包含P0、P1、P2、P3四个分区,消费者组A包含两个实例,每个实例分别消费两个分区

消费者组B包含四个实例,每个实例消费一个分区数据。Kafka将每个消费实例均匀的分配消费分区

consumer group是为了实现高伸缩性、高容错性的消费机制,组内多个实例可以同时读取Kafka消息,一旦某个consumer实例挂了,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer负责,从而保证整个group可以继续工作,不会丢失数据—这个过程被称为重平衡

Kafka目前只提供单分区内的消费顺序,不会维护全局的消费顺序,如果需要实现topic全局的消息读取顺序,只能通过每个consumer group只包含一个consumer实例的方式实现

consumer group含义和特点:

  • consumer group下可以有一个或多个consumer实例,一个consumer实例可以是一个线程,也可以是运行在其他机器上的进程
  • group.id唯一标识一个consumer group
  • 对某个group而言,订阅topic的每个分区只能分配给该group下的一个consumer实例

位移

消费端位移与分区日志中的位移含义不同,每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息。这里所说的位置就是指位移。很多消息引擎将消费端的offset保存在服务端,这样虽然实现简单但会遇到一些问题

  • broker从此变成了有状态的,增加了同步成本,影响伸缩性
  • 需要引入应答机制来确认消费成功
  • 由于要保存许多consumer的offset,会引入复杂的数据结构,造成不必要的资源浪费

Kafka让consumer group保存offset,同时引入检查点机制定期对offset进行持久化,简化应答机制

在这里插入图片描述

位移提交

consumer客户端需要定期向Kafka集群汇报自己消费数据的进度,这个过程被称为位移提交。位移提交这件事情对于consumer而言非常重要,它不仅表征了consumer端的消费进度,同时也决定了consumer端的消费语义保证

旧版本consumer会定期将位移信息提交到ZooKeeper下的固定节点上,该路径是/consumers/<group.id>/offsets//,
其中group.id、topic和partitionId是变化值,但ZooKeeper的做法并不合适,ZooKeeper本质上只是一个协调服务组件,并不适合作为位移信息存储组件,毕竟频繁高并发的读/写操作并不是ZooKeeper擅长的事情,在0.9.0.0版本Kafka推出新的consumer,consumer把位移提交到Kafka的一个内部topic(__consumer_offsets)上,这个topic是Kafka内置topic,不要直接操作该topic,删除或移动该topic的日志文件

内置topic __consumer_offsets

__consumer_offsets通常是给新版本consumer使用,但旧版本consumer也可以通过offsets.storage=kafka设置使用该topic,不过基本上不会这样用。该topic是由Kafka自动创建,因此不要去删除、操作该topic。考虑到生产环境可能很多consumer或consumer group,如果这些consumer同时提交位移,将加重__consumer_offsets的写入负载,因此社区特意为该topic创建50个分区,并对每个consumer的group.id做hash求模运算,
从而将负载分散到不同的__consumer_offsets分区上。通常情况下用户会在Kafka的日志目录发现__consumer_offsets的日志文件,编号从0到49每个文件是一个正常的Kafka topic日志文件目录,至少一个日志文件(.log)和两个索引文件(.index和.timeindex),只不过该日志文件保存的都是consumer消费的位移信息,__consumer_offsets每条消息的格式KV键值对Key是group.id + topic + 分区号,Value是offset。每当更新同一个
key的最新offset时,该topic会写入一条含有最新offset的消息,同时Kafka会定期对该topic执行压实操作,为每个消息key只保留最新offset消息,既避免对分区日志对修改,也控制__consumer_offset topic总体日志容量,反映最新消费进度

消费者组重平衡

rebalance消费者组重平衡本质上是一种协议,规定一个consumer group下所有consumer如何达成一致来分配订阅topic的所有分区。因此rebalance只对consumer group有效。假设我们有一个consumer group,有20个consumer实例,该group订阅了一个具有100个分区的topic,正常情况下,consumer group平均会为每个consumer分配5个分区,这个分配过程被称为rebalance

构建consumer

consumer程序实例

构建一个consumer group从指定Kafka topic消费消息

    package com.aim.kafka.client.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.*;
    import java.util.regex.Pattern;

    public class ConsumerHandle {

        public void consumerMessage(String topic, String groupId) {
            Properties props = new Properties();
            /**
             * 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
             * 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
             */
            props.put("bootstrap.servers", "localhost:9092");
            /**
             * 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
             * 设置一个有业务意义的名字即可
             */
            props.put("group.id", groupId);
            /**
             * 自动提交位移
             */
            props.put("enable.auto.commit", "true");
            /**
             * 位移提交超时时间
             */
            props.put("auto.commit.interval.ms", "1000");
            /**
             * 从最早的消息开始消费
             */
            props.put("auto.offset.reset", "earliest");
            /**
             * 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
             * Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
             */
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            /**
             * 对消息体进行解序列化,与key解序列化类似
             */
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            /**
             * 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器
             */
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

            /**
             * 独立consumer可以使用下面方式实现手动订阅
             */
            List<TopicPartition> topicPartitions = new ArrayList<>();
            topicPartitions.add(new TopicPartition("topic-name", 0));
            topicPartitions.add(new TopicPartition("topic-name", 1));
            consumer.assign(topicPartitions);

            /**
             * 订阅consumer group需要消费的topic列表
             */
            consumer.subscribe(Arrays.asList(topic));

            /**
             * 支持正则表达式指定
             */
            consumer.subscribe(Pattern.compile("kafka.*"));

            /**
             * 支持指定指定消费重平衡策略
             */
            consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

                }
            });

            /**
             * 支持指定指定消费重平衡策略,最后的subscribe会覆盖之前的,因此不是增量式
             */
            consumer.subscribe(Pattern.compile("kafka.*"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

                }
            });
            /**
             * 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,
             * 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作
             */
            try {
                while (true) {
                    /**
                     * 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据
                     * consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回
                     */
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    /**
                     * poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法
                     * 返回即认为consumer成功消费了消息
                     */
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                                record.value());
                    }
                }
            } finally {
                /**
                 * consumer程序结束后一定要显示关闭consumer以释放KafkaConuser运行过程中占用的各种系统资源
                 * KafkaConsumer.close():关闭consumer并等待30秒
                 * KafkaConsumer.close(timeout): 关闭consumer并最多等待给定的timeout秒
                 */
                consumer.close();
            }
        }


    }

构造consumer需要6个步骤

  • 构造一个java.util.Properties对象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值
  • 使用上一步构建的Properties实例构造KafkaConsumer对象
  • 调用KafkaConsumer.subscribe方法订阅consumer group感兴趣topic列表
  • 循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息
  • 处理获取到的ConsumerRecord对象
  • 关闭KafkaConsumer

consumer主要参数

完整参数列表请点击这里查看

session.timeout.ms

  • 在0.10.1.0版本之前该参数包含两层含义:
    1.consumer group检测组内成员发送崩溃的时间,假设该参数设置为5分钟,当某个group成员崩溃了,消费者协调组有可能需要5分钟才能感知到崩溃,我们希望尽量缩短这个时间,以便更快的检测到consumer失败
    2.消息处理逻辑的最大时间,假设consumer两次poll之间时间间隔超过该参数所设置阈值,消费者协调组会认为这个consumer已经追不上组内其他成员的消费进度,因此会将consumer踢出组,该consumer负责分区会被分配给其他consumer,对于被踢出group后处理消息,consumer都无法提交位移,这意味着这些消息会被重复消费一遍。在0.10.1.0版本之后该参数只表示检测失败时间

max.poll.interval.ms

  • 设置consumer处理逻辑最大事件,即consumer两次poll之间时间间隔超过该参数所设置阈值,消费者协调组会认为这个consumer已经追不上组内其他成员的消费进度,因此会将consumer踢出组

auto.offset.reset

  • 指定无位移信息或位移越界时Kafka的应对策略,此参数只在无位移信息或位移越界才会生效。假设首次运行一个consumer group并且指定从头消费,显然该group会从头消费所有数据,一旦group成功提交位移后,重启group,group不会再从头消费,因为Kafka已经保存了该group的位移信息,因此会无视auto.offset.reset的设置

    目前该参数会有3个可能值:

    • earliest: 指定从最早的位移开始消费,这里最早的位移不一定是0
    • latest:指定从最新处位移开始消费
    • none:指定如果未发现位移信息或位移越界,则抛出异常

enable.auto.commit

  • 指定consumer是否自动提交位移,若为true,consumer在后台自动提交位移,否则用户需要手动提交位移,对于有较强精确处理一次语义需求的用户来说,最好将该参数设置为false,由用户自行处理位移提交问题

fetch.max.bytes

  • 指定consumer端单次获取数据的最大字节数,若实际业务消息很大,则必须设置该参数为一个较大值,否则consuemr将无法消费这些消息

max.poll.records

  • 该参数控制单次poll调用返回最大消息数,比较极端的做法是将该参数设置成1,每次poll只会返回1条消息,如果consumer端瓶劲在poll速度太慢,可以适当增加该参数值,如果用户的消息处理逻辑很轻,默认的500条消息通常不能满足实际消息处理需求

heartbeat.interval.ms

  • 当消费者协调组决定开启新一轮rebalance时,会以REBALANCE_IN_PROGRESS异常形式放到consuer心跳请求的response中,这样其他成员拿到response后才能知道需要重新加入group,显然这个过程越快越好,次参数就是用来设置心跳请求的间隔。比较推荐的做法是设置一个比较低的值,让group下的consumer成员能更快的感受新一轮rebalance开启,但该值必须小于session.timeout.ms,否则消费者协调组会认为该consuer已经dead

connection.max.idle.ms

  • 控制consumer到broker空闲Socket连接的关闭时间,默认值为9分钟,即空闲时间超过9分钟的socket连接将会被关闭,如果用户实际环境中不在乎这些Socket资源开销,推荐设置该参数为-1,即不关闭空闲连接

消息轮询

poll内部原理

consumer需要同时读取多个topic的多个分区消息,若实现并行的消息读取,一种方式是使用多线程,为每个要读取的分区创建一个专有线程去消费,旧版本consumer就是使用这种方式;另一种方式类似于Linux I/O模型的poll或select,使用一个线程来同时管理多个socket连接,即同时与多个broker实现消息的并行消费,新版本consumer采用这种设计

一旦consumer订阅了topic,所有消费逻辑包括coordinator的协调、消费者组的rebalance以及数据的获取都会在主逻辑poll方法的一次调用中被执行,这样用户很容易使用一个线程来管理consumer I/O操作

对于新版本consumer Kafka 1.0.0而言,是一个双线程Java进程,创建KafkaConsumer的线程被称为用户主线程,同时consumer在后台会创建一个心跳线程。KafkaConsumer的poll方法在用户主线程中运行,这也表明消费者组rebalance、消息获取、coordinator管理、异步任务结果的处理甚至位移提交等操作都运行在用户主线程中

poll使用方法

consumer订阅topic后通常以事件循环方式获取订阅方案并开启消息读取。poll方法根据当前consumer的消费位移返回消费集合。当poll首次被调用,新的消费者组会被创建并根据对应的位移重设策略来设定消费者组的位移,一旦consumer开始提交位移,每个后续rebalance完成后都会将位移设置为上次已提交位移。传递给poll方法的超时设定参数用于控制consumer等待消息的最大阻塞时间。有可能broker端无法立即满足consumer端的获取请求,比如consumer端要求一次至少获取10MB数据,但broker端无法立即全部给出此时consumer会阻塞等待数据不断累计并满足consumer需求。如果不想让consumer一直处于阻塞状态,可设定一个超时时间,则当consumer在超时时间内累计到需要的数据量则立即返回,当超过设定的超时时间仍然未累计到需要的数据量也立即返回

consumer是单线程设计,因此consumer应该运行在专属线程中,新版本consumer不是线程安全的,如果没显示同步锁机制,将同一个KafkaConsumer实例用在多个线程,kafka会抛出异常

KafkaConsumer的poll方法超时设置,一方面是作为超时设置本身的意义,另一方面对于想consumer能定期醒来做一些其他事情,比如定期执行日志操作。如果consumer程序纯粹是消费
消息并处理可以将超时时间设置为Long.MAX_VALUE,这种方式需要在另一个线程调用consumer.wakeup()方法触发consumer关闭,虽然KafkaConsumer不是线程安全,但此方法是线程安全。调用此方法后并不会马上退出消费,会在下一次poll时抛出异常停止消费。因此consumer需要定期执行其他任务:推荐poll(较小超时时间)+ 运行标识布尔变量退出方式;consumer不需要定期执行子任务: 推荐poll(Long.MAX_VALUE) + 捕获WakeupException异常方式

        try {
            while (true) {
                /**
                 * 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据
                 * consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回
                 */
                ConsumerRecords<String, String> records2 = consumer.poll(Long.MAX_VALUE);
                /**
                 * poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法
                 * 返回即认为consumer成功消费了消息
                 */
                for (ConsumerRecord<String, String> record : records2) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                            record.value());
                }
            }
        } catch (WakeupException e) {
            //忽略异常
        } finally {
            /**
             * consumer程序结束后一定要显示关闭consumer以释放KafkaConuser运行过程中占用的各种系统资源
             * KafkaConsumer.close():关闭consumer并等待30秒
             * KafkaConsumer.close(timeout): 关闭consumer并最多等待给定的timeout秒
             */
            consumer.close();
        }

消息轮询

consumer位移

consumer端需要为每个它要读取的分区保存消费进度,即分区中当前最新消费消息的位移,该位移被称为位移(offset)。consumer需要定期向Kafka提交自己的位置信息,这里的位移值通常是下一条待消费信息的位置,假设consume读取了某个分区的第N条消息,那么它应该提交位移值为N,因为位移是从0开始,位移为N的消息是第N+1条消息,下次consumer重启会从第N+1条消息开始消费,offset就是consumer端维护的位置信息,offset对于consumer非常重要,它是实现消息交付语义保证的基石,常见的3中消息语义保证如下:

  • 最多一次处理语义:消息可能丢失,但不会被重复处理
  • 最少一次处理语句:消息不会丢失,但可能被处理多次
  • 精确一次处理语义:消息一定会被处理且只会被处理一次

若consumer在消息消费之前就提交位移,便可实现最多一次处理语义,但如果consumer提交位移与消息消费之间奔溃,则consumer重启后会从新的offset位置开始消费。前面那条消息就丢失了;若提交位移在消息消费之后,则可实现最少一次处理语义。由于Kafka没办法保证这两步在一个事务中,因此Kafka默认提供最少一次处理语义

Kafka相关位置信息如下
在这里插入图片描述
每个位置信息对应含义如下

  • 上次提交位移:consumer最近一次提交的offset值
  • 当前位置:consumer已读取但尚未提交时的位移
  • 水位:也被称为高水位,严格意义上说它不属于consumer管理范围,而是属于分区日志的概念,对于水位线下的所有消息,consumer都可以读取,但无法读取水位线以上的消息
  • 日志终端位移:被称为日志最新位移,不属于consumer范畴,而是属于分区日志管辖,表示某个分区副本当前保存消息对应的最大位移值,正常情况下日志终端位移不会比水位值小,事实上只有当所有分区副本都保存了某条消息,该分区leader副本才会向上移动水位值

新版本consumer位移管理

consumer会在Kafka集群的所有broker中选择一个broker作为consumer group的coordinator,用于实现组成员管理、消费分配方案制定以及提交位移等。为每个组选择对应coordinator的依据是__consumer_offsets内部topic,该内部topic唯一目的就是保存consumer提交位移。

当消费者首次启动,由于没有初始位移信息coordinator必须为其确定初始位移值,这就是auto.offset.reset的作用,通过该参数确定初始消费位移

当consumer运行一段时间后,必须要提交自己的消费位移值,如果consumer崩溃或关闭,它负责的分区会被分配给其他consumer,因此要在其他consumer读取这些分区前做好位移提交工作,否则会出现重复消费消息的情况

consumer提交位移的主要机制是通过向所属coordinator发送位移提交请求来实现,每个位移提交请求都会往__consumer_offsets对应分区追加写入一条消息。消息的key是group.id、topic和分区的元组,而value就是位移值,如果consumer为同一个group的同一个topic分区提交多次位移,那么__consumer_offsets对应分区会有多条key相同而value不相同的消息,但我们只关心最近一条消息,Kafka通过压实策略来处理冗余的消息

自动提交与手动提交

位移提交策略对于提供消息交付语义至关重要,默认情况下consumer是自动提交位移,自动提交间隔5秒,通过设置auto.commit.interval.ms参数可以控制自动提交间隔。但自动提交位移用户不能细粒度处理位移提交,特别是在有较强精确一次处理语义时,这种情况下用户可以使用手动提交位移。手动提交位移是由用户确定消息何时被真正处理完并可以提交位移。在consumer应用场景中,用户需要对poll方法返回的消息集合中的消息执行业务级处理,用户想要确保消息真正被处理完成后再提交位移,如果使用自动提交则无法保证这种时序性,因此这种情况下必须使用手动提交位移

    Properties props = new Properties();
        props.put("bootstrap.server", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test-topic"));
        final int minBatchSize = 500;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records){
                buffer.add(record);
            }
            if(buffer.size() >= minBatchSize){
                insertIntoDb(buffer);
                consumer.commitAsync();
                buffer.clear();
            }
        }

consumer持续将消息保存到缓冲区,当消息达到500条则写入到数据库中,并调用KafkaConsumer.commitSync方法进行手动位移提交,然后清空缓存。若成功插入数据库之后但提交位移语句执行之前consumer程序崩溃,由于未提交位移,consumer重启后会重新处理之前的一批消息并将他们再次插入数据库,造成消息重复消费

手动提交位移API进一步细分为同步手动提交和异步手动提交,即commitSync和commitAsync,如果调用的是commitSync,用户程序会等待位移提交结束才会执行下一条语句。如果调用commitAsync,则是一个异步非阻塞调用,consumer在后续poll调用时轮询该位移提交的结果,这里的异步提交位移不是指consumer使用单独线程进行位移提交,实际上consumer依然会在用户主线程poll方法中轮询这次异步提交结果,只是该提交发起时此方法是不会阻塞的,因此被称为异步提交

用户除了使用无参版的提交位移方法,也可以使用指定带参重载方法指定为哪些分区提交位移,特别需要注意的是,提交的位移是consumer下一条待读取消息的位移

    try{
            //在其他线程通过控制running参数实现消费的停止
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ":" + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }

重平衡

rebalance概览

consumer group的rebalance本质上是一组协议,它规定consumer group是如果达成一致来分配订阅topic的所有分区,假设某个组下有20个consumer实例,该组订阅了一个有着100个分区的topic,正常情况下,kafka会为每个consumer平均分配5个分区,这个分区分配过程就被称为rebalance,当consumer成功执行rebalance后,组订阅topic的每个分区
会分配给组内的一个consumer实例

旧版本consumer依托于ZooKeeper进行rebalance不同,新版本consumer使用Kafka内置的一个全新的组协调协议,对于每个组而言,Kafka的某个broker被选举为组协调者,coordinator负责对组的状态进行管理,他的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作

rebalance触发条件

组rebalance触发的条件有三个

  • 组成员发生变更,比如新consumer加入组,已有consumer离开组,已有consumer奔溃触发rebalance

  • 组订阅topic数发生变更,比如使用正则表达式的订阅,当匹配正则表达式的新topic被创建时则会触发rebalance

    • 组订阅topic的分区数发生变更,比如使用命令行脚本增加了订阅topic的分区数

在真实场景中引发rebalance最常见的原因就是违背第一个条件,当consumer无法在指定时间内完成消息的处理,那么coordinator会认为consumer已经崩溃,引发新一轮rebalance。鉴于目前一次rebalance操作的开销很大,生产环境中用户一定要结合自身业务特点仔细仔细调优consumer参数request.timeout.ms、max.poll.records和max.poll.interval.ms,以避免不必要的rebalance出现

rebalance分区分配

Kafka新版本consumer默认提供了3钟分配策略,分别是range策略、round-robin策略和stick策略

  • range策略:针对每个topic,将每个topic的所有分区按数字顺序排列,将consumer按字典顺序排列,将分区数除以消费者数,以确定每个消费者分配的分区数,如果没有平均分配前几个消费者将拥有一个额外分区。具体计算公式 n = 分区数/消费者数,m = 分区数%消费者数,前m个消费者分配n+1个分区,后(消费者数-m)个消费者分配n个分区。例如:
    假设有两个消费者c0和c1,两个主题t0和t1,每个主题分别三个分区t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,则c0将消费t0p0、t0p1、t1p0、t1p1,c1将消费t0p2、t1p2
  • round-robin策略:把所有topic的所有分区顺序排列,然后轮询分配给各个consumer。例如:假设我们有三个消费者c0、c1、c2,三个主题t0、t1、t2,分别对应分区数为1、2、3,
    将所有topic所有分区依次排序t0p0、t1p0、t1p1、t2p0、t2p1、t2p2。c0订阅t0,c1订阅t0、t1,c2订阅t0、t1、t2,则每个消费者消费分区分配为c0:t0p0; c1:t1p0 c2:t1p1、t2p0、t2p1、t2p2
  • stick策略:采用有粘性的策略对所有consumer实例进行分配,可以规避极端情况下的数据倾斜并且在两次rebalance间最大限度地维持之前的分配方案。其主要目标有两点:
    1.主题分区仍然尽可能均匀地分布
    2.分区的分配尽可能与上次分配保持一致

在实际再平衡过程中第一目标优先于第二目标。例:有三个消费者c0、c1、c2,三个主题t0、t1、t2分区分别为1、2、3个t0p0、t1p0、t1p1、t2p0、t2p1、t2p2,c0订阅t0,c1订阅t0、t1,c2订阅t0、t1、t2,采用round-robin策略将有如下分配
c0:t0p0
c1:t1p0
c2:t1p1、t2p0、t2p1、t2p2

stick策略分区分配:
c0:t0p0
c1:t1p0、t1p1
c2:t2p0、t2p1、t2p2

如果c0奔溃退出消费组,使用round-robin策略将保留三个上次分区分配
c1:t0p0、t1p1
c2:t1p0、t2p0、t2p1、t2p

采用stick策略将保留5个上次分区分配
c1:t0p0、t1p0、t1p1
c2:t2p0、t2p1、t2p2

rebalance generation

consumer group可以执行任意次rebalance,为了隔离每次rebalance数据,Kafka通过rebalance generation加以区分,在consumer中他是一个整数,通常从0开始,每次rebalance generation都会加1,引入rebalanc generation主要是为了保护consumer group,特别是无效offset提交。比如上一届consumer成员由于某些原因导致未及时提交offset,但在rebalance后产生了新一届group成员,而这次延迟提交的offset携带的是旧的generation信息,因此这次提交会被consumer group拒绝

rebalance协议

rebalance本质上是一组协议,group和coordinator共同使用这组协议完成group rebalance,最新版本Kafka提供5个协议来处理rebalance相关事宜

  • JoinGroup请求:consumer请求加入组
  • SyncGroup请求:group leader把分配方案同步更新到组内所有成员
  • Heartbeat请求:consumer定期向coordinator汇报心跳表明自己依然存活
  • LeaveGroup请求:consumer主动通知coordinator该consumer即将离组
  • DescribeGroup请求:查看组的所有信息,包括成员信息、协议信息、分配方案及订阅信息等。该请求类型主要提供管理员使用,coordinator不使用该请求执行rebalance

在rebalance过程中coordinator主要处理consumer发送过来的JoinGroup和SyncGroup请求,当consumer主动离开组时会发生LeaveGroup请求给coordinator。

成功rebalance后,组内所有consumer都需要定期向coordinator发送HeartBeat请求,每个consumer也根据Heartbeat请求的响应中是否包含REBALANCE_IN_PROGRESS来判断当前group是否开启新一轮rebalance

rebalance流程

consumer group在执行rebalance之前必须确定coordinator所在的broker,并创建与该broker相互通信的socket连接,确定coordinator的算法与确定offset被提交到__consumer_offsets目标分区的算法相同

  • 计算Math.abs(groupID.hashCode) % offsets.topic.num.partitions参数值(默认是50),假设为10
  • 寻找__consumer_offsets分区10的leader副本所在的broker,该broker即为这个group的coordinator

成功连接coordinator之后便可以执行rebalance操作,目前rebalance主要分为两步:加入组和同步更新分配方案

  • 加入组:组内所有consumer向coordinator发送JoinGroup请求,当搜集全JoinGroup请求后,coordinator从中选择一个consumer担任group的leader,并把所有成员信息以及他们订阅的信息发送给leader。需要注意的是group的leader和coordinator不是一个概念,leader是某个consumer实例,coordinator通常是Kafka集群中的一个broker,leader而非coordinator负责为整个group的所有成员制定分配方案
  • 同步更新分配方案:这一步leader开始制定分配方案,即根据分配策略确定每个consumer都负责哪些topic的哪些分区,一旦分配完成,leader会把这个分配方案封装进SyncGroup请求
    并发送给coordinator,并不是只有leader会发送,组内所有成员都会发送SyncGroup请求,不过只有leader发送的SyncGroup请求中包含了分配方案,coordinator接收到分配方案后会把每个consumer的方案单独抽取出来作为SyncGroup请求的response返回给各自的consumer

consumer group分配方案是在consumer端执行,Kafka将这个权力下放给客户端主要是因为这样做可以有更好的灵活性。比如同一机架上的分区数据被分配给相相同机架上的consumer,减少网络传输的开销而且即使以后分区策略发生变更,也只需要重启consumer应用即可,不必重启Kafka服务器

rebalance监听器

新版本consumer默认将位移提交到__consumer_offsets中,Kafka也支持用户将位移提交到外部存储中,比如数据库。用户可以通过使用rablance监听器,实现位移的外部存储,但使用rebalance监听器的前提是用户使用consumer group,对于独立consumer或直接手动分配分区,rebalance监听器是无效的

rebalance监听器有一个主要的接口回调类ConsumerRebalanceListener,里面有两个方法onPartitionsRevoked和onPartitionAssigned,在coordinator
开启新一轮rebalance前onPartitionsRevoked方法会被调用,而rebalance完成后会调用onPartitionsAssigned方法

rebalance监听器常见的用法就是手动提交位移到第三方存储及rebalance前后执行一些必要的审计操作

    public void consumerMessage(Properties props, Collection topics,
                                ConsumerFunction consumerFunction,
                                OffsetExternalStore offsetExternalStore,
                                OffsetExternalRead offsetExternalRead){
        final KafkaConsumer consumer = new KafkaConsumer(props);
        final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);
        final AtomicLong joinStart = new AtomicLong(0L);
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(partition -> {
                    offsetExternalStore.saveOffsetInExternalStore(consumer.position(partition));
                });
                joinStart.set(System.currentTimeMillis());
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - joinStart.get());
                partitions.forEach(partition -> {
                    //将consumer当前位移指定到读取位移处并从该位移处开始读取消息
                    consumer.seek(partition, offsetExternalRead.readOffsetExternal(partition));
                });
            }
        });
        try{
            while (true){
                consumerFunction.consumer(consumer.poll(Long.MAX_VALUE));
            }
        }catch (WakeupException e){
            consumer.close();
        }
    }


    @FunctionalInterface
    public interface OffsetExternalStore{
        /**
         * 保存位移
         */
        void saveOffsetInExternalStore(long offset);
    }

    @FunctionalInterface
    public interface OffsetExternalRead{
        /**
         * 读取位移
         */
        long readOffsetExternal(TopicPartition topicPartition);
    }

    @FunctionalInterface
    public interface ConsumerFunction{
        /**
         * 消费消息
         */
        void consumer(ConsumerRecords<String, String> records);
    }

注意:如果使用自动提交位移,则不需要在rebalance监听器中再提交位移,consumer每次rebalance时会检查用户是否启用了自动提交位移,如果是,它会帮用户执行提交。鉴于consumer通常要求rebalance在很短时间内完成,用户不应该在rebalance监听器的两个方法中放入执行时间很长的逻辑,特别是一些
阻塞方法

解序列化

默认序列化器

Kafka consumer端获取消息的格式是字节数组,consumer需要把它还原回指定的对象类型,而这个类型通常与序列化对象类型一致

Kafka 1.0.0版本默认提供了多达十几中deserializer:

  • ByteArrayDeserializer:什么都不做

  • ByteBufferDeserializer:解序列化为ByteBuffer

  • BytesDeserializer:解序列化Kafka自定义的Bytes类

  • DoubleDeserializer:解序列化Double类型

  • IntegerDeserializer:解序列化Integer类型

  • LongDeserializer:解序列化Long类型

  • StringDeserializer:解序列化String类型

但对于复杂类型,需要用户自定义deserializer。只需在构造consumer时指定参数key.deserializer和value.deserializer值就可使用指定的序列化和解序列化器

自定义序列化器

用户自定义deserializer需要完成一下三步:

  • 定义或复用serializer的数据对象格式

  • 创建自定义deserializer类,实现org.apache.kafka.common.serialization.Deserializer接口,在deserializer方法中实现deserialize逻辑

  • 在构造KafkaConsumer的Properties对象中设置key.deserializer

UserDeserializer类

    package com.aim.kafka.client.deserializer;

    import com.aim.kafka.client.serializer.User;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.util.Map;

    public class UserDeserializer implements Deserializer {

        private ObjectMapper objectMapper;

        @Override
        public void configure(Map configs, boolean isKey) {
            objectMapper = new ObjectMapper();
        }

        @Override
        public Object deserialize(String topic, byte[] data) {
            User user = null;
            try{
                user = objectMapper.readValue(data, User.class);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                return user;
            }
        }

        @Override
        public void close() {

        }
    }

多线程消费实例

KafkaConsumer是非线程安全的,而KafkaProducer是线程安全的,因此用户可以在多线程中放心的使用同一个KafkaProducer实例,这也是推荐的做法,因为这通常比每个线程维护一个KafkaProducer实例效率要高。而对于consumer用户无法在多个线程中共享一个KafkaConsumer实例,对于想要在多线程中实现消费有两种方案

每个线程维护一个KafkaConsumer

用户创建多个线程,每个线程创建专属于该线程的KafkaConsumer实例
在这里插入图片描述
consumer group由多个线程的KafkaConsumer组成,每个线程负责消费固定数量的分区

    //ConsumerRunnable类
    package com.aim.kafka.client.consumer;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Collection;
    import java.util.Properties;

    public class ConsumerRunnable implements Runnable{

        private final KafkaConsumer kafkaConsumer;
        private final ConsumerHandle.ConsumerFunction consumerFunction;

        public ConsumerRunnable(Properties props, Collection topics,
                                ConsumerHandle.ConsumerFunction consumerFunction){
            this.kafkaConsumer = new KafkaConsumer(props);
            kafkaConsumer.subscribe(topics);
            this.consumerFunction = consumerFunction;
        }

        @Override
        public void run() {
            while (true){
                this.consumerFunction.consumer(this.kafkaConsumer.poll(200));
            }
        }
    }


    //ConsumerGroup类
    package com.aim.kafka.client.consumer;

    import org.springframework.util.Assert;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Properties;

    public class ConsumerGroup {
        private List<ConsumerRunnable> consumers = new ArrayList<>();

        public ConsumerGroup(Properties props, Collection topics,
                             List<ConsumerHandle.ConsumerFunction> consumerFunctions) {
            Assert.notEmpty(consumerFunctions, "请给出需要消费的处理实例");
            consumerFunctions.forEach(consumerFunction -> {
                consumers.add(new ConsumerRunnable(props, topics, consumerFunction));
            });
        }

        public void execute() {
            consumers.forEach(task -> {
                new Thread(task).start();
            });
        }
    }




    //ConsumerRunnableTests测试类
    package com.aim.kafka;

    import com.aim.kafka.client.consumer.ConsumerGroup;
    import com.aim.kafka.client.consumer.ConsumerHandle;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ConsumerRunnableTests {

        public void consumerRunnable(){
            Properties props = new Properties();
            /**
             * 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
             * 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
             */
            props.put("bootstrap.servers", "localhost:9092");
            /**
             * 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
             * 设置一个有业务意义的名字即可
             */
            props.put("group.id", "order");
            /**
             * 自动提交位移
             */
            props.put("enable.auto.commit", "true");
            /**
             * 位移提交超时时间
             */
            props.put("auto.commit.interval.ms", "1000");
            /**
             * 从最早的消息开始消费
             */
            props.put("auto.offset.reset", "earliest");
            /**
             * 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
             * Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
             */
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            /**
             * 对消息体进行解序列化,与key解序列化类似
             */
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            List<ConsumerHandle.ConsumerFunction> consumerFunctions = new ArrayList<>();

            consumerFunctions.add(new ConsumerHandle.ConsumerFunction() {
                @Override
                public void consumer(ConsumerRecords<String, String> records) {
                    //消费实例1
                }
            });

            consumerFunctions.add(new ConsumerHandle.ConsumerFunction() {
                @Override
                public void consumer(ConsumerRecords<String, String> records) {
                    //消费实例2
                }
            });

            consumerFunctions.add(new ConsumerHandle.ConsumerFunction() {
                @Override
                public void consumer(ConsumerRecords<String, String> records) {
                    //消费实例3
                }
            });

            List<String> topics = new ArrayList<>();
            topics.add("topic1");

            new ConsumerGroup(props, topics, consumerFunctions).execute();
        }
    }

单KafkaConsumer实例+多worker线程

将消息的获取与消息的处理解耦,把消息的处理放入单独的工作者线程中,同时全局维护一个或若干个consumer实例执行消息获取任务

本例使用全局KafkaConsumer实例执行消息获取,把获取的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移

    //ConsumerThreadHandler.java

    package com.aim.kafka.client.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    import java.util.*;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ConsumerThreadHandler<K, V> {
        private final KafkaConsumer<K, V> consumer;

        private ExecutorService executors;

        private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        public ConsumerThreadHandler(Properties props, Collection topics){
            consumer = new KafkaConsumer<K, V>(props);
            consumer.subscribe(topics, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    consumer.commitSync(offsets);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    offsets.clear();
                }
            });
        }

        public void consumer(List<ConsumerHandle.ConsumerFunction> consumerFunctions){
            int threadNumber = consumerFunctions.size();
            executors = new ThreadPoolExecutor(
                    threadNumber,
                    threadNumber,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
            try{
                while (true){
                    ConsumerRecords<K, V> records = consumer.poll(1000L);
                    if(!records.isEmpty()){
                        executors.submit(new ConsumerWorker<>(records, offsets, consumerFunctions));
                    }
                    commitOffsets();
                }
            } catch (WakeupException e){

            } finally {
                commitOffsets();
                consumer.close();
            }
        }

        private void commitOffsets(){
            Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;

            synchronized (offsets){
                if(offsets.isEmpty()){
                    return;
                }
                unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
                offsets.clear();
            }
            consumer.commitSync(unmodfiedMap);
        }

        public void close(){
            consumer.wakeup();
            executors.shutdown();
        }
    }


    //ConsumerWorker.java

    package com.aim.kafka.client.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;
    import java.util.Map;

    public class ConsumerWorker<K, V> implements Runnable {
        private final ConsumerRecords<K, V> records;

        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        private final List<ConsumerHandle.ConsumerFunction> consumerFunctions;

        public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets, List<ConsumerHandle.ConsumerFunction> consumerFunctions) {
            this.records = records;
            this.offsets = offsets;
            this.consumerFunctions = consumerFunctions;
        }

        @Override
        public void run() {
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
                consumerFunctions.forEach(consumerFunction -> {
                    //消息处理业务逻辑
                    consumerFunction.consumer(records);
                });
                //上报位移信息
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                synchronized (offsets) {
                    if (!offsets.containsKey(partition)) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    } else {
                        long curr = offsets.get(partition).offset();
                        if (curr <= lastOffset + 1) {
                            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                        }
                    }
                }
            }
        }
    }


    //MultithreadedTests.java

    package com.aim.kafka;

    import com.aim.kafka.client.consumer.ConsumerHandle;
    import com.aim.kafka.client.consumer.ConsumerThreadHandler;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class MultithreadedTests {

        public void consumerThreadHandler() {
            Properties props = new Properties();
            /**
             * 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
             * 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
             */
            props.put("bootstrap.servers", "localhost:9092");
            /**
             * 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
             * 设置一个有业务意义的名字即可
             */
            props.put("group.id", "order");
            /**
             * 自动提交位移
             */
            props.put("enable.auto.commit", "true");
            /**
             * 位移提交超时时间
             */
            props.put("auto.commit.interval.ms", "1000");
            /**
             * 从最早的消息开始消费
             */
            props.put("auto.offset.reset", "earliest");
            /**
             * 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
             * Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
             */
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            /**
             * 对消息体进行解序列化,与key解序列化类似
             */
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            final ConsumerThreadHandler handler = new ConsumerThreadHandler(props, Arrays.asList("test-topic"));

            List<ConsumerHandle.ConsumerFunction> consumerFunctions = new ArrayList<>();


            consumerFunctions.add(new ConsumerHandle.ConsumerFunction() {
                @Override
                public void consumer(ConsumerRecords records) {
                    //处理逻辑
                }
            });

            consumerFunctions.add(new ConsumerHandle.ConsumerFunction() {
                @Override
                public void consumer(ConsumerRecords records) {
                    //处理逻辑
                }
            });


            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    handler.consumer(consumerFunctions);
                }
            };

            new Thread(runnable).start();

            try {
                Thread.sleep(20000L);
            } catch (InterruptedException e) {
                //异常处理
            }
            handler.close();
        }
    }

两种方法对比

每个线程维护专属KafkaConsumer

  • 优点:实现简单;速度较快,因为无线程间交互开销;方便位移管理;易于维护分区间的消费顺序

  • 缺点:Socket连接开销大;consumer数受限于topic分区数,扩展性差;broker端处理负载高(因为发往broker的请求数多);rebalance可能性增大

全局consumer + 多worker线程

  • 优点:消息获取与处理解耦;可独立扩展consumer数和worker数,伸缩性好

  • 缺点:实现负载;难于维护分区内的消息顺序;处理链路变长,导致位移管理困难;worker线程异常可能导致消费数据丢失

独立consumer

使用Kafka consumer group的形式消费消息,group自动帮用户执行分配分区和rebalance,对于需要有多个consumer共同读取某个topic的需求来说,使用group是非常方便,但有时候用户需要精确控制消费

  • 如果进程自己维护分区的状态,那么它就可以固定消费某些分区而不用担心消费状态丢失的问题
  • 如果进程本身已经是高可用且能够自动重启恢复错误,那么它就不需要让Kafka来帮它完成错误检测和状态恢复

对于以上情形,使用独立consumer(standalone consumer)更为合适,standalone consumer间彼此独立工作互不干扰,任何一个consumer崩溃都不影响其他standalone consumer工作

使用KafkaConsumer.assign方法可以以独立consumer方式消费消息,指定需要消费的分区,如果发生多次assign调用,最后一次assign调用的分配生效,之前的会被覆盖,并且同一个consumer不能混用assign和subscribe

    package com.aim.kafka.client.consumer;

    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class StandaloneConsumer<K, V> {
        public void consumer(Properties properties, ConsumerHandle.ConsumerFunction consumerFunction) {
            KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
            List<TopicPartition> partitions = new ArrayList<>();
            List<PartitionInfo> allPartitions = consumer.partitionsFor("test-topic");
            if(CollectionUtils.isNotEmpty(allPartitions)){
                for(PartitionInfo partitionInfo : allPartitions){
                    partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }

                consumer.assign(partitions);
            }

            try {
                while (true){
                    ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
                    consumerFunction.consumer(records);
                    consumer.commitAsync();
                }

            } catch (WakeupException e){
                //异常处理
            } finally {
                consumer.commitAsync();
                consumer.close();
            }
        }
    }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐