不同broker之间的关系

Kafka使用zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件中指定,也可以自动生成。在broker启动时,它通过建立临时节点把自己的ID注册到zookeeper。kafka组件订阅broker在zookeeper上的注册路径,当有broker进入或退出集群时,这些组件就可以获得通知

在broker停机、出现网络分区或长时间垃圾回收停顿时,broker会从zookeeper上断开连接,此时broker的临时节点会自动从zookeeper上移除。监听broker列表的kafka组件会被告知该broker已移除。

在关闭broker时,它对应的节点也会消失,不过它的ID会继续存在于其他数据结构中。例如,主题的副本列表里就可能包含这些ID。在完全关闭一个broker之后,如果使用相同的ID启动另一个全新的broker,它会立即加入集群,并拥有与旧broker相同的分区和主题。

控制器

控制器其实也是一个broker,只不过它除了具有一般broker的功能之外,还负责分区首领的选举集群里第一个启动的broker通过在zookeeper里创建一个临时节点让自己成为控制器。其他的broker在控制器节点上创建zookeeper watch对象,这样它们就可以收到这个节点的变更通知。

如果控制器被关闭或者与zookeeper断开连接,zookeeper上的临时节点就会消失。集群里的其他broker通过watch对象得到控制器节点消失的通知,它们尝试让自己成为新的控制器。第一个在zookeeper里成功创建控制器节点的broker成为新的控制器,其他的broker会在新的控制器节点上创建新的watch对象。每个新选出的控制器通过zookeeper的条件递增操作获得一个全新的数值更大的controller epoch。同时,控制器使用epoch来避免“脑裂”。脑裂是指两个节点同时认为自己是当前的控制器。

当控制器发现一个broker已经离开集群(通过观察相关的zookeeper路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个broker上)。控制器遍历这些分区,并确定谁应该成为新首领,然后向所有包含新首领或者现有跟随者的broker发送请求。该请求中包含了谁是新首领以及谁是分区跟随者的信息。

当控制器发现一个broker加入集群时,它会使用brokerID来检查新加入的broker是否包含现在分区的副本。如果有,控制器就把变更通知发送给新加入的broker和其他broker,新broker上的副本开始从首领那里复制消息。

复制

kafka使用主题来组织数据,每个主题被分为如干个分区,每个分区有多个副本。那些副本被保存在broker上,每个broker可以保存成百上千个属于不同主题和分区的副本。

副本类型

  • 首领副本
    每个分区都有一个首领副本。为了保证一致性,所有生产者和消费者请求都会经过这个首领副本
  • 跟随者副本
    首领以外的副本都是跟随者副本跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中一个跟随者会被提升为新首领,选举新首领是由控制器broker来完成的。

首领的另一个任务是搞清楚哪个跟随者的状态和自己是一致的。为了与首领保持一致,跟随者向首领发送获取数据的请求,这种请求和消费者为了读取消息而发送的请求是一样的。首领将响应消息发送给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。例如,一个跟随者副本先请求消息1,再请求消息2,然后请求消息3,在收到这三个请求的响应前,它是不会发送第4个请求消息的。如果跟随者发送了第4个消息,首领就知道它已经收到了前三个响应。通过查看每个跟随者的请求的最新偏移量,首领就会知道每个跟随者的复制进度。

从复制的角度,副本又可以分为同步副本非同步副本

同步副本需要满足的条件

分区首领是同步副本,但对于跟随者副本来说是否是同步副本,需要满足下面三个条件:

  • 与zookeeper之间有一个活跃的会话,也就是说,它在过去的6s(可配置)内向zookeeper发送过心跳。
  • 在过去10s内(可配置replica.lag.time.max.ms)从首领那里获取过最新消息。光从首领那里获取消息是不够的,它还必须是几乎零延迟的。

如果跟随者副本不能满足以上任何一点,那么它就会被认为是不同步副本。跟随者的正常不活跃时间或者称为不同步副本之前的时间是通过replica.lag.time.max.ms参数来设置的。这个时间间隔直接影响着首领选举期间的客户端行为和数据保留机制。一个不同步副本通过和zookeeper重新建立连接,并从首领那里获取最新消息,可以重新变成同步的。但是重新变成同步的过程在网络出现临时问题并很快得到修复的情况下会很快完成,但如果broker

一个滞后的同步副本会导致生产者和消费者变慢,因为在消息被认为已提交之前,客户端会等待所有的同步副本接收到消息。而如果一个副本不同步了,我们就不再关心它是否已经收到消息。虽然非同步副本同样滞后,但是它不会对性能产生任何影响。但是,更少的同步副本意味着更低的有效复制系数,在发生宕机时丢失数据的风险更大。当然是否需要所有同步副本接收到消息才算提交,是可以通过生产者acks配置的,当然这个参数的配置需要我们在性能和一致性之间做出选择。

我们知道一般在首领失效时,只有同步副本才有可能被选为最新首领副本。不同步副本是不能被选举为首领的,毕竟它没有包含全部消息。但是,如果我们将参数unclean.leader.election.enable设为true,就是允许不同步副本成为首领(也就是不完全的选举)。一般我们不提倡开启不完全选举,但是对于一些可用性要求比较高,比如实时点击流分析系统,一般会启用不完全的首领选举。

根据kafka对可靠性数据保证的定义,消息只有在被写入到所有的同步副本之后才被认为是已提交的。但如果这里的“所有同步副本”只包含一个,那么在这个副本变为不可用的时,数据就会丢失。这时我们最好确保数据被写入不止一个副本,这就是需要把最小同步副本数量设置的大些。这样能保证在首领失效时,有其他的同步副本能被选为首领,但是如果同步副本数量小于我们设置的最小同步副本,那么broker就会停止接受生产者的请求。最小同步副本是通过min.insync.replicas设置的,在主题级别和broker级别都有这个参数,根据需要去设置。

除了当前首领外,每个分区都有一个首选首领(分区的副本清单中第一个副本一般就是首选首领)—创建主题时选定的首领就是分区的首选首领。之所以把它叫做首选首领,是因为在创建分区时,需要在broker之间均衡首领。因此,我们希望首选首领成为真正的首领。默认情况下,auto.leader.rebalance.enable为true,它会检查首选首领是不是当前首领,如果不是,并且该副本是同步副本的,就会触发首领选举,让首选首领成为当前首领。

broker处理请求

kafka提供一个二进制协议,制定了请求消息的格式以及broker如何对请求作出响应。客户端发起连接和发送请求,broker按照请求到达的顺序来处理他们,这种顺序保证了kafka具有消息队列的特性,同时保证保存的消息也是有序的。

那么broker是如何处理请求的呢?
答:broker会在它所监听的每个端口上运行一个Acceptor线程,这个线程会创建一个连接,并把它交给Processor线程去处理。Processor线程(网络线程)的数量是可配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,请求消息放入请求队列后,IO线程会处理它们,并将处理结果放入响应队列,然后网络线程从响应队列中获取响应消息,把它们发送给客户端。

生产请求和获取请求都必须发送给分区的首领副本。如果broker收到一个针对特定分区的请求,而该分区的首领在另一个broker上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。同样获取请求时也会有同样的要求。kafka客户端要自己负责把生产请求和获取请求发送给正确的broker上。

那么客户端怎么知道该往哪里发送请求呢?
答:客户端使用了另一种请求类型,也就是元数据请求。这些请求中包含了客户端感兴趣的主题列表,以及这些主题包含的分区,每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任何一个broker,因为所有broker都缓存了这些消息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标broker上发送生产请求和获取请求。客户端需要时不时地通过发送元数据请求来刷新这些信息(刷新频率可以通过metadata.max.age.ms配置)。如果客户端收到“非首领”错误,客户端在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息。

客户端从broker上获取消息时,客户端可以指定broker最多可以从一个分区里返回多少数据。这个限制非常重要的,因为客户端需要为broker返回的数据分配足够的内存。如果请求的偏移量存在,broker将按照客户端指定的数据上限从分区里返回读取消息,再把消息返回给客户端。kafka使用零复制技术向客户端发送消息—kafka直接把消息从文件里发送到网络通道,而不需要经过任何的缓冲区。客户端除了可以设置broker返回数据的上限,还可以设置下限。

分区分配

kafka的基本存储单元就是分区。分区无法在多个broker间进行再细分,也无法在同一个broker的多个磁盘上进行细分。
对于分区分配的理解,我们可以用一个实际场景来解释。例如,假设我们有6个broker,打算创建一个包含10个分区的主题,复制系数3,那么kafka就会有30个分区副本。在进行分区分配时,我们要达到如下的目标:

  • 在broker间均匀地分布分区副本。
  • 确保每个分区的每个副本分布在不同的broker上。
  • 如果为broker制定了机架信息,那么尽可能把每个分区的副本分配到不同的机架的broker上。

为了实现这个目标,我们先随机选择一个broker(假设是4),然后使用轮询的方式给每个broker分配首领。于是,首领分区0会在broker4上,首领分区1在broker5上,首领分区2在broker0上,以此类推。然后从分区首领开始,依次分配其跟随者副本。如果分区0的首领在broker4上,那么它的第一个跟随者在broker5上,第二个跟随者副本在broke0上。其他的分区也是同样的道理。

broker中可靠性保证

复制系数

主体级别的配置参数是replication.factor,而在broker级别则可以通过default.replication.factor来配置自动创建的主题。
复制系数N就需要至少N个broker,而且会有N个数据副本。也就是说复制系数其实也就决定这分区的副本个数。那么在N-1个broker失效的情况下,仍然能够从主体读取数据或向主体写入数据。
例如我们假设主题的复制系数为3,也就是说每个分区总共会被3个不同的broker复制三次。不过用户可以修改它。即使是在主题创建之后,也可以通过新增或者移除副本来改变复制系数。
从上面的描述可以看出更高的复制系数意味着更高的可用性、可靠性,但同时也意味着要付出更多磁盘空间,这就需要我们在可用性和存储空间之间做出权衡。那复制系数设置多少合适呢?遗憾的是这个没有统一的标准,要根据主题的重要层度来定。如果复制系数为1,那就意味着如果这个分区的副本所在的broker宕机时,这个分区就会立马不可用;如果复制系数为2可以容忍一个broker发生失效,看起来是足够了,但是有时候1个broker是可能会导致集群不稳定,迫使你重启另一个broker(集群控制器)。也就是说复制系数为2时,有可能因为重启等问题导致集群不可用,所以一般推荐设置为3,这也是kafka默认的设置
另外,出于高可用的考虑,kafka默认会确保分区的每个副本被放在不同的broker上。但是为了更安全的考虑,我们最好将不同broker放到不同的机架上,这样防止一个机架发生故障导致所有的broker全军覆没。

不完全首领选举

允许不同步副本成为首领就是不完全的选举。现实业务中可能会出现首领不可用的同时其他的副本又都是不同步的,这时如果我们不允许不完全的选举,那么就要接收较低的可用性,因为分区在旧首领(最后一个同步副本)恢复之前是不可用的,有时候这种状态会持续数小时;但是如果我们允许不完全的选举就会承担丢失数据和数据不一致的风险。所以是否允许不完全的选举需要我们根据具体的业务场景来定,如果对可用性要求比较高,并且对消息丢失是可接受的,就可以允许不完全的选举。但是对于一些对数据质量和数据一致性要求比较高的系统会禁用这种不完全的首领选举。是否允许不完全选举是通过unclean.leader.election.enable参数来控制的,此参数只能在broker级别(实际上是在集群范围内)进行配置,默认值是true。
但是需要注意的是如果不同步副本被选举为首领后,旧的首领重新变得可用,那么旧的首领会成为新首领的跟随者。这个时候,旧的首领会把比当前首领旧的消息全部删除,而这些消的对于所有消费者来说都是不可用的。

最小同步副本

在主题级别和broker级别上,通过min.insync.replicas来设置。
我们知道kafka对可靠性的保证的定义,消息只有在被写入到所有的同步副本之后才被认为是已提交的
为了数据的可靠性,我们需要把上述参数设置为大于1,这样就可以保证已被提交的消息在超过一个副本上。例如,如果我们把这个参数设置为2,某个分区的同步副本数只要不小于2,那么这个分区就可以正常的工作,但是如果同步副本数小于2,那么broker就会停止接收生产者的消息。尝试发送消息的生产这会收到NotEnoughReplicasException异常。**消费者仍然可以继续读取已有的数据。其实这时当只剩下一个同步副本时,它就变成只读了,这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为。要想从只读状态中恢复,必须让之前其他不同步的副本变得同步才可以。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐