生产者基本操作

启动生产者

Kafka 自带了 一个 kafka-console producer.sh 脚本,通过执行该脚本可 在终端调用 Kafka生产者向 Kafka 发送消息 。该脚本运行时需要 broker-list topic 两个必传参数,分别用来指定 Kafka 的代理地址列 以及消息被发送的目标主题。

kafka-console- producer.sh --broker-list server1:9092 , server-2:9092 , server-3:9092 --topic kafka-action --property parse.key=true

消息Key与消息净荷之间的分隔符默认是以制表符分隔,若希望修改分隔符,则通过配置项key.separator指定。

执行下面命令验证消息是否发送成功:

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server 1:9092,server-2:9092,server-3:9092 --topic kafka-action --time -1

该命令用于查看某个主题各分区对应消息偏移量

  • partitions:指定一个或者多个分区,多个分区以逗号分隔,若不指定则默认查看该主题所有分区;
  • time:表示查看在指定时间之前的数据,支持-1(lastest)、-2(earliest)两个时间选项,默认取值-1;
查看消息

Kafka生产消息以二进制的形式存在文件中,为了便于查看消息内容, Kafka 提供了一个查看日志文件的工具类 kafka.tools.DurnpLogSegments 。通过kafka-run-class脚本,可以直接在终端运行该工具类。

kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka-logs/ producer-create - topic-0/00000000000000000000.log

producer程序实例

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.46.89.46:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("msk-test",Integer.toString(i),Integer.toString(i)));
            producer.close();
        }
        
    }

}
  • BOOTSTRAP_SERVERS_CONFIG:用于创建向Kafka broker服务器的连接;如果broker段没有显式配置listeners使用IP地址,那么最好该参数也配置成主机名而不是IP地址;
  • KEY_SERIALIZER_CLASS_CONFIG:key序列化,被发送到broker段的任务消息的格式都必须是字节数组,因此消息的各个组件必须首先做序列化,然后才能发送到broker;
  • VALUE_SERIALIZER_CLASS_CONFIG:value序列化;

构造消息实体ProducerRecord对象

发送消息

Kafka producer 发送消息的主方法是 send 方法。虽然 send 方法只有两个简单的方法签名,但其实 producer 在底层完全地实现了异步化发送,并且通过 Java 提供的 Future 同时实现了同步发送和异步发送+回调( Callback )两种发送方式。

  1. 异步发送

    ProducerRecord<String, String> record = new ProducerRecord<>("msk-test", Integer.toString(i), Integer.toString(i));
                    producer.send(record, new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (null==e){
                                // 消息发送成功
                            }else {
                                //处理错误处理逻辑
                            }
                        }
                    });
    
  2. 同步发送

    RecordMetadata record = producer.send(new ProducerRecord<>("msk-test", Integer.toString(i), Integer.toString(i))).get();
    

    get 将返回对应的 RecordMetadata 实例(包含了己发送消息的所有元数据信息),包

    括消息发送的 topic 、分区以及该消息在对应分区的位移信息。

对于可重试异常,如果在producer中配置了重试次数,那么只要在规定的重试次数内自行恢复了,则不会出现在exception中。

producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (null==e){
                            // 消息发送成功
                        }else {
                            if (e instanceof RetriableException){
                                // 处理可重试异常
                            }else{
                                // 处理不可重试异常
                            }
                        }
                    }
                });

关闭producer

producer参数解析
  • acks参数用于控制producer生产消息的持久性。

    当 producer 发送 条消息给 Kafka 集群时,这条消息会被发送到指定 topic区分

    leader 所在的 broker 上, producer 等待从该 leader broker 返回消息的写入结果(当然并不是无限等待,是有超时时间的)以确定消息被成功提交。这一切完成后 producer 可以继续发送新的消息 Kafka 能够保证的是 consumer 永远不会读取到尚未提交完成的消息一一这和关系型数据库很类似,即在大部分情况下,某个事务的 SQL 查询都不会看到另 个事务中尚未提交的数据。

    producer 端越快地接收到 leader broker 响应,它就能越快地发送下 条消息,即吞吐量也就越大 producer 端的 acks 参数就是用来控制做这件事情的 acks 指定了在给 producer 发送响应前, leader broker 必须要确保己成功写入该消息的副本数 当前 acks 个取值: 0、 1、 all:

    • acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时,producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果由于不接收发送结果,因此在这种情况下 producer.send 的回调也就完全失去了作用,即用户无法通过回调机制感知任何发送过程中的失败,所以 acks=0 producer 并不保证消息会被成功发送。但凡事有利就有弊,由于不需要等待响应结果,通常这种设置producer 的吞吐量是最高的。
    • acks all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待 ISR 中所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给producer 。显然当设置 acks=all 时,只要 ISR 至少有一个副本是处于“存活”状态的那么这条消息就肯定不会丢失,因而可以达到最高的消息持久性,但通常这种设置下producer 的吞吐 也是最低的。
    • acks = 1:是0和 all 折中的方案,也是默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果 producer ,而无须等待 ISR中其他副本写入该消息。 那么此时只要 leader broker 直存活 Kafka 就能够保证这条消息不丢失。这实际上是一种折中方案,既可以达到适当的消息持久性,同时也保证了 producer 端的吞吐量。
  • buffer.memory

    该参数指定了 producer 端用于缓存消息的缓冲区大小,单位是字节,默认值是 33554432。producer启动时会首先创建一个内存缓冲区用于保存待发送的消息,然后由另一个专属线程负责从缓冲区中读取消息执行真正的发送。

    若producer向缓冲区写消息的速度超过专属IO线程发送消息的速度,那么必然造成该缓冲区空间不断增大,此时producer会停止手头的工作等待IO线程追上来,若一段时间后IO线程还是无法追上producer的速度,那么producer就会抛出异常并期望用户介入。

  • compression.type

    设置producer端是否压缩消息,默认值是none;

  • retries

    该参数表示进行重试的次数,默认值是 ,表示不进行重试。在实际使用过程中,设置重试可以很好地应对那些瞬时错误,因此推荐用户设置该参数为 个大于 的值 只不过在

    retries 的设置时,有两点需要着重注意:

    • 重试可能造成消息的重复发送;
    • 重试可能造成消息的乱序;
  • batch.size

    前面提到过, producer 会将发往同 分区的多条消息封装进 batch中。当 batch 满了的时候, producer 会发送 batch 中的所有消息。不过, producer 并不总是等待

    batch 满了才发送消息,很有可能当 batch 还有很多空闲空间时 producer 就发送该 batch。

    batch.size 参数默认值是 16384 ,即 16 这其实是一个非常保守的数字。 在实际使用过

    程中合理地增加该参数值,通常都会发现 producer 的吞吐量得到了相应的增加。

  • linger.ms

    linger.ms 数就是控制消息发送延时行为的。该参数默认值是 ,表示消息需要被立即

    发送,无须关心 batch 是否己被填满,大多数情况下这是合理的,毕竟我们总是希望消息被尽可能快地发送,不过这样做会拉低 produce 吞吐量,毕竟 produce 发送的每次请求中包含的消息数越 produce 就越能将发送请求的开销摊薄到更多的消息上,从而提升吞吐量。

  • max.request.size

    实际上该参数控制的是produce 端能够发送的最大消息大小,由于请求有一些头部数据结构,因包含 一条消息的请求的大小要比消息本身大,不过姑且把它当作请求的最大尺寸是安全的,如果 producer 要发送尺寸很大的消息 那么这个数就是要被设置 。默认的 1048576 字节太 通常无法满足企业级消息的大小要求。

  • request.timeout.ms

    当produce 发送请求给 broker后,broker 需要在规定的时间范围内将处理结果返还给

    produce 这段时间便是由该 参数控制的,默认是 30 ,这就是说,如果 broker 30 秒内都没有给 produc 发送响应,那么 produ er 就会认为该请求超时了,并在回调函数中显式地抛出TimeoutException 异常交由用户处理默认的 秒对于 般的情况而言是足够的 producer 发送的负载很大 ,超时的情况就很容易碰到,此时就应该适当调整该参数值。

消息分区机制

分区策略

Kafka producer 发送过程中一个很重要的步骤就是要确定将消息发送到指定的 topic 的哪个分区中。 producer 提供了分区策略以及对应的分区器 partitioner 供用户使用 Kafka 发布的默认 partitioner 会尽力确保具有相同 key 的所有消息都会被发送到相同的分区上:若没有为消息指定 key ,则该 partitioner 会选择轮询的方式来确保消息在 topic 的所有分区上均匀分配。

自定义分区

value取模partition,发送到余数对应的分区。

public class AuditPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        int value = Integer.parseInt((String)o1);
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(s);
        int num = partitionInfoList.size();
        return value % num;
    }

    @Override
    public void close() {
        // 清理资源工作
    }

    @Override
    public void configure(Map<String, ?> map) {
        // 初始化操作
    }
}
properties.put(PARTITIONER_CLASS_CONFIG,AuditPartitioner.class);

producer拦截器

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链。

  • onSend(ProducerRecord)::该方法封装进 KafkaProducer.send 法中,即它运行在用户主线程中。 producer 确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,井且通常都是在 producer 回调逻辑触发之前 onAcknowledgement运行在 producer 1/0 线程中,因此不要在该方法中放入很“重”的逻辑,否则会拖慢producer 的消息发送效率。

interceptor可能运行在多个线程中,因此再具体实现时用户需要确保线程安全。另外,若指定多个interceptor,则producer
将按照指定顺序调用它们,同时把每个interceptor中捕获的异常记录到错误日志而不是向上传递

无消息丢失配置

消息丢失场景:

  • KafkaProducer.send 方法仅仅把消息放入缓冲区中,由一个专属 1/0 线程负责从缓冲区中提取消息井封装进消息 batch 中,然后发送出去。显然,这个过程中存在着数据丢失的窗口:若 1/0 线程发送之前 producer 崩溃,则存储缓冲区中的消息全部丢失了。
  • 发送过程中出现失败重试造成消息乱序;

解决方法——无消息丢失配置

producer配置:

  • block.on.buffer.full/max.block.ms(新版本)=true

使得内存缓冲区被填满时 producer 处于阻塞状态并停止接收新的消息而不是抛出异常;否则 producer 生产速度过快会耗尽缓冲区。

  • acks = all or -1

必须要等到所有 follower 都响应了发送消息才能认为提交成功,这是 producer 端最强程度的持久化保证。

  • retries = Integer.MAX_VALUE

producer 要开启无限重试 用户
不必担心 producer 会重试那些肯定无法恢复的错误,当前 producer 会重试那些可恢复的异常
情况,所以放心地设置一个比较大的值通常能很好地保证消息不丢失。

  • max.in.flight.requests.per.connection= 1

设置该参数为1主要为了是防止topic 同分区下的消息乱序问题。这个参数的实际效果其
实限制了 producer 在单个 broker 连接上能够发送的未响应请求的数量。因此,如果设置成1,
producer 在某个 broker 发送响应之前将无法再给该 broker 发送 PRODUCE 请求

  • 使用带回调机制的 send 发送消息,即 KafkaProducer.send(record callback)

不要使用 KafkaProducer 中单参数的 send方法,因为该 send 调用仅仅是把消息发出而不会理会消息发送的结果。如果消息发送失败,该方法不会得到任何通知,故可能造成数据的丢失
实际环境中一定要使用带回调机制的 send 版本,即 KafkaProducer.send(record, callback

  • Callback 逻辑中显式地 即关闭 producer ,使用 close(O)

在Callback 的失败处理逻辑中显式调用 KafkaProducer.close(O) 这样做的目的是为了处理
消息的乱序问题 若不使用 close(O),默认情况下 producer 会被允许将未完成的消息发送出去,
这样就有可能造成消息乱序

broker端配置:

  • unclean.leader.election.enable= false

关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,从而避免broker端因水位阶段而造成的消息丢失。

  • replication.factor >= 3

一定要使用多个副本来保存分区的消息。

  • min.insync.replicas > 1

用于控制某条消息至少被写入到ISR中的多少个副本才算成功,设置成大于1是为了提升producer端发送语义的持久性。只有在producer端acks被设置成
all或者-1时,这个参数才有意义。

  • replication.factor > min.insync.replicas

若两者相等,那么只要有一个副本挂掉,分区就无法正常工作,虽然有很高的持久性但是可用性被极大的降低了。

  • enable.auto.commit= false

消息压缩

kafka支持的压缩算法

kafka支持3种压缩算法:GZIP、Snappy和LZ4

properties.put(COMPRESSION_TYPE_CONFIG,"snappy");

消费者基本操作

消费者组

消费者使用一个消费者组名(即 group.id )来标记自己, topic 的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。

  • 一个 consumer group 可能有若干个 consumer实例( group 只有一个实例也是允许的);
  • 对于同一个 group 而言, topic 的每条消息只能被发送到 group 下的一个 consumer 实例上;
  • topic 消息可以被发送到多个group;
位移(offset)

每个 consumer 实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息,让 consumer group 保存 offset ,那么只需要简单地保存长整型数据就可以了,同时 Kafka consumer 还引入了检查点机制( checkpointing )定期对offset 进行持久化,从而简化了应答机制的实现

位移提交

consumer 客户端需要定期地向 Kafka 集群汇报自己消费数据的进度,这 过程被称为位移提交( offset commit )。位移提交这件 情对于 consumer 非常重要,它不仅表征了consumer 端的消费进度,同时也直接决定了 consumer 端的消费语义保证。

消费消息

Kafka的消费者以Pull的方式获取消息,同时Kafka采用了消费组的模式,每个消费者都属于某一个消费组,在创建消费者时,若不指定消费者的groupId,则该消费者属于默认消费组。消费组是一个全局的概念,要确保该值在Kafka集群中唯一。

同一个消费组下的各消费者在消费消息时是互斥的,也就是说,对于一条消息而言,就同个消费组下的消费者来讲,只能被同组下的某一个消费者消费,但不同消费组的消费者能消费同一条消息,正因如此,我们很方便通过消费组来实现消息的单播与广播。这里所说的单播与广播是相对消费者消费消息而言的。

启动一个消费者:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-action --from-beginning

执行该脚本关键参数是bootstrap-server,因为以这种方式连接Kafka时才会调用新版本的KafkaConsumer,若通过参数ZooKeeper方式启动则调用的是老版本的消费者。

查看消费组名信息:

kafka-consumer-groups.sh --bootstrap-server server-1:9092,server-2:9092, server-3:9092 --list --new-consumer

查看topic各分区的偏移量信息:

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server 1:9092,server-2:9092,server-3:9092 --topic kafka-acton -time -1

消费多主题

kafka-console-consumer.sh --bootstrap-server server-1:9092,server-2:9092,server-3:9092 --new-consumer --consumer-property group.id=consume-multi-topic --whitelist "kafka-actionJproducer-perf-test"
单播与多播
  1. 单播

    一条消息只能被某一个消费者消费的模式称为单播,要实现消息单播只要让这些消费者属于同一消费组即可。

  2. 多播

    一条消息能够被多个消费者(同一个消费组)消费的模式称为多播。如果要实现广播则让这些消费者属于不同消费者即可。

查看消费偏移量
  1. ConsumerOffsetChecker用法

    kafka-consumer-offset-checker.sh --zookeeper server-1:2181,
    server2:2181, server-3:2181 --topic kafka-action --group consumer- offset-test --broker-info
    

    其中参数 zookeeper和 group 是必传参数,支持同时查看多个主题,多个主题之间以逗号分隔,不指定主题 ,则默认查看该消 费组消费的所有主题, broker-info 是可选参数,打印出各代理信息。

  2. ConsumerGroupCommand用法,支持一下3种类型的操作

    • list:返回与启动方式对应的所有消费组;
    • describe:查看某个消费组当前的消费情况;
    • delete:删除消费组;

构建consumer

consumer程序实例
public class ConsumerTest {
    public static void main(String[] args) {
        String topicName = "msk-test";
        String groupID = "group-test";

        Properties properties = new Properties();
        properties.put(BOOTSTRAP_SERVERS_CONFIG, "10.252.11.146:9092");
        properties.put(GROUP_ID_CONFIG, groupID);
        properties.put(ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList(topicName));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("partition = %d,offset = %d, key = %s, value = %s", record.partition(), record.offset(), record.key(), record.value()));
                }
            }
        } finally {
            consumer.close();
        }

    }
}
consumer主要参数
  • session.timeout.ms

consumer group 检测组内成员发送崩溃的事件,当某个group成员突然崩溃,管理group的Kafka组件,即消费者组协调者需要session.timeout.ms
的时间才能感知这个崩溃。
该参数还有另一个含义,consumer消息处理逻辑的最大时间,如果consumer两次poll之间的间隔超过这个参数所设置的阈值,那么coordinator就会认为这个
consumer已经追不上组内其他成员的消费速度了,因此会将该consumer实例踢出组,该consumer负责的分区也会被分配给其他consumer。
在最好的情况下,这会导致不必要的 rebalance ,因为 consumer 需要重新加入 group 更糟的是,
对于那些在被踢出 group 后处理的消息, consumer 都无法提交位移一一这就意味着这些消息在
rebalance 之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么
consumer 甚至无法执行任何消费,除非用户重新调整参数。
0.10.1.0版本之后,对该参数的含义进行了拆分,明确为"coordinator检测失败的时间",所以在实际使用中,用户可以为该参数设置一个比较小的值,让coordinator能够更快地检测consumer崩溃的情况,从而更快
的开启rebalance,避免造成跟大的消费之后,目前默认值是10秒。

  • max.poll.interval.ms

"consumer处理逻辑最大时间"含义被剥离出来了,该参数就是用于设置消息处理逻辑的最大时间。通过将该参数设置成实际的逻辑处理时间
再结合较低的session.timeout.ms参数值,consumer group既实现了快速的consumer崩溃检测,也保证了复杂的事件处理
逻辑不会造成不必要的rebalance。

  • auto.offet.reset

指定了无位移信息或者位移越界,即consumer要消费的信息的位移不在当前消息日志的合理区间范围时Kafka的应对策略。

earliest:指定从最早位移开始消费;
lastest:指定从最新出位移开始消费;
none:指定如果未发现位移信息或者位移越界则抛出异常;
  • enable.auto.commit

该参数指定consumer是否自动提交位移

  • fetch.max.bytes

指定consumer段单次获取数据的最大字节数,若实际业务消息很大,则必须要设置该参数为一个很大的值
,否则consumer将无法消费这些信息。

  • max.poll.records

该参数控制单次poll调用返回的最大消息数

  • hearbeat.interval.ms

如果需要开启新一轮rebalance,consumer group 其他成员如何得知?
当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS
异常形式塞进consumer心跳请求的response中,其他成员拿到response后才能知道需要重新加入group
,显然这个过程越快越好,而hearbeat.interval.ms就是用来做这件事情的。

注意:该值一定要小于session.timeout.ms,如果consumer在session.timeout.ms这段时间内都不
发送心跳,coordinator就会认为它已经dead,因此也就没有必要让它知道coordinator的决定了。

  • connection.max.idle.ms

Kafka定期关闭空闲socket连接的时间

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐