消费消息概述

一般而言,kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题。每个消费者接收主题一部分分区的消息。如果我们往群组里添加更多的消费者,超过了主题分区的数量,那么就有一部分消费者被闲置,不会接收到任何消息。但是如果有多个消费者群组消费同一个主题,那么不同消费者群组之间都可以获取到这个主题的所有消息,相互之间没有任何影响。这样就造成了一个问题就是消息可能被消费多次。但是在分布式情况下,很容易出现同一个消费者群组我们部署了多个服务实例,这种情况消息是怎么消费的呢?

创建消费者

消费者类型

  • 消费者群组里的消费者
  • 独立消费者

消费者群组和分区再均衡

想一下这样的场景:如果一个消费者群组中添加了一个新的消费者,或者有一个消费者被关闭或发生崩溃退出消费者群组,或者同一个主题添加了新的分区,那这个时候消费者群组该怎么去消费消息呢?这时候就会对分区进行重新分配,以便主题里的所有消息都可以正常的被消费。
我们把分区的所有权从一个消费者转移到另一个消费者,这样的行为称为再均衡。再均衡非常的重要,它使得我们可以放心的添加和移除消费者。但是再均衡期间也会带来很多问题,例如:消息漏读或者重复读,我们后面会着重介绍这些特别的场景,所以我们并不希望再均衡频繁发生甚至不希望发生。

消费者在轮询消息(为了获取消息)或提交偏移量时通过向被指派为群组协调器的broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。如果消费者停止发送心跳的时间足够长,并停止读取消息,群组协调器在等待几秒后,认为它死亡了,就会触发一次再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知群组协调器它将要离开群组,协调器会立即触发一次再均衡。

再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

消费者心跳行为

在0.10.1及更新版本中,和之前版本中心跳检查不太一样,这里引入了一个独立的心跳线程,可以在轮询消息的空档发送心跳。这样一来,发送心跳的频率和消息轮询的频率(由处理消息所花费的时间确定)之间就是相互独立的。在新版本kafka里,可以指定消费者离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁。例如有时应用程序并没有崩溃,只是由于某些原因导致无法正常工作。这个配置和session.timeout.ms是相互独立的,下面讲解相关配置时会区分两者的含义和区别。

分配分区过程

消费者加入群组时,会向群组协调器发送一个JsonGroup的请求。第一个加入群组的消费者将成为群主。群主从群组协调器那里获取群组的成员列表(列表中包含所有最近发送过心跳的消费者,它们被认为是活着的),并负责给它们分配分区。群主使用一个实现了PartitionAssignor接口的类来决定哪个分区应该分配给哪个消费者。

kafka内置了两种分配策略,通过配置参数来设置,后面我们会讲到。
群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

轮询

消息轮询是消费者api的核心,通过一个简单的轮询向服务器请求数据。当然轮询不只获取数据那么简单。在第一次调用新消费者的poll()方法时,它会负责查找GroupCoordinator,然后加入群组,接收分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的,所以,要确保在轮询期间所有的任何处理工作都应该尽快的完成。

轮询的过程:

轮询是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向kafka请求数据。
消费者必须持续对kafka进行轮询,否者会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给poll()方法的参数是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。
poll()方法返回一个记录列表。每条记录都包含了记录所属的主题的信息,记录所在分区的信息,记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。把结果保存起来或者对已有的记录进行更新,处理过程也随之结束。

在退出应用程序之前使用close()方法关闭消费者。网络连接和socket也会随之关闭,并立即出发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡。

消费者配置

  • fetch.min.bytes
    该参数指定了消费者从服务器获取记录的最小字节数。
  • fetch.max.wait.ms
    该参数告诉kafka等有足够的的数据后再返回给我们,但是如果超过了此参数设置的时间,即便没有足够的数据也会返回。
  • max.partiton.fetch.bytes
    该属性指定了服务器从每个分区里返回给消费者的最大字节数。默认值是1M,也就是说,kafkaConsumer.poll()方法从每个分区里返回的记录最多不超过这个参数指定的字节。例如:一个主题有20个分区,5个消费者,那么每个消费者需要至少4M的内存来接收记录。但实际设置时要分配多一点,因为如果群组有消费者发生崩溃,剩下的消费者需要处理更多的分区。并且此值必须比broker能够接收的最大消息(max.message.size)的字节数大。
  • session.timeout.ms
    该属性制定了消费者在被认为死亡之前可以与服务器断开连接的时间。默认3s
  • auto.offset.reset
    该属性置顶了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。有两个选择值:latest:在偏移量无效的情况下,消费者将从最新的记录开始读取数据;earliest:在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
  • enable.auto.commit
    该属性指定了消费者是否自动提交偏移量,默认是true。自动提交的一个最大的好处是,在实现消费者逻辑时可以少考虑一些问题。如果你在消费者轮询操作里处理所有的数据,那么自动提交可以保证之前提交已经处理过的偏移量。自动提交的主要缺点是,无法控制重复处理消息(比如消费者在自动提交偏移量之前停止处理消息),而且如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量了。
  • auto.commit.interval.ms
    该属性指定自动提交的时间间隔。默认5s。如果选择了自动提交偏移量,可以通过该参数配置提交的额度。一般来说,频繁提交会增加额外的开销,但也会降低重复处理消息的概率。
  • partition.assignment.strategy
    该参数决定哪些分区应该分配给哪些消费者。有两种策略:
  • Range
    该策略会把主题的若干个连续的分区分配给消费者。
  • RoundRobin
    该策略把主题的所有分区逐个分配给消费者。
  • client.id
    该属性可以是任意的字符窜,broker用它来标识从客户端发送过来的消息,通常被用在日志,度量指标和配额里。
  • max.poll.records
    该属性用于控制单次调用call()方法能够返回的记录数量。

提交和偏移量

每次调用poll()方法,kafka总是返回由生产者写入但是还没有被消费者读取过的记录,我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。kafka不会像其他JMS队列那样需要得到消费者的确认。相反,消费者可以用kafka来追踪消息在分区里的位置(偏移量)。
我们把更新分区当前位置的操作叫做提交
  
那么消费者如何提交偏移量的呢?
消费者往一个叫_consumer_offset的特殊主题里发送消息,消息里包含每个分区的偏移量。如果消费者一直处在运行状态,那么偏移量就没有什么用处。不过,如果发生了再均衡,每个消费者都可能分配了新的分区,而不是之前那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

这里的提交操作可能会出现有两种问题:

  • 如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处理两个偏移量之间的消息就会被重复处理。
  • 如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么就可能会丢失消息。

提交的方式

  • 自动提交
    这种方式是最简单的提交方式。通过这两个参数enable.auto.commit和auto.commit.interval.ms来控制。使用自动提交时,每次调用轮询方法都会把上次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前返回的消息都已经处理完毕(在调用close()方法之前也会进行自动提交)。

注意,自动提交虽然方便,但是并没有为开发者留余地来避免重复处理消息。

  • 提交当前偏移量
    大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数据。消费者api提供了commitSync()方式提交偏移量。要记住,commitSync()将会提交由poll()返回的最新偏移量,所以在处理完所有记录后要确保调用commitSync(),否者还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。另外我们需要知道是,在commitSync()提交过程中,只要没有遇到不可恢复的错误,此方法会一直尝试提交直到成功。并且
  • 异步提交
    手动提交有一个不足之处,在broker对提交请求作出响应之前,应用程序会一直阻塞。这样会限制应用程序的吞吐量。我们可以降低提交频率来提升吞吐量,但是如果发生了再均衡,会增加重复消息的数量。
    消费者api提供异步提交偏移量的api。我们只管发送提交请求,无需等待broker的响应。但是异步提交在成功提交或碰到无法恢复的错误之前不会一直重试,这是因为它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功了。需要注意的是异步提交commitAsync()支持回调,一般回调被用于记录提交错误或生成度量指标,不过如果要用它来进行重试,一定要注意提交的顺序。
  • 同步和异步组合提交
  • 提交特定的偏移量
    提交偏移量的频率和处理消息批次的频率是一样的。但如果想要更频繁地提交该怎么办呢?如果poll()方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况,无法通过调用commitSync()和commitAsync()来实现,因为它们只会提交最后一个偏移量。幸运的是,消费者api允许在调用commitSync()和commitAsync()方法时传入希望提交的分区和偏移量的map。不过,因为消费者可能不只读取一个分区,这就需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

再均衡监听器

消费者退出和进行分区再均衡之前,会做一些清理工作。消费者失去对一个分区的所有权之前提交最后一个已经处理记录的偏移量。如果消费者准备了一个缓冲区来处理偶发事件,那么在失去分区所有权之前,需要处理在缓冲区积累下来的记录。

在为消费者分配新分区或者移除旧分区时,可以通过消费者Api执行一些应用程序代码,在调用subscribe()方法时传入一个ConsumerRebalanceListener实例就可以了,ConsumerRebalanceListener有两个方法需要实现:

  • onPartitionsRevoked:此方法会在再均衡开始之前和消费者停止读取消息之后被调用。
  • onPartitionsAssigned:此方法会在重新分配分区之后和消费者开始读取消息之前被调用。

如果发生再均衡,我们需要在即将失去分区所有权时提交偏移量。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量,因为提交的偏移量是已经处理过的,所以不会有什么问题

从特定偏移量开始处理记录

消费者api为我们提供了seekToBeginning(Collection tp)和seekToEnd(Collection tp)两个方法,可以从分区的起始位置和末尾位置读取消息。不过,kafka也为我们提供了用于查找特定偏移量的api,然后通过seek()方法,从某个分区的特定偏移量来读取消息。
  
如果我们对消费的重复读和漏读一点都无法容忍,那这个时候可能就要考虑将分区的偏移量保存在数据库中,因为这样可以做到消息的处理和偏移量的提交放到一个事务中进行原子处理。而如果将偏移量提交到kafka保存,即便你每处理一条消息提交一次偏移量,仍然会无法避免在你处理完消息之后提交偏移量之前应用程序崩溃的情况。

消费者如何退出

我们不需要担心消费者会在一个无线循环里轮询消息,我们会告诉消费者如何优雅的退出循环。
如果确定要退出循环,需要通过另一个线程调用consumer.wakeup()。如果循环运行在主线程里,可以在ShutdownHook里调用该方法。consumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的方法。

可靠系统里使用消费者

我们知道只有被提交的数据,也就是那些已经被写入所有同步副本的数据,对消费者时可以用的。在从分区读取消息时,消费者会获取一批事件,检查这批事件里最大的偏移量,然后从这个偏移量开始读取另外一个事件。这样可以保证消费者总能以正确的顺序获取新数据。如果一个消费者退出了,另一个消费者需要知道从那个地方继续获取消息进行处理,这就需要我们非常重视偏移量提交的时间和提交的方式。为了更可靠的消费消息,我们需要在下面两个方面做出合理的配置。

消费者的可靠性配置

为了保证消费者行为的可靠性,需要注意一下4个非常重要的配置参数:

  • group.id
    如果两个消费者具有相同的group.id,并且订阅了同一个主题,那么每个消费者会分到主题分区的一个子集,也就是说它们只能读取到所有消息的一个子集(不过整个群组会读取主题所有的消息)。如果希望某个消费者看到主题的所有的消息,那么需要它设置唯一的group.id
  • auto.offset.reset:上面已详述不在赘述
  • enable.auto.commit:上面已详述不在赘述
  • autol.commit.interval.ms:上面已详述不在赘述

显示提交偏移量

如果选择了自动提交偏移量,就不需要关心显示提交的问题。不过如果希望能够更多控制偏移量提交的时间点,那么就要仔细想想该如何提交偏移量了–要么是为了减少重复处理消息,要么是为了把消息处理逻辑放在了轮询之外。
这里我们会重点说几个在开发具体可靠性的消费者应用程序时需要注意的事项:

  • 总是在处理完事件后在提交偏移量
    如果所有的处理都是在轮询里完成,并且不需要在轮询之间维护状态(比如为了实现聚合操作),那么就可以使用自动提交,或者在轮询结束时进行手动提交。

  • 提交频度是性能和重复消息数量之间的权衡
    即使是在最简单的场景里,比如所有的处理都在轮询里完成,并且不需要在轮询之间维护状态,你仍然可以在一个循环里多次提交偏移量(甚至可以每处理完一个事件之后),或者多个循环里只提交一次,这完全取决于我们在性能和重复处理消息之间的权衡。

  • 确保对提交的偏移量心里有数
    在轮询过程中提交偏移量有一个不好的地方,就是提交的偏移量有可能是读取到的最新偏移量,而不是处理过的最新偏移量。注意,在处理完消息后再提交偏移量是非常关键的–否则会导致消费者错过消息。

  • 再均衡
    在设计应用程序时要注意处理消费者的再均衡问题。一般要在分区被撤销之前提交偏移量,并在分配到新分区时清理之前的状态。

  • 消费者可能需要重试

  • 消费者可能需要维护状态
    有时候我们可能希望在多个轮询之间维护状态,例如,你想计算消息的移动平均数,希望在首次轮询之后计算平均数,然后再后续的轮询中更新这个结果。如果进程重启,我们除了希望从上一个偏移量开始处理数据,还要回复移动平均数。有一种办法是在提交偏移量的同时把最近计算的平均数写到一个“结果”主题中。消费者线程在重启之后,它可以拿到最近的平均数并接着计算。不过这并不是完全可靠,因为kafka不支持事务。消费者有可能在写入平均数来不及提交偏移量就崩溃了,反过来也一样。这个问题很复杂,建议我们尝试一下kafkaStreams这个类库。

  • 长时间处理
    有时候我们业务逻辑中处理数据需要很长时间。要记住,暂停轮询的时间不能超过几秒钟。及时不想获取更多的数据,也要保持轮询,这样客户端才能往broker发送心跳。一种常见的做法是使用一个线程池来处理数据,因为使用线程池可以进行并行处理,从而加快处理速度。再把数据移交给线程池去处理之后,你就可以暂停消费者,然后保持轮询,但不获取新数据,直达工作线程处理完成。在工作线程处理完成之后,可以让消费者继续获取新数据。因为消费者一直保持轮询,心跳会正常发送,就不会发生再均衡

  • 仅一次传递
    实现仅一次处理最简单且常用的办法是把结果写入一个支持唯一键的系统里。这种情况下,要么消息本身包含一个唯一键(通常是这样),要么使用主题、分区和偏移量的组合来创建唯一键----它们的组合可以唯一标识一个kafka记录。如果把消息和一个唯一键写入系统,然后碰巧又读到一个相同的消息,只要把原先的键值覆盖掉即可。数据存储引擎会覆盖已存在的键值对,就像没有出现过重复数据一样,这个模式叫幂等性写入
    如果写入消息的系统支持事务,就可以使用另一种方法。最简单的是使用关系型数据库。我们把消息和偏移量放在同一个事务里,这样他们就能保持同步。在消费者启动时,它会获取最近处理过的消息偏移量,然后调用seek()方法从该偏移量位置继续读取数据。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐