消息发送示例

使用kafka生产者发送一条消息的时候,示例代码,可能如下:

    public static void main(String[] args) throws InterruptedException {
        String server = "localhost:9092";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 安全认证配置,如果需要会这样配置,未启用ACL,忽略这几项配置
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        String topic = "test_topic";
        String message = "hello, kafka";
        producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
            log.info("metadata: {}", metadata);
            if (exception != null) {
                log.error("send exception: ", exception);
            }
        });

        Thread.currentThread().join();
    }

示例代码,仅供参考,下面分析一下消息是如何发送出去的。

发送流程分析

鉴于代码量太大,所以不会贴太多源码分析,尽量使用流程图和文字表达清楚。

查看源码

基本流程

示例代码中,消息发送整体分两个步骤:

  1. 构造生产者实例
  2. 消息发送

 下面对这两个过程分析

创建生产者

创建流程

示例中,创建的关键代码就这一行 :

KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

主要流程如流程图

  1. 调用KafkaProducer接口构造生产者实例
  2. 将配置的Properties转换为ProducerConfig,kafka生产者的配置属性有很多,如果我们没有配置的属性就会使用默认配置
  3. 初始化生产者实例的各个属性,如上面注释中图片里显示的这些属性
  4. 启动消息发送线程,完成

这里面有两个属性需要本文重点关注,与消息发送有直接关系(比如监控统计类属性,就不算直接关系,因为即使没有,消息也可以发送,就这个意思):

这两个属性:

            // 缓存待发送的消息
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
            // 进行实际的消息发送运作
            this.sender = newSender(logContext, kafkaClient, this.metadata);

消息发送

生产者-消费者模式发送消息

上面提到了producer有两个属性:accumulator和sender。

kafka producer发送消息并不是同步发送的,而类似于生产者-消费者模式的异步消息发送:

当produer调用send方法,发送消息的时候,只是先把消息缓存到一个队列,由该模式的消费者(另一个线程)来消费消息并执行真正的发送逻辑。这样主要是为了发送的时候尽量是批次的消息发送,而非单条单条消息的发送用来提升发送性能。

prdoucer实例就相当于该模式的生产者,accumulator是其中的缓存队列,sender便是消费者。

sender是在一个异步线程(ioThread)中执行主要逻辑,不停的从accumulator中获取准备发送的消息批次并通过网络发送到目标broker上,基本流程如下:

所以,到这里,也是主要分两步:

  1. producer调用send方法把消息放入accumulator
  2. sender从accumulator拿到消息发送到kafka broker

消息放入accumulator

我们发送消息可能这样写:

        producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
            log.info("metadata: {}", metadata);
            if (exception != null) {
                log.error("send exception: ", exception);
            }
        });

 看一下调用send方法后的执行流程:

消息实际是批量发送,从流程图里可以看到一个批次的消息满足条件才会唤醒sender准备发送,接下来看一下sender实际进行消息发送的流程。

消息批量异步发送

上文说明sender是在调用 new KafkaProducer()构造producer实例的时候,初始化的,代码如下:

            this.sender = newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

是在一个异步线程(ioThread)内执行相关逻辑,是一个死循环:

    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                // 调用Sender.runOnce()方法
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // 下面执行关闭逻辑,代码忽略
    }

在Sender.runOnce方法里不停的调用发送生产消息的请求:


        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        client.poll(pollTimeout, currentTimeMs);

看这两行代码,是否会奇怪调用sendProducerData方法后,又调用下面这个client.poll方法?

我简单解释下:

  1. kafka的网络通信框架是自己基于java nio封装的一套实现,不是像rocketmq等用的netty那一套
  2. 调用sendProducerData方法只是把要发送的请求准备好,还未进行实际的网络传输
  3. 调用client.poll方法会进行实际io操作,将所有channel待发送数据通过socket发送出去

基本流程大概是这样:

结语

消息的发送过程基本就是这样,主要采用的就是生产者-消费者模式,异步批量发送的形式。

其中处理过程中还是有特别多的细节,不再全部展开来说了。

示例代码中,配置了ACL认证,kafka的认证机制是在连接连接建立的时候做的。如果是默认的话,就是没有认证且明文传输,关键的握手那一块是空实现,这个会放在其实篇单独说明。它不像rocketmq那样,rocketmq是请求的时候每次都会带上相关秘钥进行权限认证

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐