数据不丢不漏、不重不错

一、不丢(生产写入消息不丢失)

数据组织形式: 首先,从数据组织形式来说,kafka有三层形式,kafka有多个主题(topic),每个主题有多个分区,分区分为主分区和副本分区,每个分区又有多条消息。而每个分区可以分布到不同的节点broker上,这样一来,从服务端来说,分区可以实现高伸缩性,以及负载均衡,动态调节的能力。

在这里插入图片描述

ISR(In-Sync Replicas)同步副本机制: 而ISR 是一个副本的列表,里面存储的都是能跟leader主分区数据一致的副本分区,确定一个副本在ISR列表中有以下两个条件:

# 默认10000 即 10秒, 若follower10s没有和leader通信,则踢出ISR
replica.lag.time.max.ms  
# 允许 follower 副本落后 leader 副本的消息数量,超过这个数量后,follower 会被踢出 ISR
replica.lag.max.messages 

生产者端的ACKS配置

  • 0:不会等待server端的确认,消息会立刻被加入到socket缓冲区并被认为已经发送,不能保证server已经收到这条消息,而且retry配置也不会生效(因为client端不知道是否失败)发送完每条消息后返回的offset总是-1。这意味如果leader宕机了,则会丢失数据

  • 1:只有leader会把消息写到自己本地的日志中,不会等待其他followers的响应,在这种情况下,leader确认收到消息后,其他followers还没有同步消息前,如果leader宕机,则会丢失数据

  • ALL/-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。

    但是这样也不能保证数据不丢失

    • 当ISR中只有leader时,这样就变成了acks=1的情况
    • 当server的配置项min.insync.replicas配置的是1时

生产者重试参数retries: 该参数的设置已经在kafka 2.4版本中默认设置为Integer.MAX_VALUE。retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms ,可以通过retry.backoff.ms 参数来配置时间间隔。

min.insync.replicas(最小同步副本): 当生产者的request.required.acks配置项配置的是all或者-1时,此配置项才有意义。min.insync.replicas指定了一个写操作被认为是成功时最小的副本确认数,如果不能满足这个条件producer将会触发异常(either NotEnoughReplicas or NotEnoughReplicasAfterAppend)。当这两个配置一块使用时,跟够提供更好的数据可靠性和持久性,典型的场景是3副本的topic,设置min.insync.replicas值为2生产者设置acksall这能够保证生产者会收到异常当大部分副本没有收到写操作。

如果要让写入 Kafka 的数据不丢失,需要保证如下几点:

  • 每个 Partition 都至少得有 1 个 Follower 在 ISR 列表里,跟上了 Leader 的数据同步。
  • 每次写入数据的时候,都要求至少写入 Partition Leader 成功,同时还有至少一个 ISR 里的 Follower 也写入成功,才算这个写入是成功了。
  • 如果不满足上述两个条件,那就一直写入失败,让生产系统不停的尝试重试,直到满足上述两个条件,然后才能认为写入成功。
  • 按照上述思路去配置相应的参数,才能保证写入 Kafka 的数据不会丢失。
  • 一旦 Leader Partition 宕机了,就会选举其他的 Follower Partition 作为新的 Leader Partition 对外提供读写服务。

分析一下上面几点要求

第一条,必须要求至少一个 Follower 在 ISR 列表里。

那必须的啊,要是 Leader 没有 Follower 了,或者是 Follower 都没法及时同步 Leader 数据,那么这个事儿肯定就没法弄下去了。

第二条,每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功。

大家看下面的图,这个要求就是保证说,每次写数据,必须是 Leader 和 Follower 都写成功了,才能算是写成功,保证一条数据必须有两个以上的副本。这个时候万一 Leader 宕机,就可以切换到那个 Follower 上去,那么 Follower 上是有刚写入的数据的,此时数据就不会丢失了。

img

第三条,假如现在 Leader 没有 Follower 了,或者是刚写入 Leader,Leader 立马就宕机,还没来得及同步给 Follower。在这种情况下,写入就会失败,然后你就让生产者不停的重试,直到 Kafka 恢复正常满足上述条件,才能继续写入。这样就可以让写入 Kafka 的数据不丢失。


二、消息不漏(消费消息不漏掉)

consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。对于当前消费组来说,topic中每条数据只要被消费组内任何一个消费者消费一次,那么这条数据就可以认定被当前消费组消费成功。不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

消费者rebalance: rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。消费者rebalance的触发条件有三种:

  • 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃
  • 订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance。
  • 订阅主题的分区数发生变更。

消费者组内分区分配:group下的所有consumer都会协调在一起共同参与分配,这是如何完成的?Kafka新版本consumer默认提供了两种分配策略:range和round-robin。当然Kafka采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。默认情况下,该参数的值为RangeAssignor,RangeAssignor在分配partition的时候,它首先会对Consumer进行排序,排序的依据是字典序,然后依次分配分区。它的目的是尽量保证将分区平均分配给消费者。

消费者offset提交机制:kafka有消息offset的概念,当每个消息被写进去后,都有一个offset,代表它的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。下次我要是重启,就会继续从上次消费到的offset来继续消费。

  • 自动提交offset: 设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】(默认为5s)来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔五秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll方法来驱动的,在调用poll方法的时候,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。需要注意的是,这种方式可能会导致消息重复消费,假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡(消费组其它消费者接管该分区,但只知道上次提交的分区消费位移),那么由于没有更新位移导致重平衡后这部分消息重复消费。

  • 手动提交offset: 设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()/consumer.commitAsync();】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

    • 同步提交consumer.commitSync(),指的是Consumer会一直等待提交offset成功,在此期间不能继续拉取以及消费消息,如果提交失败, consumer会一直重复尝试提交,直到超时,默认的时间是60秒。

    • 异步提交consumer.commitAsync(),不会阻塞消费者线程,提交失败的时候不会进行重试。

    • 同步和异步组合提交,在关闭消费者或者再均衡前的最后一次提交,必须要确保提交成功,因此,再消费者关闭前一般会组合使用 commitAsync() 和 commitSync()。它的工作原理如下:先使用 commitAsync() 方法来提交,这样的速度更快,而且即使这次提交失败,但下次可能会成功,直到关闭消费者,没有所谓的下一次提交了,使用 commitSync() 会一直重试,知道提交成功或发生无法恢复的错误。

      try{
          while(true){
              consumer.poll(Duration.ofSeconds(1));
              // ....        
              consumer.commitAsync();
          }
      }catch(Exception e){
          // ...
      }finally{
          try{
              consumer.commitSync();
          }finally{
              consumer.close();
          }
      

三、消息不错序(消息保证有序)

Kafka分布式的单位是partition,同一个partition用一个write ahead log组织,所以可以保证单个分区内消息FIFO的顺序。不同partition之间不能保证顺序。但是绝大多数用户都可以通过message key来定义,因为同一个key的message可以保证只发送到同一个partition,比如说key是user id,table row id等等,所以同一个user或者同一个record的消息永远只会发送到同一个partition上,保证了同一个user或record的顺序。

MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION参数

该参数指定了生产者在收到服务端响应之前可以发送多少个batch消息,默认值为5。batch是 当有多个消息需要背发送到同一个分区时,生产者会把它们放到同一个批次中,该参数指定了一个批次可以使用的内存大小。一般要求MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

因为Kafka的重试机制有可能会导致消息乱序,所以我们一般为了保证消息有序会把MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION设置为1,即使发生了重试也会保证分区写入有序. 比如我们常见的订单系统和会员积分系统就是非常鲜明的场景,订单是要创建过后才能取消的,而对应的会员积分是要先增后减的,如果这个顺序不能保证,系统就会出现问题。但设置为1时,传输效率非常差,但是可以解决乱序的问题(当然这里有序只是针对单 client 情况,多 client 并发写是无法做到的)。

在kafka2.0+版本上,只要开启幂等性,不用设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION为1也能保证发送数据的顺序性。可以设置enable.idempotence=true,开启生产者的幂等生产,可以解决顺序性问题,并且允许max.in.flight.requests.per.connection设置大于1。当enable.idempotence设置成true后,Producer自动升级成幂等性Producer。Kafka会自动去重。Broker会多保存一些字段。当Producer发送了相同字段值的消息后,Broker能够自动知晓这些消息已经重复了。


四、不重(消息不重复)

为了程序的健壮性,在使用 Kafka 的时候一般都会设置重试的次数,但是因为网络的一些原因,设置了重试就有可能导致有些消息重复发送了(当然导致消息重复也有可能是其他原因),需要保证数据的不重复消费。对于一些应用场景(比如支付数据等),它们是要求精确计数的,这时候如果上游数据有重复,下游应用只能在消费数据时进行相应的去重操作,应用在去重时,最常用的手段就是根据唯一 id 键做 check 去重。在这种场景下,因为上游生产导致的数据重复问题,会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下:如果在发送时,系统就能保证 exactly once,这对下游将是多么大的解脱。这就是幂等性要解决的问题,主要是解决数据重复的问题,正如前面所述,数据重复问题,通用的解决方案就是加唯一 id,然后根据 id 判断数据是否重复,Producer 的幂等性也是这样实现的。

生产者幂等性保证机制: 生产者幂等性是通过两个关键信息保证的,PID(Producer ID)和sequence numbers。

  • PID 用来标识每个producer client
  • sequence numbers 客户端发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复

producer初始化会由server端生成一个PID,然后发送每条信息都包含该PID和sequence number,在server端,是按照partition同样存放一个sequence numbers 信息,通过判断客户端发送过来的sequence number与server端number+1差值来决定数据是否重复或者漏掉。通常情况下为了保证数据顺序性,我们可以通过max.in.flight.requests.per.connection=1来保证,这个也只是针对单实例。在kafka2.0+版本上,只要开启幂等性,不用设置这个参数也能保证发送数据的顺序性。引入了幂等性Producer 不论向 Server 发送多少重复数据,Server 端都只会持久化一条。

限制条件:Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重。生产者幂等性是有条件的:

  • 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。

消费者数据幂等性保护机制: 数据不重复也需要保证数据的幂等性:当消费一条消息时就往数据库插入一条数据。如何保证重复消费也插入一条数据呢?那么我们就需要从幂等性角度考虑了。幂等性,我通俗点说,就一个数据,或者一个请求,无论来多次,对应的数据都不会改变的,不能出错。幂等性保证最简单常用的方法是把结果写到一个支持唯一键的系统,在这种情况下要么消息包含唯一的键,要么使用主题分区、偏移量的组合创建唯一键,数据存储时发现已经存在同样键的数据会跳过或覆盖。


五、kafka 事务

Apache Kafka 从 0.11.0 开始,支持了一个非常大的 feature,就是对事务性的支持

kafka事务: Kafka 事务机制支持了跨分区的消息原子写功能。因为producer发送消息可能是分布式事务,所以引入了常用的2PC,所以有事务协调者(Transaction Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引入的Group Coordinator在选举和failover上面类似。事务管理中事务日志是必不可少的,kafka使用一个内部topic来保存事务日志,这个设计和之前使用内部topic保存位点的设计保持一致。事务日志是Transaction Coordinator管理的状态的持久化,因为不需要回溯事务的历史状态,所以事务日志只用保存最近的事务状态。这一保证在生产者运行时出现异常甚至宕机重启之后仍然成立。此外,同一个事务内的消息将以生产者发送的顺序,唯一地提交到 Kafka 集群上。也就是说,事务机制从某种层面上保证了消息被恰好一次地提交到 Kafka 集群。

应用场景:

  • Kafka 生产者在同一个事务内提交到多个分区的消息,要么同时成功,要么同时失败。

  • 应用先消费一个 topic ,然后做处理再发到另一个 topic ,这个 consume-transform-produce 过程需要放到事务里面,比如消息在 处理或者发送的过程中如果失败了,消费偏移量也不能提交。

事务实现: 实现事务机制最关键的概念就是事务的唯一标识符( TransactionalID ),Kafka 使用 TransactionalID 来关联进行中的事务。TransactionalID 由用户提供,这是因为 Kafka 作为系统本身无法独立的识别出宕机前后的两个不同的进程其实是要同一个逻辑上的事务。对于同一个生产者应用前后进行的多个事务,TransactionalID 并不需要每次都生成一个新的。这是因为 Kafka 还实现了 ProducerID 以及 epoch 机制。这个机制在事务机制中的用途主要是用于标识不同的会话,同一个会话 ProducerID 的值相同,但有可能有多个任期。ProducerID 仅在会话切换时改变,而任期会在每次新的事物初始化时被更新。这样,同一个 TransactionalID 就能作为跨会话的多个独立事务的标识。

如下图所示整个过程类似一个两阶段提交的过程:

abefa304a1e03ce455e8a1659a7f48fb.png

Kafka引入了事务( Transactional )机制和 broker 中的一个新模块 Transaction Coordinator 。生产者首先会发起一个查找事务协调者 (TransactionalCoordinator) 的请求 (FindCoordinatorRequest),Broker 集群根据 Request 中包含的 transactionalId 查找对应的 TransactionalCoordinator 节点并返回给 Producer。

  1. 当 producer 初始化事务的时候,需要先向 Coordinator 注册一个 transactional id ,每 个 producer 的 Transaction ID 必须唯一,同时还会获取一个单调递增的 epoch 。 Coordinator 因此拒绝相同的 Transaction ID 的 producer 的注册,同时可以根据 epoch 拒绝掉老的僵尸进程。

  2. Coordinator 拥有自己的事务日志 transaction log 。该日志也是Kafka的一个topic。这 样保证了事务的持久化和可靠性。Coordinator 会维护每个 Transaction ID 和事务日志 partition 的映射。这样当 broker 异常时,也能通过事务日志恢复到原有的事务状态。

  3. 为了实现 producer 的 幂等特性 ,当 producer 开始一个事务的时候,它会有一个内部的事务序列号 Sequence Number 。当重复提交的消息数据的时候, broker 会检查该序列号:

    • 如果序列号比 broker 维护的序列号大1, broker 会接受它并写入数据。

    • 如果序列号小于等于 broker 维护的序列号,说明数据重复提交, broker 丢弃该消息。

    • 如果序列号比 broker 维护的序列号大于1以上,说明中间有消息尚未被处理, broker 会拒绝该消息。

Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。而consumer端读取事务消息主要由consumer端隔离级别体现,它类似于数据库中隔离级别的概念,目前只是简单分为:read_uncommitted和read_committed,其中后者指的是consumer只能读取已成功提交事务的消息(当然也包括非事务型producer生产的消息)。isolation.level参数设置情况

  • 设置为read_committed时候是生产者已提交的数据才能读取到
  • 设置为read_uncommitted时候可以读取到未提交的数据

六、kafka实现延时队列

1、划分topic进行拦截消费: 如果队列允许一定时延误差(秒级):可以按照消息的预计发送时间,划分出来一些延时队列等级。例如延时时长最长为1小时,那么我们 可以划分出来,5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1h。延时的消息按照不同的延时等级,被放入不同的 topic 当中,如果是和等级不一致的消息,会被强制转为和等级一致的延时时间,这样延时的误差可以控制在两个延时等级的时间差范围内。producer写入数据到不同topic延时队列(delay_topic)中 ,消费者消费真正(real_topic)主题中的数据,我们可以在produce中默认启动一个拦截器,把拦截写入的数据写入到delay_topic中,当时间到了后会被拦截服务把延时队列中的数据加载到real_topic中供消费者消费。

2、时间轮延时队列:Kafka有专门用于实现延迟功能的定时器(SystemTimer)。底层使用的是时间轮(TimingWheel)模型(环形队列,下面挂接双向链表)。时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs × wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList的所有任务。

图1

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,钟面上有很多 bucket ,每一个 bucket 上可以存放多个任务,使用一个 List 保存该时刻到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应 bucket 上所有到期的任务。任务通过 取模决定应该放入哪个 bucket 。bucket底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐