kafka默认情况下,提供的是至少一次的可靠性保障。即broker保障已提交的消息的发送,但是遇上某些意外情况,如:网络抖动,超时等问题,导致Producer没有收到broker返回的数据ack,则Producer会继续重试发送消息,从而导致消息重复发送。

如果我们禁止Producer的失败重试发送功能,消息要么写入成功,要么写入失败,但绝不会重复发送。这样就是最多一次的消息保障模式。但对于消息组件,排除特殊业务场景,我们追求的一定是精确一次的消息保障模式。kafka通过 幂等性(Idempotence)和事务(Transaction) 的机制,提供了这种精确的消息保障。

在之前的旧版本中,Kafka只能支持两种语义:At most once和At least once。而Kafka在 0.11.0.0 版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证上生产者发送的消息,不会丢失,而且不会重复。

幂等性要解决的问题?

在 0.11.0 之前,Kafka 通过 Producer 端和 Server 端的相关配置可以做到 数据不丢 ,也就是 at least once,但是在一些情况下,可能会导致数据重复,比如:网络请求延迟等导致的重试操作,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录之前的状态信息),所以就会有可能导致数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)导致的数据重复。

对于大多数应用而言,数据保证不丢是可以满足其需求的,但是对于一些其他的应用场景(比如支付数据等),它们是要求精确计数的,这时候如果上游数据有重复,下游应用只能在消费数据时进行相应的去重操作,应用在去重时,最常用的手段就是根据唯一 id 键做 check 去重。

在这种场景下,因为上游生产导致的数据重复问题,会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下:如果在发送时,系统就能保证 exactly once,这对下游将是多么大的解脱。这就是幂等性要解决的问题,主要是解决数据重复的问题,正如前面所述,数据重复问题,通用的解决方案就是加唯一 id,然后根据 id 判断数据是否重复,Producer 的幂等性也是这样实现的。

Kafka 是怎么保证幂等性的?

Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
    在这里插入图片描述
    当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

开启幂等性配置

只需要把 Producer 的配置 enable.idempotence 设置为 true 即可

props.put(“enable.idempotence”, ture)
//或者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

Kafka幂等性的局限性

开启enable.idempotence后,kafka就会自动帮你做好消息去重的一系列工作。底层具体实现原理很简单,就是用空间换时间的优化思路,即在broker端多存一些字段来标识数据的唯一性。当Producer发送了具有相同字段值的消息后,broker会进行匹配去重,丢弃重复的数据。实际的代码没这么简单,但大致是这么个处理逻辑。

官方的这个幂等实现看似简单高效,但也存在他的局限性。他只能保证单分区上的幂等性,即一个幂等性Producer只能够保证某个topic的一个分区上不出现重复消息,无法实现多分区的幂等。此外,如果Producer重启,也会导致幂等重置。

事务

对于多分区保证幂等的场景,则需要事务特性来处理了。kafka的事务跟我们常见数据库事务概念差不多,也是提供经典的ACID,即原子(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。

事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。事务特性的配置也很简单:

和幂等Producer一样,开启enable.idempotence = true设置Producer端参数transctional.id事务Producer的代码稍微也有点不一样,需要调一些事务处理的API。数据的发送需要放在beginTransaction和commitTransaction之间。Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。示例代码:

producer.initTransactions();
try {
     producer.beginTransaction();
     producer.send(record1);
     producer.send(record2);
     producer.commitTransaction();
} catch (KafkaException e) {
     producer.abortTransaction();
}

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐