在说这个Kafka同步发送和异步发送之前我们首先要了解一个事情,那就是这个同步发送和异步发送是在什么时候发生的?

所谓的同步和异步就是对于用户线程来讲的,发送线程只有异步。kafka默认的模式是异步发送

 同步模式

同步就是逐条发送。用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。一定是逐条发送的,第一条响应到达后,才会请求第二条

异步模式

异步就是批量发送。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次

PropertyDefaultDescription
queue.buffering.max.ms5000启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
queue.buffering.max.messages10000启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
queue.enqueue.timeout.ms-1当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
batch.num.messages200启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)

 同步和异步指client(producer)是否收到leader给的ack后才发下一条(对于异步就是同一批),acks = 0, -1和1是指leader节点和follower节点数据同步的方式,可靠性机制,是保证数据能成功备份到其他节点的机制,二者是独立关系。说简单点就是(ack就代表消息发送成功失败与否,acks的配置代表是否写入磁盘)

其他细节

 发送消息会默认重试三次,每次间隔100ms。

发送的消息会先进入到本地缓冲区(32mb),kakfa会跑一个线程,该线程去缓冲区中取16k的数据,发送到kafka,如果到 10 毫秒数据没取满16k,也会发送一次。异步的时候假如设置了缓存消息数量为200,但是一直没有200条数据,那么不可能会一直等下去,就会取16kb大小的数据,直接发,不够16kb也会发。

java代码同步和异步

同步

生产者同步发消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态

//等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());

异步

生产者发消息,发送完之后不用等待broker给回复,直接执行下面的业务逻辑。可以提供回调方法,让broker异步的调用callback,告知生产者,消息发送的结果。

//要发送 5 条消息
Order order = new Order((long) i, i);
//指定发送分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));
//异步回调方式发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
    System.err.println("发送消息失败:" +
    exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
         }
    }
});

ack的回复有三种模式。

  • ( 1 )acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。这里要注意这个回复,回复的是这条消息是否写入磁盘,设置为0,就代表只要消息发出去了,就认为消息发送成功,就会继续发。
  • ( 2 )acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
  • ( 3 )acks=-1或all: 需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个副本存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。

一般配置

对于sync的发送方式:

producer.type=sync
request.required.acks=1

对于async的发送方式:

producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200

对于oneway的发送发送:

oneway是只顾消息发出去而不管死活,消息可靠性最低,但是低延迟、高吞吐,这种对于某些完全对可靠性没有要求的场景还是适用的,即request.required.acks设置为0。

oneway的效果也是异步的。因此,设置同步和异步分时候,要综合考虑。

producer.type=async           '既然都已经设置ack=0忽略高可靠性了,也就没必要设置为同步了'
request.required.acks=0

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐