Kafka生产真实案例-生产者阻塞事件
Kafka真实案例场景一:生产者阻塞事件1、事件背景该事件的背景是基于一起真实故障事件,在公司的一个项目上线运行过程中,由于某种原因kafka集群挂死一台(总共3台作为一个集群),导致kafka客户端(该客户端是一个web服务)用户线程不释放(由于编码不严谨的原因,导致用户线程跟kafka生产者发送线程在同一个线程中),导致客户端程序不可访问,引起业务系统故障;基于该事件背景,我们需要思考一下几个
Kafka真实案例场景一:生产者阻塞事件
1、事件背景
该事件的背景是基于一起真实故障事件,在公司的一个项目上线运行过程中,由于某种原因kafka集群挂死一台(总共3台作为一个集群),导致kafka客户端(该客户端是一个web服务)用户线程不释放(由于编码不严谨的原因,导致用户线程跟kafka生产者发送线程在同一个线程中),导致客户端程序不可访问,引起业务系统故障;基于该事件背景,我们需要思考一下几个问题:
-
kafka集群挂死1台,为什么会造成不可用?kafka的高可用呢?
-
kafka集群不可用后,为什么会堵住客户端线程,不是号称异步发送吗?
-
如果kafka集群不可用后,客户端该如何保证不影响用户线程?
要解决或者说要搞明白以上几个问题,我们需要深入了解kafka一些底层原理
2、kafka架构简介
上图架构中,producer用于生产数据,topic partition用于接收并存储数据,consumer用于消费数据,zookeeper用于协调集群状态
我们可以看到上图中Broker和Consumer有使用到ZooKeeper,而Producer并没有使用到ZooKeeper,因为Kafka从0.8版本开始,Producer并不需要根据ZooKeeper来获取集群状态,而是在配置中指定多个Broker节点进行发送消息,同时跟指定的Broker建立连接,来从该Broker中获取集群的状态信息,这是Producer可以知道集群中有多少个Broker是否在存活状态,每个Broker上的Topic有多少个Partition,Producer会讲这些元信息存储到Producer的内存中。如果Producer像集群中的一台Broker节点发送信息超时等故障,Producer会主动刷新该内存中的元信息,以获取当前Broker集群中的最新状态,转而把信息发送给当前可用的Broker,当然Producer也可以在配置中指定周期性的去刷新Broker的元信息以更新到内存中
备注:broker使用zookeeper用于controller选举,consumer使用zookeeper用于offset管理
注意:只有Broker和ZooKeeper才是服务端,而Producer和Consumer只是Kafka的SDK罢了
基本特性
可扩展:
1.在不需要下线的情况下进行扩容
2.数据流分区(Partition)存储在多个机器上
高性能:
1.单个Broker节点就能服务上千个客户端
2.单个Broker节点每秒钟读/写可达每秒几百兆字节
3.多个Brokers组成的集群将达到非常强的吞吐能力
4.性能稳定,无论数据多大
5.Kafka在底层弃用了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。
持久存储:
1.存储在磁盘上
2.冗余备份到其它服务器上以防止节点故障及丢失
3、kafka中几种对象
3.1、broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker,在kafka启动时,需在配置文件中指定集群中唯一的broker.id
3.2、controller
Kafka集群的多个broker中,同一时刻有且仅有一个会被选举controller,负责管理整个集群中partition和replicas(follower)的状态。只有 Broker Controller 会向 zookeeper 中注册 Watcher,其他 broker 及分区无需注册。controller的职责有两个方面,一方面他要为集群中的所有topic partitioin选取leader;另一方面,它还承载着集群的全部元数据信息,并负责将这些元数据信息同步到其他broker上
总结:Controller 负责 leader 的选举,维护与同步元数据。
3.3、topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处),类似于数据库的表名
3.4、partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
3.5、partition leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition,leader是由controller进行选举的
3.6、partition follower
Follower是分区的副本,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
3.7、 producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
3.8、 consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
3.9、consumer group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。组内的消费者共同消费同一个topic的不同partition,一个partition不会被一个组内的不同消费者消费
3.10、zookeeper
Zookeeper 负责维护和协调 broker,负责 Broker Controller 的选举。
3.11、segment
4、kafka中几种概念
4.1、ISR
ISR指副本同步列表。 ISR列表是由Leader负责维护。ISR中的副本都是可用的副本(与leader数据一致的,不超过阈值差距的)
ISR (In-Sync Replicas)是Leader在Zookeeper(/brokers/topics/[topic]/partitions/[partition]/state)目录中动态维护基本保持同步的Replica列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的节点id。如果一个Follower宕机或者其落后情况超过任意参数replica.lag.time.max.ms(延迟时间)、replica.lag.max.messages(延迟条数,Kafka 0.10.x版本后移除)设置阈值,则该Follower副本节点将从ISR列表中剔除并存入OSR(Outof-Sync Replicas)列表。
ISR集合发生变更时,除了会记录到/brokers/topics/[topic]/partitions/[partition]/state路径中,还会记录到/isr_change_notification路径中,在该路径下会创建一个以isr_change开头的持久化顺序节点,并将变更信息存入其中,controller会注册该路径的watcher,该节点发生变更时,controller将会受到通知并执行元数据更新操作,更新完成后将删除/isr_change_notification路径下已经处理过的节点
频繁地触发 Watcher会影响 Kafka 控制器、 ZooKeeper 甚至其 他 broker 节 点的性能。为了避免这种情况 , Kafka 添加了限定 条件,当检测到分区的 ISR 集合发生变化时,还需要检查以下两个条件 :
1、上一次 ISR 集合发生变化距离现在己经超过5s。
2、上一次写入 ZooKeeper的时间距离现在已经超过 60s。
4.2、HW与LEO
HW俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
下图表示一个日志文件,这个日志文件中只有9条消息,第一条消息的offset(LogStartOffset)为0,最有一条消息的offset为8,offset为9的消息使用虚线表示的,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的
LEO
(Log End Offset),标识当前日志文件中下一条待写入的消息的offset。上图中offset为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset值加1.分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR
集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
HW值的含义为,当前位置在kafka集群中各个partition中数据是完全同步的,而LEO是各个分区自身当前最新的位置,这个最新位置可能因为每个分区写入速度不一致原因,每个分区的值会有差异,只有所有分区都完成写入,才会通知leader,leader才会去更新HW值
4.3、offset
offset为消息偏移量,指的是消息所在partition的位置,每条消息都有一个当前Partition下唯一的64字节的offset,它是相当于当前分区第一条消息的偏移量。
在 kafka 0.9 之前版本,offset 是由 zookeeper负责管理的。
4.4、offset commit
Consumer从broker中取一批消息写入buffer进行消费,在规定的时间内消费完消息后,会自动将其消费消息的offset提交给broker,以记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,其是不会提交offset的。
Kafka对于offset的处理有两种提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)
(1) 自动提交偏移量
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。
如:group_id=test_group_1,则partation=hash("test_group_1")%50=28
对于自动提交偏移量,如果auto_commit_interval_ms的值设置的过大,当消费者在自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好。
(2) 手动提交偏移量
鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交offset主要有3种方式:1.同步提交 2.异步提交 3.异步+同步 组合的方式提交
4.5、Coordinator
Coordinator一般指的是运行在每个broker上的group Coordinator进程,用于管理Consumer Group中的各个成员,主要用于offset位移管理和Rebalance。一个Coordinator可以同时管理多个消费者组。
coordinator存储的信息
对于每个Consumer Group,Coordinator会存储以下信息:
1. 对每个存在的topic,可以有多个消费组group订阅同一个topic
2. 对每个Consumer Group,元数据如下:
1)订阅的topics列表
2)Consumer Group配置信息,包括session timeout等
3)组中每个Consumer的元数据。包括主机名,consumer id
4)每个正在消费的topic partition的当前offsets
5)Partition的ownership元数据,包括consumer消费的partitions映射关系
如何确定consumer group的coordinator
consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:
1、确定consumer group位移信息写入__consumers_offsets这个topic的哪个分区。具体计算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
2、该分区leader所在的broker就是被选定的coordinator
4.6、Rebalance
当消费者组中的数量发生变化,或者topic中的partition数量发生了变化时,partition的所有权会在消费者间转移,即partition会重新分配,这个过程称为再均衡Rebalance。再均衡能够给消费者组及broker带来高性能、高可用性和伸缩,但在再均衡期间消费者是无法读取消息的,即整个broker集群有小一段时间是不可用的。因此要避免不必要的再均衡。
4.7、ACK
Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡
ack有3个可选值,分别是1,0,-1
ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。
注意,ack的默认值就是1。这个默认值其实就是吞吐量与可靠性的一个折中方案。生产上我们可以根据实际情况进行调整,比如如果你要追求高吞吐量,那么就要放弃可靠性。
ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。
ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
5、kafka内部原理
5.1、broker工作原理
当broker.id=0的broker在启动时,会创建”/brokers/ids/0″临时节点,并把端口等信息写进去
从图中可以看出,kafka系统是由若干子系统构成。根据核心的功能模块,可以大体划分成以下几个子系统:
- 客户端连接请求处理服务
- 客户端指令处理服务
- 日志管理子系统
- 日志复制子系统
- API消息处理子系统
- broker管理子系统
从实现层次上来看,可以分为几层:
- 网络层:主要用来接收客户端请求,接收并解析客户端的命令,维护和客户端的连接等。
- API层:根据客户端的命令进行相应的操作。
- 日志子系统:负责管理数据和数据分区。
- 日志复制子系统:负责对数据的副本进行管理。
- Controller子系统:负责完成controller的操作。
5.2、controller工作原理
职责
拥有Controller身份的Broker,除了要像普通Broker那样对外提供消息的生产、消费、同步功能之外,还需多承担以下几份职责:
-
选举Leader和ISR
控制器从ZK的/brokers/topics加载一个topic所有分区的所有副本,从分区副本列表中选出一个作为该分区的leader,并将该分区对应所有副本置于ISR列表,其他分区类似;其他topic的所有分区也类似。
-
同步元数据信息包括broker和分区的元数据信息
控制器加载ZK的/brokers/ids以及上一个步骤得到的topic下各分区leader和ISR将这些元数据信息同步到集群每个broker。而且通过下面所阐述的监控机制当有broker或者分区发生变更时及时更新到集群保证集群每一台broker缓存的是最新元数据。
-
broker变化监听与处理
broker加入的监听和处理 控制器启动时就起一个监视器监视ZK/brokers/ids/子节点。当存在broker启动加入集群后都会在ZK/brokers/ids/增加一个子节点brokerId,控制器的监视器发现这种变化后,控制器开始执行broker加入的相关流程并更新元数据信息到集群。 broker崩溃的监听与处理 控制器启动时就起一个监视器监视ZK/brokers/ids/子节点。当一个broker崩溃时,该broker与ZK的会话失效导致ZK会删除该子节点,控制器的监视器发现这种变化后,控制器开始执行broker删除的相关流程并更新元数据信息到集群。
-
topic变化监听与处理
topic创建的监听与处理 控制器启动时就起一个监视器监视ZK/brokers/topics/子节点。当通过脚本或者请求创建一个topic后,该topic对应的所有分区及其副本都会写入该目录下的一个子节点。控制器的监视器发现这种变化后,控制器开始执行topic创建的相关流程包括leader选举和ISR并同步元数据信息到集群;且新增一个监视器监视ZK/brokers/topics/<新增topic子节点内容>防止该topic内容变化。 topic删除的监听与处理 控制器启动时就起一个监视器监视ZK/admin/delete_topics/子节点。当通过脚本或者请求删除一个topic后,该topic会写入该目录下的一个子节点。控制器的监视器发现这种变化后,控制器开始执行topic删除的相关流程包括通知该topic所有分区的所有副本停止运行;通知所有分区所有副本删除数据;删除ZK/admin/delete_topics/<待删除topic子节点>。
-
partition变化监听与处理
控制器启动时就起一个监视器监视ZK/admin/reassign_part/子节点。当通过脚本执行分区重分配后会在该目录增加一个子节点,子节点内容是按照一定格式构建的重分配方案,控制器的监视器发现这种变化后,控制器开始执行分区重分配相关流程如同步元数据信息。
-
broker优雅退出
相比较broker机器直接宕机或强制kill,通过脚本或kill -9 关闭一个broker我们称为broker优雅退出。即将关闭的broker向控制器发送退出请求后一直阻塞。 控制器接收到请求后,执行leader重选举和ISR后响应broker。broker接收后退出。 这个比较特殊,不依赖ZK,直接通过broker和控制器RPC通信即可完成
选举
Kafka使用公平竞选的方式来确定Controller,最先在ZooKeeper成功创建临时节点/controller的Broker会成为Controller,因为Broker不会同时启动,一般而言,Kafka集群中第一台启动的Broker会成为Controller,并将版本、自身Broker编号、时间戳写入ZooKeeper临时节点/controller。
其中version固定为1,brokerid表示成为Broker编号,timestamp表示成为Controller的时间戳。每个Broker启动的时候都会去尝试读取ZooKeeper临时节点/controller的brokerid,如果读取到brokerid不为-1,则表示已有Broker竞选成为Controller。此时,当前Broker会放弃竞选Controller,并对ZooKeeper临时节点/controller注册监听器,当Controller因为某些情况导致Kafka进程停止,甚至所在的机器宕机,并在规定的时间没有自行恢复服务,ZooKeeper通过Watch机制感知到并删除临时节点/controller,然后监听器会通知各个Broker进行Controller选举,第一个抢注ZooKeeper临时节点/controller成功的,成为新的Controller,这就是Controller Failover
Controller在选举成功后会拉取ZooKeeper中各个节点的信息来初始化ControllerContext,并管理这些ControllerContext,比如某个Topic增加若干个分区,Controller在负责创建这些分区的同时,还要更新ControllerContext,并且需要将这些变更信息同步到普通Broker
变更Controller,通常有以下几种方式,但是Controller转移,需要付出的代价也比较大,需要关闭原Controller各种监听器、定时任务和线程,并且新Controller上线同样开启以上资源。一般而言,除非Controller所在的机器IO、CPU、内存等资源不足,又或者频繁发生长时间Full GC,否则不应该人为变更Controller。
- 手动更改ZooKeeper临时/controller中brokerid,每个Broker会更新内存中的ActiveControllerId。
- 手动删除ZooKeeper临时节点/controller,触发一轮新的选举。
- 停止当前Controller Kafka进程,此时ZooKeeper会认为Controller所在的Broker宕机,ZooKeeper同样会删除临时节点/controller,触发一轮新的选举。
脑裂
随着服务的长时间运行,加上Controller对比普通Broker多承担一部分额外的工作,Controller不可避免发生Full GC,导致服务停止,如果Full GC时间很长,甚至超过Broker与ZooKeeper设置的会话时间,此时ZooKeeper会认为Controller下线,进行Controller Failover。
选举产生新的Controller之后,ZooKeeper会通知到各个Broker更新内存中的ActiveControllerId,原先发生Full GC的Controller因为停止服务收不到Controller更新通知,在Full GC过后,原先的Controller认为自身服务正常,只是假死了一段时间,甚至还认为自己是Controller,此时Kafka集群就会有两个Controller,这就是Controller脑裂问题。
为了解决Controller脑裂问题,ZooKeeper中还有一个与Controller有关的持久节点/controller_epoch,存放的是一个整形值,用于记录Controller发生变更的次数,即记录当前是第几代Controller,也称为纪元编号。Controller和普通Broker的请求都会携带controller_epoch,如果请求的controller_epoch小于ActiveControllerId,则认为这个请求是已经过期的Controller发送的无效请求。如果请求的controller_epoch大于内存中ActiveControllerId,那么说明已经有新的Controller产生,当前Broker需更新内存中的ActiveControllerId。Kafka通过controller_epoch来保证Controller的唯一性,同时防止Controller脑裂。
最佳实践
由于Controller支持Failover,且集群中任意一台Broker均可能成为Controller,因此我们无须对Controller做额外的配置或者调优。唯一需要注意的是,对于消息敏感程度非常高的集群(例如用于下单、结算、清算等业务的集群)需要重启Broker(比如Broker升级,需要依次重启),需要在重启Broker之前确认身份,假如当前Broker是Controller,需要手动更改ZooKeeper临时/controller中brokerid,将Controller转移到其他Broker上再进行重启。因为ZooKeeper感知Controller下线,到新的Controller上线这个过程,少则几秒,多则十几秒,在这段时间内,集群没有Controller,会存在消息的生产、失败情况,提前转移Controller,能在最大程度保证集群高可用。
状态机
5.3、partition leader选举
当leader宕机后,broker controller会从ISR中挑选一个follower成为新的leader。如果ISR中没有其他副本怎么办?可以通过unclean.leader.election.enable的值来设置leader选举范围。
false
必须等到ISR列表中所有的副本都活过来才进行新的选举。该策略可靠性有保证,但可用性低。
true
在ISR列表中没有副本的情况下,可以选择任意一个没有宕机的主机作为新的leader,该策略可用性高,但可靠性没有保证。
leader选举过程
1、KafkaController
实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数,即onControllerFailover和onControllerResignation;
2、ZookeeperLeaderElector
实现topic partition Leader节点选举功能,但是它并不会处理“broker与Zookeeper系统之间出现的会话超时”这种情况,它主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑;
3、LeaderChangeListener
如果节点数据发送变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用
onResigningAsLeader函数。当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举,通过调用onResigningAsLeader函数重新选择其他正常运行的代理节点成为新的Leader;
4、SessionExpirationListener
当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession函数会被调用,对于Zookeeper系统中会话过期的连接,会先进行一次判断。
5.4、zookeeper中主要节点
上图中缺少/brokers/topics/_consumer_offsets路径,因为0.9版本以上的offset管理是新建了一个__consumer_offset topic,该路径下的就是该topic
/admin
admin/delete_topics 删除的主题
/isr_change_notification
记录isr变动情况,由leader维护该数据,controller监听该路径,获取更新
/cluster
集群信息
/controller
存储控制器broker信息,数据格式为:
{"version":1,"brokerid":0,"timestamp":"1524191485779"}
version: 版本编号默认为1,
brokerid: kafka集群中broker唯一编号,
timestamp: kafka broker中央控制器变更时的时间戳
/controller_epoch
此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;用于防止broker脑裂,无视epoch更低的请求
/brokers/topics
当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息
如上图所示,层级结构解释说明:
test:topic名称
partitions:分区列表,当前只有一个分区”0“
state:Kafka的ISR的管理最终都会反馈到Zookeeper节点上。目前有两个地方会对该节点进行维护:
1、Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
2、leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。
0.9版本之后特殊的topic:/brokers/topics/_consumer_offsets
新版kafka(0.9及以后)将消费者的位置信息(offset)保存在kafka内部的topic中,就是这里的__consumer_offsets topic,并且提供了kafka-consumer-groups.sh供用户查看消费者信息。
/brokers/ids
每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL),存放的内容格式如下
{
"listener_security_protocol_map": {
"PLAINTEXT": "PLAINTEXT"
},
"endpoints": ["PLAINTEXT://hostname:port"],
"jmx_port": -1,
"host": "host ip",
"timestamp": "1524191485634",
"port": port,
"version": 4
}
5.5、Coordinator工作原理
如4.5章节所说,Coordinator用于管理Consumer Group中的各个成员,每个broker都有一个coordinator实例,用于管理多个消费者组,主要用于位移(consumer offset)管理和rebalance
coordinator存储的信息
对于每个Consumer Group,Coordinator会存储以下信息:
1. 对每个存在的topic,可以有多个消费组group订阅同一个topic
2. 对每个Consumer Group,元数据如下:
1)订阅的topics列表
2)Consumer Group配置信息,包括session timeout等
3)组中每个Consumer的元数据。包括主机名,consumer id
4)每个正在消费的topic partition的当前offsets
5)Partition的ownership元数据,包括consumer消费的partitions映射关系
如何确定consumer group的coordinator
consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:
1、确定consumer group位移信息写入__consumers_offsets这个topic的哪个分区。具体计算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
2、该分区leader所在的broker就是被选定的coordinator
__consumers_offsets topic内不同分区会存储不同consumer group对应的offset信息
offset位移管理
消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在Kafka中这个位置信息有个专门的术语:位移(offset)。
每个consumer group管理自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。Kafka consumer默认是定期帮你自动提交位移的(enable.auto.commit = true),你当然可以选择手动提交位移实现自己控制。另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:
offset管理
老版本(0.9版本及之前)的位移是提交到zookeeper中的,目录结构是:/consumers/<group.id>/offsets//,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。因此kafka提供了另一种解决方案:增加topic(____consumer_offsets),将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息
rebalance
rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。Kafka新版本consumer默认提供了两种分配策略:range和round-robin。
rebalance的触发条件有三种:
1、组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到)
2、订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
3、订阅主题的分区数发生变更
每次rebalance之后,该group的版本将会更新,防止旧版本的group成员异常提交offset
5.6、分区分配策略
如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
5.7、副本复制/同步策略
kafka 使用多副本来保证消息不丢失,多副本就涉及到kafka的复制机制,在一个超大规模的集群中,时不时地这个点磁盘坏了,那个点cpu负载高了,出现各种各样的问题,多个副本之间的复制,如果想完全自动化容错,就要做一些考量和取舍了。我们举个例子说明下运维中面对的复杂性,我们都知道 kafka 有个 ISR集合,我先说明下这个概念:
kafka不是完全同步,也不是完全异步,是一种ISR机制
1. leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
2. 如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
3. 当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了(没有开启ack时,producer不会等待leader反馈commit结果)。
kafka的复制机制
kafka 每个分区都是由顺序追加的不可变的消息序列组成,每条消息都一个唯一的offset 来标记位置。
kafka中的副本机制是以分区粒度进行复制的,我们在kafka中创建 topic的时候,都可以设置一个复制因子,这个复制因子决定着分区副本的个数,如果leader 挂掉了,kafka 会把分区主节点failover到其他副本节点,这样就能保证这个分区的消息是可用的。leader节点负责接收producer 打过来的消息,其他副本节点(follower)从主节点上拷贝消息
kakfa 日志复制算法提供的保证是当一条消息在 producer 端认为已经 committed的之后,如果leader 节点挂掉了,其他节点被选举成为了 leader 节点后,这条消息同样是可以被消费到的。这样的话,leader 选举的时候,只能从 ISR集合中选举,集合中的每个点都必须是和leader消息同步的,也就是没有延迟,分区的leader 维护ISR 集合列表,如果某个点落后太多,就从 ISR集合中踢出去。
一个副本怎么样才算跟得上leader副本
一个副本不能 “caught up” leader 节点,就有可能被从 ISR集合中踢出去,我们举个例子来说明,什么才是真正的 “caught up” —— 跟leader节点消息同步。
kafka 中的一个单分区的 topic复制因子为 3 ,分区分布和 leader 和 follower 如下图,现在broker 2和3 是 follower 而且都在 ISR 集合中。我们设置 replica.lag.max.messages 为4,只要 follower 只要不落后leader 大于3条消息,就认为是跟得上leader的节点,就不会被踢出去, 设置 replica.lag.time.max.ms 为 500ms, 意味着只要 follower 在每 500ms内发送fetch请求,就不会被认为已经dead ,不会从ISR集合中踢出去。
现在 producer 发送一条消息,offset 为3, 这时候 broker 3 发生了 GC, 入下图:
因为 broker 3 现在在 ISR 集合中, 所以要么 broker 3 拉取同步上这条 offset 为3 的消息,要么 3 被从 ISR集合中踢出去,不然这条消息就不会 committed, 因为 replica.lag.max.messages=4 为4, broker 3 只落后一条消息,不会从ISR集合中踢出去, broker 3 如果这时候 GC 100ms, GC 结束,然后拉取到 offset 为3的消息,就再次跟 leader 保持完全同步,整个过程一直在 ISR集合中,如下图:
什么时候一个副本才会从ISR集合中踢出去
一个副本被踢出 ISR集合的几种原因:
- 一个副本在一段时间内都没有跟得上 leader 节点,也就是跟leader节点的差距大于 replica.lag.max.messages, 通常情况是 IO性能跟不上,或者CPU 负载太高,导致 broker 在磁盘上追加消息的速度低于接收leader 消息的速度。
- 一个 broker 在很长时间内(大于 replica.lag.time.max.ms )都没有向 leader 发送fetch 请求, 可能是因为 broker 发生了 full GC, 或者因为别的原因挂掉了。
- 一个新 的 broker 节点,比如同一个 broker id, 磁盘坏掉,新换了一台机器,或者一个分区 reassign 到一个新的broker 节点上,都会从分区leader 上现存的最老的消息开始同步。
所以说 kafka 0.8 版本后设置了两个参数 , replica.lag.max.messages 用来识别性能一直很慢的节点, replica.lag.time.max.ms 用来识别卡住的节点
一个节点在什么情况下真正处于落后状态
从上面的情况来看,两个参数看似已经足够了,如果一个副本超过 replica.lag.time.max.ms 还没有发送fetch同步请求, 可以认为这个副本节点卡住了,然后踢出去,但是还有一种比较特殊的情况没有考虑到,我们上文中设置 replica.lag.max.messages 为4,之所以设置为 4, 是我们已经知道 producer 每次请求打过来的消息数都在 4 以下,如果我们的参数是作用于多个 topic 的情况,那么这个 producer 最大打过来的消息数目就不好估计了,或者说在经常出现流量抖动的情况下,就会出现一个什么情况呢,我们还是使用例子说明:
如果我们的 topic的 producer 因为流量抖动打过来一个 包含 4条消息的请求,我们设置的 replica.lag.max.messages 还是为4, 这个时候,所有的 follower 都会因为超出落后条数被踢出 ISR集合:
然后,因为 follower 是正常的,所以下一次 fetch 请求就会又追上 leader, 这时候就会再次加入 ISR 集合,如果经常性的抖动,就会不断的移入移出ISR集合,会造成令人头疼的 告警轰炸。
这里的核心问题是,在海量的 topic 情况下,或者经常性的流量抖动情况下,我们不能对 topic 的producer 每次打过来的消息数目做任何假设,所以就不太好定出来一个 合适的
eplica.lag.max.messages
值
其实只有两种情况是异常的,一种就是卡住,另外一种是follower 性能慢,如果我们只根据 follower 落后 leader 多少来判断是否应该把 follower 提出ISR集合,就必须要对流量进行预测估计,怎么才能避免这种不靠谱的估计呢,kafka 给出的方案是这样的,对 replica.lag.time.max.ms 这个配置的含义做了增强,和之前一样,如果 follower 卡住超过这个时间不发送fetch请求, 会被踢出ISR集合,新的增强逻辑是,在 follower 落后 leader 超过 eplica.lag.max.messages 条消息的时候,不会立马踢出ISR 集合,而是持续落后超过 replica.lag.time.max.ms 时间,才会被踢出,这样就能避免流量抖动造成的运维问题,因为follower 在下一次fetch的时候就会跟上leader, 这样就也不用对 topic 的写入速度做任何的估计喽
在0.9.0.0版本之后,只有一个参数:replica.lag.time.max.ms来判定该副本是否应该在ISR集合中,这个参数默认值为10s。意思是如果一个follower副本响应leader副本的时间超过10s,kafka会认为这个副本走远了从同步副本列表移除。
5.9、kafka高性能的原因
Kafka虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万。其中的原因需要从kafka内部几个重要原理说起
顺序写
当broker接收到producer发送过来的消息时,需要根据消息的主题和分区信息,将该消息写入到该分区当前最后的segment文件中,文件的写入方式是追加写。由于是对segment文件追加写,故实现了对磁盘文件的顺序写,避免磁盘随机写时的磁盘寻道的开销,同时由于是追加写,故写入速度与磁盘文件大小无关
磁盘的顺序读写性能很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写
这里给出著名学术期刊 ACM Queue 上的性能对比图
磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。
上图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分)。
页缓存
kafka服务端在收到消息后,消息是会以日志文件形式被写入磁盘的,为了提高性能,kafka使用了操作系统页缓存(page cache) 来对磁盘log文件进行读写,以此用来减少对磁盘I/O的操作,把对磁盘的访问变为对内存的访问
虽然消息写入是磁盘顺序写入,没有磁盘寻道的开销,但是如果针对每条消息都执行一次磁盘写入,则也会造成大量的磁盘IO,影响性能。所以在消息写入方面,broker基于MMAP技术(即内存映射文件),将消息先写入到操作系统的页缓存中,由页缓存直接映射到磁盘文件,不需要在用户空间和内核空间直接拷贝消息,所以也可以认为消息传输是发送在内存中的。
Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘而不是kafka服务器broker进程内存来进行数据存储,并且基于磁盘顺序读写和MMAP技术来实现高性能。
消息读取:MMAP零拷贝
消费者负责向broker发送从某个分区读取消费消息的请求,broker接收到消费者数据读取请求之后,根据消费者提供主题,分区与分区offset信息,找到给定的分区index和segment文件,然后通过二分查找定位到给定的数据记录,最后通过socket传输给消费者。broker在从segment文件读取消息然后通过socket传输给消费者时,也是基于MMAP技术实现了零拷贝读取。
操作系统提供了sendfile系统调用来支持MMAP机制,即应用只需指定需要传输的磁盘文件句柄,然后通过sendfile系统实现磁盘文件读取和从socket传输出去,其中磁盘文件的读取和从socket传输出去都是通过sendfile系统调用在内核完成的,不需要在内核空间和用户空间进行数据拷贝,具体过程如下:
1、应用指定需要传输的文件句柄和调用sendfile系统调用(第一次系统调用);
2、操作系统在内核读取磁盘文件拷贝到页缓存(第一次内存拷贝);
3、操作系统在内核将页缓存内容拷贝到网卡硬件缓存(第二次内存拷贝)。
4、故整个过程涉及到一次sendfile系统调用,在内核态完成两次拷贝,在内核和用户空间之间不需要进行数据拷贝
同时由于操作系统将磁盘文件内容加载到了内核页缓存,故消费者针对该磁盘文件的多次请求可以重复使用,避免重复在磁盘和内存之间进行数据拷贝
分区分段+索引
Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹(目录命名规则:<topic_name>_<partition_id>),partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。
通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
批量读写
Kafka数据读写也是批量的而不是单条的。除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。
批量压缩
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩
Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议
总结
Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出
6、消息生产与消费
6.1、生产者消息生产步骤
1)producer长连接所有broker
2)producer端在数据积累到阈值时,统一发送数据到topic中的partition leader中(不管数据集涉及到多少topic多少分区,都是统一发送一次)
3)在 kafka内部,leader会先写入页缓存,设置自身LEO,并通知ISR的followers;再由操作系统异步顺序写入该Topic partition对应的分区日志文件,完成数据落盘,
4)ISR中的followers从leader中pull消息, 写入本地log(page cache),设置自身LEO,并向leader发送ack
5)leader收到所有ISR中的followers的ack后, 增加HW并向producer发送ack, 表示消息写入成功
在生产者生产消息时有两种情况,分为开启ack与关闭ack的情况,开启ack时,partition leader需等待follower反馈ack后,才通知生产者写入成功,关闭ack时,partition leader自己写入完成后,直接通知生产者写入成功;
6.2、消息路由策略
在通过 kafka api 方式发布消息时(例如JAVA KafkaProducer),生产者是以 Record 为消息进行发布的。Record 中包含 key 与 value,value 才是我们真正的消息本身,而 key 用于路由消息所要存放的 Partition。消息 要写入到哪个 Partition 并不是随机的,而是有路由策略的。
1. 若指定了 partition,则直接写入到指定的 partition;
2. 若未指定 partition 但指定了 key,则通过对 key 的 hash 值与 partition 数量取模,该取模
结果就是要选出的 partition 索引;
3. 若 partition 和 key 都未指定,则使用轮询算法选出一个 partition。
6.3、消费者数据消费步骤
1. consumer向broker提交连接请求,其所连接上的broker都会向其发送broker controller的通信URL,即配置文件中的listeners地址;
2. 当consumer指定了要消费的topic后,会向broker controller发送消费请求;
3. broker controller会为consumer分配一个或几个partition leader,并将该partition的当前offset发送给consumer;
4. consumer会按照broker controller分配的partition对其中的消息进行消费;
5. 当consumer消费完该条消息后,consumer会向broker发送一个消息已经被消费反馈,即该消息的offset;
6. 在broker接收到consumer的offset后,会更新相应的__consumer_offset中;
7. 以上过程会一直重复,直到消费者停止请求消费;
8. Consumer可以重置offset,从而可以灵活消费存储在broker上的消息。
6.4、重复写重复读
消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段:
1. 生产者阶段
2. broke阶段
3. 消费者阶段
6.4.1、生产者阶段重复场景:生产发送的消息没有收到正确的broker响应,导致生产者重试
生产者发出一条消息,broker落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。
如下图是一个生产者数据发送说明图
具体步骤说明如下
1. new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消
息;
2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
4. 如果发送成功,那么返回成功;
5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重
试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送
关键在上述第5点,如果生产者客户端收到了可重试的异常,那么就会把消息重新缓存到RecordAccumulator中,等待下次重新发送
异常是RetriableException类型或者TransactionManager允许重试;RetriableException类继承关系如下:
这种发送重试机制,有可能会影响消息顺序
如果设置 max.in.flight.requests.per.connection 大于1(默认5,单个连接上发送的未确认。请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。
设置 max.in.flight.requests.per.connection 为1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消
解决方案:设置确认机制
设置ack=0
即生产者发送消息(每条消息发送会根据指定的partition分区规则发送到指定broker中),不管broker(对于消息的接收与保存都是由各partition中的leader处理)有没有收到消息就不再发送(不管成功不成功)。如果这条消息在写入到磁盘之前挂掉了,那么这条消息也就丢失了。此时该集群的延迟性最小,但数据可靠性很差
设置ack=1
生产者发送消息到指定partition的leader中,leader收到消息并保存成功后告知生产者该条数据以保存成功
该策略保证了producer收到反馈时数据是写入了leader的page cache中了的,但是此时若leader挂掉,有可能数据丢失
备注:kafka从ISR列表中选取另外的节点作为leader(此时原leader中还没把消息更新到新leader中),所有leader中并没有这条数据,导致消费者无法消费数据,对于应用层面来说就是丢失数据
设置ack=-1
生产者发送消息指定partition的leader中,leader收到消息并转发到所有到ISR中,只有当所有ISR中所有replica确定写入数据成功后,leader才会返回到生产者告知消息保存成功
6、kafka生产者JAVA源码解读
详解另一篇文章 《kafka源码解读系列一:深入生产者核心》
7、回答开篇的3个问题
7.1、kafka集群挂死1台,为什么会造成不可用?kafka的高可用呢?
当前生产上的topic在创建时,设置了16个分区,而每个分区就1个副本,也就是说该topic并没有对分区进行冗余,当broker宕机(共3台broker),会导致1/3的分区不可用,直接导致producer在发送数据时发送失败,消息一直积压在客户端(注意:还有2/3的分区是可用的)
7.2、kafka集群不可用后,为什么会堵住客户端线程,不是号称异步发送吗?
这就要从producer源码讲起,如下图
调用producer.send方法后,消息将会被暂存在RecordAccumulator对象中,由KafkaThread循环按批次发送,由于有1/3的分区不可用,经过一段时间运行后,RecordAccumulator对象的缓存达到瓶颈,此时再调用producer.send方法,将会导致线程阻塞,因此直接影响了用户线程不释放;所以kafka的异步发送是在RecrodAccumulator对象缓存不满的情况下来说的,如果满了,为了防止数据丢失,是会直接对发送线程进行阻塞的
缓存阻塞仅发生下0.8版本之前,0.9及之后的版本加入了消息缓存超时机制,超时缓存的消息对象会通过回调函数通知客户端,同时超时后的缓存消息对象分配的空间将被回收
7.3、如果kafka集群不可用后,客户端该如何保证不影响用户线程?
在0.8版本的api中,由于存在缓存阻塞的问题,因此如果要保障完全与用户线程互不影响,有两种方法:
1、每次发送都起一个线程:这种方法能够保障线程堵塞不会影响用户线程,但是有一个致命问题,就是会导致线程无限增加,最后会导致内存溢出
2、对kafka可用性做判断,当出现不可用时,不再调用producer.send
第二种方法的关键是,需要发送的消息是由用户线程异步缓存在内存中的,不再调用producer.send方法后,仅仅是内存中的缓存数据不再消费了,不影响用户线程
无疑,第二种方法是更好的解决方案,但是判断kafka可用性又成了一个难题
8、结论:如何判断kafka服务端的可用性
详见我的另一篇文章把《kafka服务端的可用性判断方案》
9、其他
9.1、为什么要用kafka
- 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
- 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
- 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
- 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
- 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
9.2、kafka特性
1.高吞吐量:可以满足每秒百万级别消息的生产和消费。
2.持久性:有一套完善的消息存储机制,确保数据高效安全且持久化。
3.分布式:基于分布式的扩展;Kafka的数据都会复制到几台服务器上,当某台故障失效时,生产者和消费者转而使用其它的Kafka。
9.3、kafka4个核心api
- Producer API:生产者API允许应用程序将一组记录发布到一个或多个Kafka Topic中。
- Consumer AIP:消费者API允许应用程序订阅一个或多个Topic,并处理向他们传输的记录流。
- Streams API:流API允许应用程序充当流处理器,从一个或者多个Topic中消费输入流,并将输出流生成为一个或多个输出主题,从而将输入流有效地转换为输出流。
- Connector API:连接器API允许构建和运行可重用的生产者或消费者,这些生产者或消费者将Kafka Topic连接到现有的应用程序或数据系统。例如:连接到关系数据库的连接器可能会捕获对表的每次更改。
更多推荐
所有评论(0)