目录

kafka生产者

生产者消息发送流程

发送原理

生产者重要参数列表

异步发送API

普通异步发送

带回调函数的异步发送

同步发送API

生产者分区

分区好处

生产者发送消息的分区策略

自定义分区器

生产经验—提高生产者的吞吐量

生产经验—数据可靠性

生产经验—数据去重

数据传递语义

幂等性

生产者事务

生产经验—数据有序

生产经验—数据乱序


kafka生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k。

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。

应答acks:

0:生产者发送过来的数据,不需要等数据落盘应答。 

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。

生产者重要参数列表

参数名称描述
bootstrap.servers

生产者连接集群所需的broker地址清单。例如:hadoop01:9092,hadoop02:9092,hadoop03:9092,可以设置1个或者多个,中间用逗号隔开。注意,这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer

指定发送消息的key和value的序列化类型。(注:要写全列名)

buffer.memory

RecordAccumulator缓冲区总大小,默认32M。

batch.size

缓冲区一批数据的最大值,默认16k。适当增加该值可以提高吞吐量,但是如果该值设置太大会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,sender等待linger.time设置的时间到了就会发送数据。单位ms,默认值是0ms:表示没有延迟。

生产环境建议该值大小为5-100ms之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

max.in.flight.requests.per.connection

允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是1-5的数字。

retry.backoff.ms

两次重试之间的时间间隔,默认是100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

支持压缩类型:none、gzip、snappy、lz4和zstd。

retries

当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值:2147483647。

如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

异步发送API

普通异步发送

(1)需求:创建kafka生产者,采用异步发送的方式发送到kafka broker。

(2)代码编写:

1)创建maven工程

2)导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3)创建包

4)编写API代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        /**
         编写不带回调函数的API代码
         */
        //1.创建kafka生产者的配置对象
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息:bootstrap.server
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化:key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        //3.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send方法,发送信息
        for (int i = 0;i < 5;i++) {
            kafkaProducer.send(new ProducerRecord<>("first","hello kafka" + i));
        }
        //5.关闭资源
        kafkaProducer.close();
    }
}

(3)测试

1)开启kafka

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)执行idea的代码,查看kafka是否接收到消息

带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exceptionnull说明消息发送成功,如果Exception不为null说明消息发送失败。

注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

(1)代码编写

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallBack {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 编写回调函数异步发送API代码
         */
        //1.创建kafka生产者环境
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息:bootstrap.server
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key.value序列化:key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send方法,发送信息
        for (int a = 0;a < 5; a++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello mykafka" + a), new Callback() {
                //在kafkaProducer接收到ack时调用,异步调用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:"+ recordMetadata.topic() + "->"+"分区:"+recordMetadata.partition());
                    } else {
                        //出现异常打印
                        e.printStackTrace();
                    }
                }
            });
            //延迟一会会看到数据发往不同的分区
            Thread.sleep(2);
        }
        //5.关闭环境
        kafkaProducer.close();
    }
}

(2)测试

1)开启消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)执行idea的代码,查看控制台中是否有接收到回调消息

3)查看kafka是否接收到消息

同步发送API

只需在异步发送的基础上再调用一下get()方法即可。

(1)代码编写

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerS {
    public static void main(String[] args) throws ExecutionException,InterruptedException {
        /**
         * 同步发送API
         */
        //1.创建kafka生产者的配置环境
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化:key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send方法,发送消息
        for (int i = 0; i < 8; i++) {
            //异步发送(默认)
            //kafkaProducer.send(new ProducerRecord<>("first","hello!"+i));
            //同步发送
            kafkaProducer.send(new ProducerRecord<>("first","kafka"+i)).get();
        }
        //5.关闭资源
        kafkaProducer.close();
    }
}

(2)测试

1)开启消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)执行idea的代码,查看是否有接收到消息

生产者分区

分区好处

(1)便于合理使用存储资源:每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度:生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。

生产者发送消息的分区策略

(1)默认分区器:DefaultPartitioner

1)指明partition的情况下,直接将指明的值作为partition值;

例如partition=0,所有数据写入分区0。

2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;

例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那

么key1对应的value1写入1号分区,key2对应的value2写入0号分区。

3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

(2)案例一

将数据发送指定partition,如:将数据指定发送分区0中。

1)代码

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerPartitions {
    public static void main(String[] args) throws ExecutionException,InterruptedException{
        /**
         * 将数据发送到指定分区
         * 将所有数据发送到分区0中
         */
        //1.创建kafka生产者的配置对象
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化:key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send方式,方式消息
        for (int i = 0; i < 5; i++) {
            //指定数据发送到1分区
            kafkaProducer.send(new ProducerRecord<>("first", 0, "", "hi spark" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        //5.关闭环境
        kafkaProducer.close();
    }
}

2)测试

开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

执行idea中代码,观察控制台和kafka的消息

控制台

kafka

(3)案例二

没有指定partition值但是有key的情况,将key的hash值与topic的partition数进行取余得到partition值

1)代码

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerPartitions2 {
    public static void main(String[] args) {
        /**
         * 没有指定partition值但是有key的情况
         * 将key的hash值与topic的partition数进行取余得到partition值
         */
        //1.创建kafka生产者的配置对象
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化:key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //3.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //4.调用send方式,方式消息
        for (int i = 0; i < 5; i++) {
            //
            kafkaProducer.send(new ProducerRecord<>("first", "a","world" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        //5.关闭环境
        kafkaProducer.close();
    }
}

2)测试

开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

key=a        查看控制台结果

key=b        查看控制台结果

key=c        查看控制台结果

 key=f        查看控制台结果

自定义分区器

根据需求可以自己重新实现分区器

(1)需求:实现一个分区器,发送的数据出现jeffry就发送到分区1,否则发送到分区2.

(2)实现:

1)代码

partition:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;

import java.util.Map;
/**
 * 1.实现接口partitioner
 * 2.实现三个方法:partition,close,configure
 * 3.编写partition方法,返回分区号
 */
public class MyPartition implements Partitioner {
    /**
     *
     * @param topic     主题
     * @param key       消息的key
     * @param keyBytes  消息的key序列化后的字节数组
     * @param value     消息的value
     * @param valueBytes 消息的value序列化后的字节数组
     * @param cluster    集群元数据可以查看分区信息
     * @return
     */

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取消息
        String msgValue = value.toString();
        //创建partition
        int partition;
        //判断消息是否包含jeffry
        if (msgValue.contains("jeffry")) {
            partition = 1;
        } else {
            partition = 2;
        }

        //返回分区号
        return partition;
    }

    //关闭资源
    @Override
    public void close() {

    }

    //配置方法
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

测试自定义partition:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerMyPartitions {
    public static void main(String[] args) {
        /**
         * 测试自定义partition
         */
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zj.kafka.MyPartition");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0;i < 5;i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "jeffry" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

2)测试

开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

在idea启动执行程序

查看控制台的信息

 查看kafka的信息

 

生产经验—提高生产者的吞吐量

batch.size:批次大小,默认16k;

linger.ms:等待时间,修改为5-100ms;

compression.type:压缩形式为snappy;

RecordAccumulator:缓冲区大小,修改为64m;

(1)代码

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Parameters {
    public static void main(String[] args) {
        //创建kafka生产者配置对象
        Properties properties = new Properties();
        //给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //batch.size:批次大小,默认16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //linger.ms:等待时间,默认0
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //RecordAccumulator:缓冲区大小,默认32M buffer。memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //compression.type:压缩,默认none,可以配置gzip、snappy、lzo、zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

        //创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //调用send方法,发送信息
        for (int i = 0;i < 5;i++) {
            kafkaProducer.send(new ProducerRecord<>("first","jeffryOne" + i));
        }
        //关闭环境
        kafkaProducer.close();
    }
}

(2)测试

1)开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)在idea中启动执行程序

观察kafka是否接收到消息

生产经验—数据可靠性

(1)ack应答原理

 

数据可靠性分析:

如果分区副本设置为1个,或者ISR里应答的最小副本数量设置为1( min.insync.replicas默认为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

数据完全可靠条件 = ACK级别为-1+分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

可靠性总结:

acks=0,生产者发送过来数据就不管了,可靠性差,效率高;

acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;

acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

数据重复分析:

acks: -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

(2)代码编写

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerAck {
    public static void main(String[] args) {
        //创建kafka生产者的配置对象
        Properties properties = new Properties();
        //给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //设置acks
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        //重试次数retries。默认是int最大的值:2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG,5);

        //创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //调用send方法,发送信息
        for (int i = 0;i < 5;i++) {
            kafkaProducer.send(new ProducerRecord<>("first","jeffry" + i));
        }
        //关闭环境
        kafkaProducer.close();
    }
}

(3)测试

1)开启kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

2)在idea中启动执行程序,查看kafka情况

生产经验—数据去重

数据传递语义

(1)至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2;

At Least Once可以保证数据不丢失,但是不能保证数据不重复。

(2)最多一次(At Most Once)= ACK级别设置为0;

At Most Once可以保证数据不重复,但是不能保证数据不丢失。

(3)精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

幂等性

(1)幂等性原理

幂等性:指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次 = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数>=2)。

判断重复数据的标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。

PID是Kafka每次重启都会分配一个新的;

Partition表示分区号;

Sequence Number是单调自增的。

幂等性只能保证在单分区单会话内不重复。

(2)如何使用幂等性

开启enable.idempotence默认为true,false关闭。

生产者事务

(1)kafka事务原理

注:开启事务必须开启幂等性。 

 (2)kafka事务的5个API

//1初始化事务
void initTransactions();
//2开启事务
void beginTransaction() throws ProducerFencedException;
//3在事务内提交已经消费的偏移量
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
       String consumerGroupId) throws ProducerFencedException;
//4提交事务
void commitTransaction() throws ProducerFencedException;
//5放弃事务
void abortTransaction() throws ProducerFencedException;

(3)单个producer,使用事务保证消息的仅一次发送

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Transaction {
    public static void main(String[] args) throws InterruptedException {
        //1.创建kafka生产者的配置对象
        Properties properties = new Properties();
        //2.给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //设置事务id(必须),事务id可以任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionis-0");

        //3.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //初始化事务
        kafkaProducer.initTransactions();
        //开启事务
        kafkaProducer.beginTransaction();
        try {
            //4.调用send方法,发送信息
            for (int i = 0;i < 5;i++) {
                //发送消息
                kafkaProducer.send(new ProducerRecord<>("first","tom" + i));
            }
            int i = 1/0;
            //提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            //中止事务
            kafkaProducer.abortTransaction();
        } finally {
            //5.关闭环境
            kafkaProducer.close();
        }
    }
}

生产经验—数据有序

单分区内数据是有序的;

多发区内数据是无序的;

生产经验—数据乱序

 (1)kafka在1.x版本之前保证数据单分区有序,条件如下:  

 max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。

(2)kafka在1.x及以后版本保证数据单分区有序,条件如下:

1)开启幂等性

max.in.flight.requests.per.connection需要设置小于等于5。

2)未开启幂等性

max.in.flight.requests.per.connection需要设置为1。

原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。    

如果开启了幂等性且缓存的请求个数小于5个。会在服务端重新排序。

本文为学习笔记!!!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐