文章目录

kafka高性能原因

https://zhuanlan.zhihu.com/p/671827061

kafka 高性能主要体现在

架构层面

  • Partition级别并行:Broker(挂很多硬盘Disk,硬盘并行)、Consumer端(不同Partition支持rebalance)
  • ISR (会出现数据丢失风险,不像Raft这样过半副本成功)
  • 高可用采用的是分区副本

IO层面

  • Batch读写
  • 磁盘顺序io
  • DMA 零拷贝机制,
  • 分区中有多个.log和.index 文件,index 采用稀疏索引快速检索到offset 对应的组里位置所在的即将要消费的消息
  • 高并发主要采用的是nio
  • 利用pagecache
  • 压缩机制

在这里插入图片描述

Kafka

设计目的

Kafka 是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能;
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输;
  • 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输;
  • 同时支持离线数据处理和实时数据处理;
  • Scale out:支持在线水平扩展;

为何使用消息系统

  • 解耦
    消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    而基于消息发布订阅的机制,可以联动多个业务下游子系统,能够不侵入的情况下分步编排和开发,来保证数据一致性。

  • 冗余
    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

  • 扩展性
    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

  • 灵活性 & 峰值处理能力
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  • 可恢复性
    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  • 顺序保证
    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka 保证一个 Partition 内的消息的有序性。

  • 缓冲
    在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

  • 异步通讯
    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

概念

Kafka部分名词解释如下:

  • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。
    offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到- partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

1.kafka Partition分区机制

首先,从数据组织形式来说,kafka有三层形式,kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。

而每个分区可以分布到不同的机器上,这样一来,从服务端来说,分区可以实现高伸缩性(集群的高可用 ),以及负载均衡,动态调节的能力。

在这里插入图片描述

1.0 为什么要设计分区

假如不进行分区的话就如同 MySQL 单表存储一样,发消息就会被集中存储,这样会导致某台 Kafka 服务器存储 Topic 消息过多,如果在写消息压力很大的情况下,最终会导致这台 Kafka 服务器吞吐量出现瓶颈, 因此 Kafka 设计了分区的概念,同时也带来了「负载均衡」、「横向扩展」的能力,如下图所示:。

在这里插入图片描述

  • 1)负载均衡:发送消息时可以根据分区的数量进行数据均匀分布,使其落在不同的分区上, 这样可以提高并发写性能;同时消费的时候多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力,提高读消息性能。

  • 2)横向扩展:可以将一个 Topic 分成了多个 Partition,将不同的 Partition 尽可能的部署在不同的物理节点上,这样扩展起来非常方便,另外一个消费者可以消费多个分区中的数据,但是这样还是不能够充分的发挥横向扩展,这时候消费者组就出现了,我们用消费者组,来消费整个的 Topic,一个消费者消费 Topic 中的一个分区。

1.1 Kafka 如何合理设置分区数,越多越好吗?

一、Kafka 如何合理设置分区数

首先我们要了解在 Partition 级别上达到负载均衡是实现高吞吐量的关键,合适的 Partition 数量可以达到并行读写和负载均衡的目的,需要根据每个分区的生产者和消费者的目标吞吐量进行估计。

此时我们可以遵循一定的步骤来计算确定分区数:

1)首先根据某个 Topic 当前接收的数据量等经验来确定分区的初始值。

2)然后针对这个 Topic,进行测试 Producer 端吞吐量和 Consumer 端的吞吐量。

3)测试的结果, 假设此时他们的值分别是 Tp「Producer 端吞吐量」、Tc「负Consumer 端吞吐量」,总的目标吞吐量是 Tt, 单位是 MB/s, 那么结果 numPartition = Tt / max (Tp, Tc)。

4)特殊说明:测试 Tp 通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到 Kafka 就好了。而测试 Tc 通常与应用消费消息后进行其他什么处理有关,相对复杂一些。

broker && partition 配置最大化集群吞吐

在这里插入图片描述

在这里插入图片描述

总之,通常情况下 Kafka 集群中越多的 Partition 会带来越高的吞吐量。但是,如果 Kafka 集群中 Partition 总量过大或者单个 Broker 节点 Partition 过多,都可能会对系统的可用性和消息延迟带来潜在的负面影响,需要引起我们的重视。

1.2 分区写入策略

所谓分区写入策略,即是生产者将数据写入到kafka主题后,kafka如何将数据分配到不同分区中的策略。

常见的有三种策略,轮询策略,随机策略,和按键保存策略。其中轮询策略是默认的分区策略(均衡性)。
在这里插入图片描述

按键保存策略
按键保存策略,就是当生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储
至于要如何实现,只要让生产者发送的时候指定key就行。

另外,多个订阅者可以从一个或者多个分区中同时消费数据,以支撑海量数据处理能力

  • 1、分区类似hadoop分布式文件系统,比如分布式的文件块,如果某台机器宕机数据丢失,也是丢失一部分,不会出现整个文件全军覆没。另外通过partition级别的冗余存(replication.factor)来保证partition级别的安全性。
  • 2、kafka采用partition存储方式,也提高访问吞吐率,有负载均衡的效果。
  • 3、最主要的是通过分区,对分区选Leader,可以增加Topic级别消息并发度。

综上所述,一个是增大副本高可用,另外增加并发;

1.3 topic中partition存储分布

假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2个topic名称分别为report_push、launch_info, partitions数量都为partitions=4 存储路径和目录规则为: xxx/message-folder

 |--report_push-0
 |--report_push-1
 |--report_push-2
 |--report_push-3
 |--launch_info-0
 |--launch_info-1
 |--launch_info-2
 |--launch_info-3

在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。 如果是多broker分布情况,请参考kafka集群partition分布原理分析

下面以一个Kafka集群中4个Broker举例,创建1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:

在这里插入图片描述

(2)当集群中新增2节点,Partition增加到6个时分布情况如下:

在这里插入图片描述

https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html
https://blog.csdn.net/lizhitao/article/details/41778193

Partition Offset

请添加图片描述

2.kafka副本机制(数据可靠性)

2.1 Kafka 副本有哪两种,作用是什么?

在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本。这多个副本中,只有一个是leader,而其他的都是follower副本。仅有leader副本可以对外提供服务。
在这里插入图片描述

多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。

这里通过问题来整理这部分内容。

Kafka中partition replication之间同步数据,从partition的leader复制数据到follower只需要一个线程(ReplicaFetcherThread),实际上复制是follower(一个follower相当于consumer)主动从leader批量拉取消息的,这极大提高了吞吐量,从中可以看出无处不显示Kafka高吞吐量设计思想。

  • kafka的副本都有哪些作用?
    在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。

  • 说说follower副本为什么不对外提供服务?
    这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。

2.2 Kafka中replication如何复制数据

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。

而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。

而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。

Kafka中partition replication之间同步数据,从partition的leader复制数据到follower只需要一个线程(ReplicaFetcherThread),实际上复制是follower(扮演consumer角色)主动从leader批量拉取消息的,这极大提高了吞吐量,从中可以看出无处不显示Kafka高吞吐量设计思想。

  • 优点
    性能高,吞吐量大。
    降低了系统和磁盘开销,Leader充分利用磁盘顺序读以及send file(zero copy)机制。
    降低Leader与Follower之间网络开销和交互次数。

  • 缺点

    有可能会占用大量网络带宽(例如本来集群很大而且数据量很多,后来新增Broker节点需要迁移数据),甚至堵塞网络,需要有流控机制,否则会影响线上服务。

    因为Follower是批量拉取Leader消息,如果设置为保证所有replicas commit,才返回Ack给生产者会存在抖动现象,Follow拉取Leader修改HW,当HW与当次生产者请求logEndOffset的offst一致时,客户端等待时间会拉长。

2.3 ISR机制(副本数据一致性)

ISR 的引入主要是解决同步副本与异步复制两种方案各自的缺陷:

  • 同步副本中如果有个副本宕机或者超时就会施慢该副本组的整体性

  • 如果仅仅使用异步副本,当所有的副本消息均远落后于主副本时,一旦主副本宕机重新边那么就会存在消息丢失情况。

kafka 为了保证数据的一致性使用了isr 机制

isr 的全称是:In-Sync Replicas isr 是一个副本的列表,里面存储的都是能跟leader 数据一致的副本,确定一个副本在isr列表中,有2个判断条件

  • 根据副本和leader 的交互时间差,如果大于某个时间差 就认定这个副本不行了,就把此副本从isr 中剔除,此时间差根据

    配置参数rerplica.lag.time.max.ms=10000 
    也就是默认10s,isr中的follow没有向isr发送心跳包就会被移除
    
  • 根据leader 和副本的信息条数差值决定是否从isr 中剔除此副本,此信息条数差值根据配置参数

    rerplica.lag.max.messages=4000 决定 ,也就是默认消息差大于4000会被移除
    

注意点:kafka后续版本移除了第二个判断条件,只保留了第一个,以内极端情况下,如果producor一次性发来了10000条数据,而默认条数差立马会大于4000

首先我们知道kafka 的数据是多副本的,某个topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)。kafka的replica包含leader与follower。每个topic 下的每个分区下都有一个leader 和(N-1)个follower,
每个follower 的数据都是同步leader的 这里需要注意 是follower 主动拉取leader 的数据

Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,因此可以使用Broker id 指定Partition的Replica

leader && follower

Replication

Kafka 在0.8以前的版本中,并不提供 HA 机制,一旦一个或多个 Broker 宕机,则宕机期间其上所有 Partition 都无法继续提供服务。若该 Broker 永远不能再恢复,亦或磁盘故障,则其上数据将丢失。

在 Kafka 在0.8以前的版本中,是没有 Replication 的,一旦某一个 Broker 宕机,则其上所有的 Partition 数据都不可被消费,这与 Kafka 数据持久性及 Delivery Guarantee 的设计目标相悖。同时 Producer 都不能再将数据存于这些 Partition 中。

  • 如果 Producer 使用同步模式则 Producer 会在尝试重新发送 message.send.max.retries(默认值为3)次后抛出 Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该 Broker 的数据的丢失。

  • 如果 Producer 使用异步模式,则 Producer 会尝试重新发送 message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。

由此可见,在没有 Replication 的情况下,一旦某机器宕机或者某个 Broker 停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言 Replication 机制的引入非常重要。

Leader

kafka由zk选举partition follower为leader

请添加图片描述

3.kafka broker生产、消费底层实现

在这里插入图片描述
一次简单的消息从生产到消费过程,需要经过 2次网络IO 和 2次磁盘IO 。如果消息体过大,势必会增加IO的耗时,进而影响kafka生产和消费的速度。消费者速度太慢的结果,就会出现消息积压情况。

3.1 kafka读取数据流程

在这里插入图片描述

对于Produce请求:Server端的I/O线程统一将请求中的数据写入到操作系统的PageCache后立即返回,当消息条数到达一定阈值后,Kafka应用本身或操作系统内核会触发强制刷盘操作(如左侧流程图所示)。
对于Consume请求:主要利用了操作系统的ZeroCopy机制,当Kafka Broker接收到读数据请求时,会向操作系统发送sendfile系统调用,操作系统接收后,首先试图从PageCache中获取数据(如中间流程图所示);如果数据不存在,会触发缺页异常中断将数据从磁盘读入到临时缓冲区中(如右侧流程图所示),随后通过DMA操作直接将数据拷贝到网卡缓冲区中等待后续的TCP传输。

在这里插入图片描述

producer

Producer 发送消息到 Broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置合理,所有消息可以均匀分布到不同的 Partition里,这样就实现了负载均衡。

  • 指明 Partition 的情况下,直接将给定的 Value 作为 Partition 的值。
  • 没有指明 Partition 但有 Key 的情况下,将 Key 的 Hash 值与分区数取余得到 Partition值。
  • 既没有 Partition 有没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partiticn 值,也就是常说的 Round-Robin 轮询算法。

consumer消费消息

写message

  • 消息从java堆转入page cache(即物理内存)。
  • 由异步线程刷盘,消息从page cache刷入磁盘。

读message

  • 消息直接从page cache转入socket发送出去。
  • 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁盘Load消息到page cache,然后直接从socket发出去

kafka消费者组 consumer group

在这里插入图片描述

消费者组内消费者数量

在这里插入图片描述

上面那张图,仔细推敲一下就会发现,图中其实已经有一些既定的事实,比如消费者组内消费者小于或等于分区数,以及topic分区数刚好是消费者组内成员数的倍数。

那么如果消费者组内成员数超过分区数会怎样呢?比如有4个分区,但消费者组内有6个消费者,这时候有2个消费者不会分配分区,它会一直空闲。

而如果消费者不是分区的倍数,比如topic内有4个分区,而消费者组内有三个消费者,那怎么办呢?这时候只会有两个消费者分别被分配两个分区,第三个消费者同样空闲。

所以,消费者组内的消费者数量最好是与分区数持平,再不济,最好也是要是分区数的数量成比例。

重平衡(Rebalance)

请添加图片描述

说完消费者组,再来说说与消费者组息息相关的重平衡机制。重平衡可以说是kafka为人诟病最多的一个点了。

重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。

重平衡的触发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
  • 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡
  • 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

为什么说重平衡为人诟病呢?因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。

避免重平衡

小结一下,其实主要就是三个参数,session.timout.ms控制心跳超时时间,heartbeat.interval.ms控制心跳发送频率,以及max.poll.interval.ms控制poll的间隔。这里给出一个相对较为合理的配置,如下:

session.timout.ms:控制心跳超时时间 设置为6s
heartbeat.interval.ms:控制心跳发送频率 设置2s
max.poll.interval.ms:控制poll的间隔, 推荐为消费者处理消息最长耗时再加1分钟

consumer group 避免活锁(livelock)

请添加图片描述

consumer Commit Offset

请添加图片描述

consumer Push vx Pull策略

请添加图片描述

4.Kafka消息底层存储原理

Kafka文件存储机制那些事

请添加图片描述

磁盘缓存:BufferIo
DirectIo:bypass kernel 绕过内核,直接读底层硬件存储,不经过pagecache

free -m

buff: 写 IO 缓存
cache: 读 IO 缓存

读机械硬盘:滑动机械臂,寻某一个位置还需要等磁盘旋转,有寻道延迟和旋转延迟,读取延迟,影响磁盘性能

随即IO:要等机械臂旋转、浪费时间

消息存储结构

向kafka的某个topic中写消息,实际上就是向内部的某个partition写消息,读取消息也是从某个partition上读取消息。

一个topic的实际物理存储实际上对应着一个个的文件夹,文件夹名字是按照topic-num进行目录划分的,topic就是topic名字,num就是partition编号,从0开始。

partition文件夹下面也不仅仅是一个日志文件,而是有多个文件,这些多个文件可以从逻辑上将文件名一致的文件集合就称为一个 LogSegment日志段文件组。
partion相当于一个巨型文件,被平均分配到多个大小相等的LogSegment数据文件中,每个LogSegment内部的消息数量不一定相等,这种分段存储的特性特性方便旧的LogSegment file快速被删除,同时加快了文件查找速度。

每个逻辑segment段主要包含:消息日志文件(以log结尾)、偏移量索引文件(以index结尾)、时间戳索引文件(以timeindex结尾)、快照文件(以.snaphot结尾)等等文件。

如下表示一个某个topic文件夹下的主要文件结构:

在这里插入图片描述

每个分区是由多个Segment组成,当Kafka要写数据到一个partition时,它会写入到状态为active的segment中。如果该segment被写满,则一个新的segment将会被新建,然后变成新的“active” segment。

相关存储文件

数据日志文件 .log

log文件作为日志文件,存储了消息内容以及该消息相关的元数据。Kafka会将消息以追加的方式顺序持久化到partition下面的最新的日志段下面的日志文件中,即只有最后一个logSegment文件才能执行写入操作。

每一个条消息日志的主要包含offset,MessageSize,data三个属性(还有一些其他属性):

  • offset:8字节,表示Message在这个partition中的全局偏移量,这是一个逻辑值而不是实际无力偏移量,它唯一确定了partition中的一条Message所在的逻辑位置,可以看作是partition中Message的id,offset从0开始。
  • MessageSize:4字节,表示消息内容的大小;
  • data:Message的具体内容,大小不固定。
偏移量索引文件(稀疏索引) .index

index文件作为偏移量索引文件,主要用于加快查找消息的速度。该文件中的每一条索引记录都对应着log文件中的一条消息记录。

一条索引记录包含相对offset和position两个属性(均为4字节):

  • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其对应的数据文件中最小offset(也就是基准offset)的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
  • position:表示该条Message在数据文件中的物理位置。只要打开对应log文件并移动文件指针到这个position就可以读取对应的Message内容了。

相对offset大小为4字节,因此最大值为Integer.MAX_VALUE。在写入消息时会对要写入的相对offset进行校验,超过该值时将会自动进行日志段切分。

假设某个LogSegment段中的数据文件名1234.log,索引文件中某个索引条目为(3,497)为例,那么这个索引条目对应的消息就是在数据文件中的第4个消息,该消息全局offset为1238,该消息的物理偏移地址(相对数据文件)为497。

稀疏索引

index索引文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏索引,默认每间隔4k的数据建立一条索引,时间戳索引文件(以timeindex结尾)也是这个规则。通过log.index.interval.bytes属性可以更新间隔数量大小。

稀疏索引避免了索引文件占用过多的空间,从而可以将索引文件长期保留在内存中,但缺点是没有建立索引的Message也不能一次性定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

index索引文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏索引,默认每间隔4k的数据建立一条索引,时间戳索引文件(以timeindex结尾)也是这个规则。通过log.index.interval.bytes属性可以更新间隔数量大小。

稀疏索引避免了索引文件占用过多的空间,从而可以将索引文件长期保留在内存中,但缺点是没有建立索引的Message也不能一次性定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

Kafka还采用了mmap的方式直接将 index 文件映射到内存中(Java中就是MappedByteBuffer),这样对 index 的操作就不需要操作磁盘 IO,大大的减少了磁盘IO次数和时间。

Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

在partition中如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

第一步查找segment file 上述图2为例,

在这里插入图片描述

其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.

第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,

其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log

第二步通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

从上述图3可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

总结

  • parition中的消息采用多个小文件段的方式存储,很容易实现消息定期清除或删除已经消费完的文件。
  • 通过索引文件可以快速定位message在日志文件中的位置。
  • 索引文件通过Mmap技术直接映射到内存中,这样对索引的操作就不需要操作磁盘IO,也减少了数据在内存中的拷贝次数。
  • 索引文件采用稀疏存储,可以大幅降低索引文件映射到内存中时占用的内存空间大小。

中间件写文件

本质上,page cache 可以看成是一个 write- through 的缓存。

一般情况下,用户直接使用 write/fflush 只是 写到了 page cache 里面。
这个时候如果宕机,那么依赖会丢失掉数据。

可以依赖于 Linux 自动刷新,也可以自己强 制发起系统调用来刷新

page cache刷盘

Linux 自动刷新 page cache:

linux 对每一个设备维持了一个后台线程,用 于回写 page cache。 会在两种情况下触发:

  • 脏页(dirty page)很多
  • 周期性刷新

Linux内核由于存在page cache, 一般修改的文件数据并不会马上同步到磁盘,会缓存在内存的page cache中,我们把这种和磁盘数据不一致的页称为脏页,脏页会在合适的时机同步到磁盘。为了回写page cache中的脏页,需要标记页为脏(dirty)。

其实对于用户代码来说,整个过程是不可控的。因此依赖于自动刷新机制,就要容忍数据丢失
在这里插入图片描述

Mysql、Redis、Kafka 刷盘时机

MySQL binlog刷盘可以通过 sync_binlog参数来控制

  • 0-系统自由判断
  • 1-commit刷盘
  • N-每N个事务刷盘

MySQL redo log:通过参数 innodb_flush_log_at_trx_commit 控制

  • 0-写入 log buffer,每秒刷新到盘;
  • 1-每次提交刷盘;
  • 2-写入到 OS cache,每秒刷盘;

Redis AOF 刷盘时机:

  • always: 每次都刷盘
  • everysec: 每秒刷盘
  • no: 由操作系统决定

Kafka 刷盘时机:

  • log.flush.interval.ms:间隔多少毫秒刷新
  • log.flush.interval.messages:每多少条消息刷新一次

写入磁盘总结:

  1. 写到中间件自己的 log buffer 就返回,需要考虑何时刷到系统缓存或者磁盘;
  2. 写到系统缓存就返回(这种可能是因为自身没有 log buffer),要考虑刷新到系统磁盘的 时机
  3. 直接刷新到磁盘
    刷盘策略: 1. 每次都刷 2. 按次数 3. 按数据量
  4. 按时间间隔

5.kafka ack机制

producer ack

Kafka producer有三种ack机制 初始化producer时在config中进行配置

  • ack=0 意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息
    提供了最低的延迟。但是最弱的持久性,当服务器发生故障时,就很可能发生数据丢失。例如leader已经死亡,producer不知情,还会继续发送消息broker接收不到数据就会数据丢失

  • ack=1 意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。此选项提供了较好的持久性较低的延迟性。
    Partition的Leader死亡,follwer尚未复制,数据就会丢失

  • ack=-1 意味着producer得到follwer确认,才发送下一条数据,延时性最差,性能最慢,但可靠性最强(数据强一致性)

三种机制性能递减,可靠性递增。
同时,Ack默认值为1,此时吞吐量与可靠性折中。实际生产中可以根据实际需求进行调整。

Kafka中位移提交

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了

好了,我们来总结一下今天的内容。Kafka Consumer 的位移提交,是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程中,推荐你使用手动提交机制,因为它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS,又支持自动重试,改善 Consumer 应用的高可用性。总之,Kafka Consumer API 提供了多种灵活的提交方法,方便你根据自己的业务场景定制你的提交策略。

6.零拷贝技术

https://www.cnblogs.com/zz-ksw/p/12801632.html

传统的文件读写(四次拷贝与四次上下文切换)

传统的文件读写或者网络传输,通常需要将数据从内核态转换为用户态。

应用程序读取用户态内存数据,写入文件 / Socket之前,需要从用户态转换为内核态之后才可以写入文件或者网卡当中。

例如消息中间件 Kafka 就是这个应用场景,从磁盘中读取一批消息后原封不动地写入网卡(NIC,Network interface controller)进行发送。

在没有任何优化技术使用的背景下,操作系统为此会进行 4 次数据拷贝,以及 4 次上下文切换,如下图所示:
在这里插入图片描述

在Consumer从Broker中读取数据,按照传统的IO模型,磁盘 —> 内核态 —> 用户态 —> 内核态 —> 网卡,一共要经历四次拷贝,四次上下文切换。

传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:

  • 调用read函数,文件数据被copy到内核缓冲区(页缓存PageCache)
  • read函数返回,文件数据从内核缓冲区copy到用户缓冲区
  • write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区。
  • 数据从socket缓冲区copy到相关协议引擎。

显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。

在这里插入图片描述
在这里插入图片描述

DMA直接内存访问

DMA(Direct Memory Access,直接内存访问)是一种计算机技术,允许外部设备(如网卡、硬盘控制器等)直接访问计算机内存,而不需要通过中央处理器(CPU)的干预。这样可以大大提高数据传输速度和系统效率。

在传统的I/O操作中,CPU通常需要直接参与数据传输,即将数据从外部设备读取到内存中或者从内存中写入到外部设备。这种方式会占用CPU的时间,降低系统的效率。而DMA技术则通过在设备和内存之间引入专门的DMA控制器,使得外部设备可以直接与内存进行数据传输,从而解放了CPU,使其能够同时执行其他任务。

DMA实现
DMA传输将数据从一个地址空间复制到另一个地址空间,提供在外设和存储器之间或者存储器和存储器之间的高速数据传输。当CPU初始化这个传输动作,传输动作本身是由DMA控制器来实现和完成的。DMA传输方式无需CPU直接控制传输,也没有中断处理方式那样保留现场和恢复现场过程,通过硬件为RAM和IO设备开辟一条直接传输数据的通道,使得CPU的效率大大提高。

DMA的工作原理通常包括以下几个步骤:

  • CPU通过设置DMA控制器的寄存器来指定数据传输的起始地址、目的地址和数据长度等参数。
  • DMA控制器获得CPU的控制后,开始从外部设备读取数据或向外部设备写入数据。
  • 数据传输完成后,DMA控制器会向CPU发送中断信号,通知数据传输完成。
  • 使用DMA技术可以显著提高系统的性能和效率,特别是在需要大量数据传输的场景下,如音视频处理、网络数据传输等。

而用户空间与内核空间之间的数据传输并没有类似DMA这种可以不需要CPU参与的传输工具,因此用户空间与内核空间之间的数据传输是需要CPU全程参与的。所有也就有了通过零拷贝技术来减少和避免不必要的CPU数据拷见过程

现在,DMA 代替了 CPU 负责内存与磁盘以及内存与网卡之间的数据搬运,CPU 作为 DMA 的控制者,如下图所示:
在这里插入图片描述

什么是零拷贝技术?

零拷贝技术是一个思想,指的是指计算机执行操作时,CPU 不需要先将数据从某处内存复制到另一个特定区域。

可见,**零拷贝的特点是 CPU 不全程负责内存中的数据写入其他组件,CPU 仅仅起到管理的作用。但注意,零拷贝不是不进行拷贝,而是 CPU 不再全程负责数据拷贝时的搬运工作。**如果数据本身不在内存中,那么必须先通过某种方式拷贝到内存中(这个过程 CPU 可以不参与),因为数据只有在内存中,才能被转移,才能被 CPU 直接读取计算

零拷贝技术的具体实现方式:
  • sendfile

  • mmap

  • splice

  • 直接 Direct I/O

不同的零拷贝技术适用于不同的应用场景,下面依次进行 sendfile、mmap、Direct I/O 的分析。

  • DMA 技术回顾:DMA 负责内存与其他组件之间的数据拷贝,CPU 仅需负责管理,而无需负责全程的数据拷贝;

  • 使用 page cache 的 zero copy:

    • sendfile:一次代替 read/write 系统调用,通过使用 DMA 技术以及传递文件描述符,实现了 zero copy

    • mmap:仅代替 read 系统调用,将内核空间地址映射为用户空间地址,write 操作直接作用于内核空间。通过 DMA 技术以及地址映射技术,用户空间与内核空间无须数据拷贝,实现了 zero copy

  • 不使用 page cache 的 Direct I/O:读写操作直接在磁盘上进行,不使用 page cache 机制,通常结合用户空间的用户缓存使用。通过 DMA 技术直接与磁盘/网卡进行数据交互,实现了 zero copy

sendfile实现过程

在这里插入图片描述

snedfile 的应用场景是:用户从磁盘读取一些文件数据后不需要经过任何计算与处理就通过网络传输出去。此场景的典型应用是消息队列。

在传统 I/O 下,正如第一节所示,上述应用场景的一次数据传输需要四次 CPU 全权负责的拷贝与四次上下文切换,正如本文第一节所述。

sendfile 主要使用到了两个技术:

  • DMA 技术;

  • 传递文件描述符代替数据拷贝;

    1.page cache 以及 socket buffer 都在内核空间中;
    2.数据传输过程前后没有任何写操作;

在Kafka中,transferFrom和transferTo方法。

sendfile零拷贝技术过程

  • 先从用户态切换到内核态,把磁盘数据拷贝到内核缓冲区,同时从内环缓冲区拷贝一些offset和length数据到socket缓冲区,

  • 接着从内核态切换到用户态,从内核缓冲区直接把数据拷贝到网络协议引擎里去,同时从Socket缓冲区拷贝一些offset和length信息到网络协议引擎里去,offset和length量几乎可以忽略。

只要2次切换,2次拷贝。
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

mmap实现

mmap,内存映射,直接将磁盘文件数基于DMA引擎拷贝据映射到内核缓冲区,同时用户缓冲区是跟内核缓冲区共享一块映射数据,建立映射后,不需要从内核缓冲区拷贝到用户缓冲区。故,可减少一次拷贝。总共是4次切换,3次拷贝。
在这里插入图片描述

mmap sendFile区别

mmap 适合小数据量读写,sendFile 适合大文件传输。

mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。

sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)。

零拷贝技术应用

包括mmap和sendfile两种形式,在各种框架中常有体现。

Kafka中存在大量的网络数据持久化到磁盘和磁盘文件通过网络发送的过程,使用Sendfile方式

kafka Sendfile

把磁盘文件映射到内存中,然后把映射到内存的数据通过Socket发送出去。

RocketMQ使用mmap零拷贝技术

Consumer消费消息过程使用零拷贝,零拷贝包括2种方式,RocketMQ使用第一种方式,因小块数据传输的效果比sendfile方式好

  • 使用mmap+write方式
    优点:即使频繁调用,使用小文件块传输,效率也很高
    缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU资源,内存安全性控制复杂,需要避免JVM Crash问题

使用sendfile方式
优点:可以利用DMA方式,消耗CPU资源少,大块文件传输效率高,无内存安全新问题
缺点:小块文件效率低于mmap方式,只能是BIO方式传输(同步阻塞),不能使用NIO(同步非阻塞)

在这个选择上:rocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

3.BIO和NIO的区别

  • BIO以流的方式处理数据,NIO以块的方式处理数据,块IO的效率比流IO高很多。(比如说流IO他是一个流,你必须时刻去接着他,不然一些流就会丢失造成数据丢失,所以处理这个请求的线程就阻塞了他无法去处理别的请求,他必须时刻盯着这个请求防止数据丢失。而块IO就不一样了,线程可以等他的数据全部写入到缓冲区中形成一个数据块然后再去处理他,在这期间该线程可以去处理其他请求)
  • BIO是阻塞的,NIO是非阻塞的
  • BIO基于字节流和字符流进行操作的,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道事件,因此使用单个线程就可以监听多个客户端通道

https://blog.csdn.net/weixin_48872249/article/details/113845526

Kafka高吞吐率原因分析

1.磁盘顺序读写

顺序写:

  • 磁盘IO慢主要慢在寻道和旋转,顺序写减少了磁盘寻道和旋转次数。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。

  • Kafka中每个分区是一个有序的,不可变的消息序列,新的消息不断的追加到Partition的末尾,Partition是一个逻辑概念,每个Partition被划分成多个Segment,每个Segment对应一个物理文件,Kafka对Segment文件追加写,就是顺序写文件。

Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术, 顺序写入 和 MMFile 。

kafka将来自Producer的数据,顺序追加在partition,partition就是一个文件,以此实现顺序写入。

在这里插入图片描述

Consumer从broker读取数据时,因为自带了偏移量,接着上次读取的位置继续读,以此实现顺序读。

顺序读写,是kafka利用磁盘特性的一个重要体现。

2. 不使用JVM,操作系统页缓存(PageCache)

在这里插入图片描述

PageCache:

producer 生成消息到 Broker 时,Broker 会使用 pwrite() 系统调用,按偏移量写入数据。写入时,会先写入 page cache。

Consumer 消费消息时,Broker会使用sendfile() 系统调用,零拷贝的将数据从 page cache 传输到 Broker 的 Socket Buffer,通过网络传输。因此当Kafka的生产速率和消费速率相差不大时,就能几乎只靠 page cache 的读写完成整个生产-消费过程,磁盘访问非常少

什么页缓存?
页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘I/O的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。为了弥补性能上的差异 ,现代操作系统越来越多地将内存作为磁盘缓存,甚至会将所有可用的内存用途磁盘缓存,这样当内存回收时也几乎没有性能损失,所有对于磁盘的读写也将经由统一的缓存。

**Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因此之一。**虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务,但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过log.flush.interval.message、log.flush.interval.ms等参数来控制。

同步刷盘可以提高 消息的可行性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过一般不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

3.零拷贝(索引文件、日志文件读写)

kafka 为了解决内核态和用户态数据不必要 Copy 这个问题, 在读取数据的时候就引入了「零拷贝技术」。即让操作系统的 os cache 中的数据直接发送到网卡后传出给下游的消费者,中间跳过了两次拷贝数据的步骤,从而减少拷贝的 CPU 开销, 减少用户态内核态的上下文切换次数, 从而优化数据传输的性能, 而Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存,如下图所示:

在这里插入图片描述
在 Kafka 中主要有以下两个地方使用到了「零拷贝技术」:

1)基于 mmap 机制实现的索引文件:首先索引文件都是基于 MappedByBuffer 实现,即让用户态和内核态来共享内核态的数据缓冲区,此时数据不需要 Copy 到用户态空间。虽然 mmap 避免了不必要的 Copy,但是在不同操作系统下, 其创建和销毁成功是不一样的,不一定都能保证高性能。所以在 Kafka 中只有索引文件使用了 mmap。

2)基于sendfile 机制实现的日志文件读写:在 Kafka 传输层接口中有个 TransportLayer 接口,它的实现类中有使用了 Java FileChannel 中 transferTo 方法。该方法底层就是使用 sendfile 实现的零拷贝机制, 目前只是在 I/O 通道是普通的 PLAINTEXT 的时候才会使用到零拷贝机制。

2、Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
3、Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。

4.批量与压缩

Kafka Producer 向 Broker 发送消息不是一条一条发送,而是按批发送。且roducer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。

如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩

5.文件结构:

Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。

Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。

index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。

Kafka 充分利用二分法来查找对应 offset 的消息位置

6.网络模型:Kafka基于NIO,采用Reactor线程模型,实现了自己的RPC通信。

一个Acceptor线程处理新的连接,多个Processor线程select 和 read socket请求,多个Handler线程处理请求并响应(I/O多路复用)。

Kafka 的网络通信模型是基于 NIO 的Reactor 多线程模型来设计的。其中包含一个Acceptor线程用于处理连接,多个 Processor 线程 select 和 read socket 请求,一个Processor 由包含多个 Handler 线程处理请求并响应。

在这里插入图片描述

7.分区Partition并发

Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。

kafka优势总结

  • 它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗
  • 顺序读写提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优(顺序写入)
  • 读取数据的时候配合sendfile直接暴力输出。

Kafka 对于单一读写请求均拥有很好的吞吐和延迟。

处理写请求时,数据写入 PageCache 后立即返回,数据通过异步方式批量刷入磁盘,既保证了多数写请求都能有较低的延迟,同时批量顺序刷盘对磁盘更加友好。

处理读请求时,实时消费的作业可以直接从 PageCache 读取到数据,请求延迟较小,

同时 ZeroCopy 机制 能够减少数据传输过程中用户态与内核态的切换,大幅提升了数据传输的效率。

Kafka架构

Zookeeper

zookeeper对于kafka的作用是什么?

Zookeeper 主要用于在集群中不同节点之间进行通信,在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

在 Kafka 中 Zookeeper 作用是什么?

Kafka 集群能够正常工作,目前还是需要依赖于 ZooKeeper,主要用来「负责 Kafka集群元数据管理,集群协调工作」,在每个 Kafka 服务器启动的时候去连接并将自己注册到 Zookeeper,类似注册中心。

Kafka 使用 Zookeeper 存放「集群元数据」、「集群成员管理」、 「Controller 选举」、「其他管理类任务」等。待 KRaft 提案完成后,Kafka 将完全不依赖 Zookeeper。

在这里插入图片描述

  • 1)集群元数据:Topic 对应 Partition 的所有数据都存放在 Zookeeper 中,且以 Zookeeper 保存的数据为准。

  • 2)集群成员管理:Broker 节点的注册、删除以及属性变更操作等。主要包括两个方面:成员数量的管理,主要体现在新增成员和移除现有成员;单个成员的管理,如变更单个 Broker 的数据等。

  • 3)Controller 选举:即选举 Broker 集群的控制器 Controller。其实它除了具有一般 Broker 的功能之外,还具有选举主题分区 Leader 节点的功能。在启动 Kafka系统时,其中一个 Broker 会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。如果在 Kafka 系统运行过程中,当前的控制器出现故障导致不可用,那么 Kafka 系统会从其他正常运行的 Broker 中重新选举出新的控制器。

  • 4)其他管理类任务:包括但不限于 Topic 的管理、参数配置等等。

Kafka 3.X 「2.8版本开始」为什么移除 Zookeeper 的依赖

原因有以下2点:

1)集群运维层面:Kafka 本身就是一个分布式系统,如果还需要重度依赖 Zookeeper,集群运维成本和系统复杂度都很高。

2)集群性能层面:Zookeeper 架构设计并不适合这种高频的读写更新操作, 由于之前的提交位移的操作都是保存在 Zookeeper 里面的,这样的话会严重影响 Zookeeper 集群的性能。

面试题

kafka如何保证数据可靠性

为了保证数据的可靠性,Kafka会给每个分区找一个节点当带头大哥(Leader),以及若干个节点当随从(Follower)。消息写入分区时,带头大哥除了自己复制一份外还会复制到多个随从。如果随从挂了,Kafka会再找一个随从从带头大哥那里同步历史消息;如果带头大哥挂了,随从中会选举出新一任的带头大哥,继续笑傲江湖。

https://www.cnblogs.com/listenfwind/p/12465409.html

kafka如何保证消息不丢失

1.持久化
2.副本机制

kafka先持久化还是先同步副本?

先同步持久化,再同步副本

kafka同步副本时,网络不太好 客户端需要一直等待吗?

如果有一副本,网络不太好,
ISR集合,检测到副本长时间没有同步数据,会剔除 ISR集合,

kafka分片设计

为什么kafka中1个partition只能被同组的一个consumer消费?

Kafka通过消费者组机制同时实现了发布/订阅模型和点对点模型。多个组的消费者消费同一个分区属于多订阅者的模式,自然没有什么问题;

而在单个组内某分区只交由一个消费者处理的做法则属于点对点模式。其实这就是设计上的一种取舍,如果Kafka真的允许组内多个消费者消费同一个分区,也不是什么灾难性的事情,只是没什么意义,而且还会重复消费消息。

顺序性也会被破坏

kafka面试题

1、kafka的消费者是pull(拉)还是push(推)模式,这种模式有什么好处?

Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。

4、kafka判断一个节点还活着的有那两个条件?

(1)节点必须维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

5、讲一讲 kafka 的 ack 的三种机制

request.required.acks 有三个值 0 1 -1(all),具体如下:

  • 0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
  • 1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
  • -1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失。

7.kafka 如何保证消息幂等性

这个问题换种问法,就是kafka如何保证消息的幂等性。对于消息队列来说,出现重复消息的概率还是挺大的,不能完全依赖消息队列,而是应该在业务层进行数据的一致性幂等校验。

比如你处理的数据要写库(mysql,redis等),你先根据主键查一下,如果这数据都有了,你就别插入了,进行一些消息登记或者update等其他操作。另外,数据库层面也可以设置唯一健,确保数据不要重复插入等 。一般这里要求生产者在发送消息的时候,携带全局的唯一id。

7.发送消息的分区策略有哪些?

  • 1.轮询:依次将消息发送该topic下的所有分区,如果在创建消息的时候 key 为 null,Kafka 默认采用这种策略。
  • 2.key 指定分区:在创建消息是 key 不为空,并且使用默认分区器,Kafka 会将 key 进行 hash,然后根据hash值映射到指定的分区上。这样的好处是 key 相同的消息会在一个分区下,Kafka 并不能保证全局有序,但是在每个分区下的消息是有序的,按照顺序存储,按照顺序消费。在保证同一个 key 的消息是有序的,这样基本能满足消息的顺序性的需求。但是如果 partation 数量发生变化,那就很难保证 key 与分区之间的映射关系了。
  • 3.自定义策略:实现 Partitioner 接口就能自定义分区策略。
  • 4.指定 Partiton 发送

8 kafka消费完 数据删除吗

1.基于时间或者文件大小 某种策略批量删除

9.队列模型了解吗?Kafka 的消息模型知道吗?

队列模型:P2P、Pub/Sub
Kafka 采用的是 Pub/Sub 模型

10.Kafka 如何保证消息不重复消费?

kafka出现消息重复消费的原因:

服务端侧已经消费的数据没有成功提交 offset(根本原因)。
Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:

消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。

将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交offset合适?

11.Kafka如何处理大量积压消息

方法:
1 增大partion数量,
2 消费者加了并发,服务, 扩大消费线程
3 增加消费组服务数量
4 kafka单机升级成了集群
5 避免消费者消费消息时间过长,导致超时
6 使Kafka分区之间的数据均匀分布

场景:
1 如果是Kafka消费能力不足,则可以考虑增加 topic 的 partition 的个数,
同时提升消费者组的消费者数量,消费数 = 分区数 (二者缺一不可)

2 若是下游数据处理不及时,则提高每批次拉取的数量。批次拉取数量过少
(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

12.顺序消息 消费失败问题

解决方案:重试表

顺序消息消费时,可以先查询订单是否在重试表有记录,如果有记录,则记录到重试表

消费失败时,也可以加到重试表中

13.pagecache存的什么东西

在 Linux 的实现中,文件 Cache 分为两个层面,一是 Page Cache,另一个是 Buffer Cache(块缓存)。
page cache用于缓存文件的页数据,大小通常为4K;
Buffer cache用于缓存块设备(如磁盘)的块数据,大小通常为1K。

在这里插入图片描述

page cache,又称pcache,其中文名称为页高速缓冲存储器,简称页高缓。

page cache的大小为一页,通常为4K。在linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问。

  • 单位:页。通常为4K

  • 大小:动态变化,因为操作系统会将所有未直接分配给应用程序的物理内存都用于页面缓存。

  • 文件系统层级的缓存:page cache用于缓存文件的页数据,从磁盘中读取到的内容是存储在page cache里的。

Kafka对page cache的利用
Kafka为什么不自己管理缓存,而非要用page cache?原因有如下三点:

JVM中一切皆对象,数据的对象存储会带来所谓object overhead,浪费空间;
如果由JVM来管理缓存,会受到GC的影响,并且过大的堆也会拖累GC的效率,降低吞吐量;
一旦程序崩溃,自己管理的缓存数据会全部丢失。
Kafka三大件(broker、producer、consumer)与page cache的关系可以用下面的简图来表示。

kafka两个消费组消费有速度有差异会发生什么?

集群规模过大的kafka

pagecache会被污染,会存老的数据,因为内存不是无限大的,会有资源限制

kafka底层优化

多个group消费速度接近时,再开启page cache
多个group消费速度差距大时:不开启预读pagecache,直接走direct Io,绕过linux buffer IO

https://github.com/flycash/interview-baguwen/tree/main/mq

Kafka面试题

Kafka 是什么, 适应场景有哪些?

Kafka 是一个分布式的流式处理平台,用于实时构建流处理应用。主要应用在大数据实时处理领域。Kafka 凭借「高性能」、「高吞吐」、「高可用」、「低延迟」、「可伸缩」几大特性,成为「消息队列」的首选。

其主要设计目标如下:

  • 1)高性能:以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

  • 2)高吞吐、低延迟:在非常廉价的机器上也能做到单机支持每秒几十万条消息的传输,并保持毫秒级延迟。

  • 3)持久性、可靠性:消息最终被持久化到磁盘,且提供数据备份机制防止数据丢失。

  • 4)容错性:支持集群节点故障容灾恢复,即使 Kafka 集群中的某一台 Kafka 服务节点宕机,也不会影响整个系统的功能(若副本数量为N, 则允许N-1台节点故障)。

  • 5)高并发:可以支撑数千个客户端同时进行读写操作。

你用消息队列做过什么?

答案:

  • 解耦:将不同的系统之间解耦开来。尤其是当你在不希望感知到下游的情况。(后面我们用一个一对多的项目来进一步说明,这是一个很常见的场景,可以自行替换自己的例子)通常而言,当我需要对接多个系统,需要告知我的某些情况的时候,但是我又不知道究竟有多少人关心,以及他们为啥关心的时候,就会考虑采用消息队列来通信。(这个例子是我的例子)例如我们退款,会用消息队列来暴露我们的退款信息,比如说退款成功与否。很多下游关心,但是实际上,我们退款部门根本不关心有谁关心。在不使用消息队列的时候,我们就需要一个个循环调用过去;

  • 异步:是指将一个同步流程,利用消息队列来拆成多个步骤。(下面是我准备的一个刷亮点的方面,我们从事件驱动设计上来讨论)这个特性被广泛应用在事件驱动之中。(下面是我退款的例子)比如说在退款的时候,退款需要接入风控,多个款项资金转移,这些步骤都是利用消息队列解耦,上一个步骤完成,发出事件来驱动执行下一步;

  • 削峰:(这是最大的考点,而且如果你的简历里面有类似电商之类的经历,那么就很可能追问下去,接连考秒杀啥的)削峰主要是为了应对突如其来的流量。一般来说,数据库只能承受每秒上千的写请求,如果在这个时候,突然来了几十万的请求,那么数据库可能就会崩掉。消息队列这时候就起到一个缓冲的效果,服务器可以根据自己的处理能力,一批一批从消息队列里面拉取请求并进行处理。

在这里插入图片描述
1)日志收集方向:可以用 Kafka 来收集各种服务的 log,然后统一输出,比如日志系统 elk,用 Kafka 进行数据中转。

2)消息系统方向:Kafka 具备系统解耦、副本冗余、流量削峰、消息缓冲、可伸缩性、容错性等功能,同时还提供了消息顺序性保障以及消息回溯功能等。

3)大数据实时计算方向:Kafka 提供了一套完整的流式处理框架, 被广泛应用到大数据处理,如与 flink、spark、storm 等整合。

消息队列有什么缺点?

分析:典型的反直觉题。因为我们只会说消息队列怎么怎么好,很少有人会思考使用消息队列会带来什么问题。

**答:

  • 可用性降低:引入任何一个中间件,或者多任何一个模块,都会导致你的可用性降低。(所以这个其实不是MQ的特性,而是所有中间件的特性)**

  • 一致性难保证:引入消息队列往往意味着本地事务不可用,那么就容易出现数据一致性的问题。例如业务成功了,但是消息没发出去;

  • 复杂性上升:复杂性分两方面,一方面是消息队列集群维护的复杂性,一方面是代码的复杂性

(升华主题)几乎所有的中间件的引入,都会引起类似的问题。

关键字:可用性,一致性,复杂性

Kafka 的高性能是如何保证的?

核心是零拷贝,Page Cache,顺序写,批量操作,数据压缩,日志分段存储

Kafka 的 ISR 是如何工作的?

核心是理解Kafka 如何维护 ISR,什么情况下会导致一个 partition 进去(或者出来)ISR

kafka因为Full GC而抖动,导致副本被踢出ISR

Kafka 的负载均衡策略有哪些?

列举策略,要注意分析优缺点。更进一步可以讨论更加宽泛的负载均衡的做法,和 RPC 之类的负载均衡结合做对比

为什么 Kafka 的从 Partition 不能读取?

违背直觉的问题,关键是要协调偏移量的代价太大;

为什么 Kafka 在消费者端采用了拉(PULL)模型?

注意和 PUSH 模型做对比。最好是能够举一个适用 PUSH 的例子

这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。

消息系统,实时性要求高
一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。

分区过多会引起什么问题?

又是一个违背直觉的问题,核心在于顺序写

如何确定合适的分区数量?

如何解决 Topic 的分区数量过多的问题? • 如何保证消息有序性?方案有什么缺点?抓住核心,相关的消息要确保发送到同一个分区,例如 ID 为1的永远发到分区1

Kafka能不能重复消费?当然可以。但是要强调,一般的消费者都要考虑幂等的问题

如何保证消息消费的幂等性?就是去重,简单就是数据库唯一索引,高级就是布隆过滤器 + 唯一索引

如何保证只发送(或者只消费)一次?

属实没必要,做好消费幂等简单多了

Kafka Rebalance (重平衡)

发生时机,rebalance 过程,rebalance 有啥影响?如何避免 rebalance?

核心把 rebalance 的过程背下来

消息积压怎么办?

没啥好办法,也就是加快消费,合并消息

分析:消息积压,核心就在于生产者太快而消费者太慢。解决思路要么控制生产者的发送速率,要么提高消费者的效率。一般我们不太会倾向于控制发送者的速率,所以解决问题的思路就变成了如何提高消费者效率。

在这里插入图片描述

提高消费者的效率,要么提高消费单条消息的效率,要么是增加消费者的数量。

答案:整体上有两个思路:

  • 增加集群规模。不过这个只能治标,缓解问题,但是不能解决问题;
  • 加快单个消息的消费速率。例如原本同步消费的,可以变成异步消费。把耗时的操作从消费的同步过程里面摘出去;
  • 增加消费者。例如Kafka中,增加Partition,或者启用线程池来消费同一个Partition。

(刷亮点)其实消息积压要看是突然的积压,即偶然的,那么只需要扩大集群规模,确保突然起来的消息都能在消息中间件上保存起来,就可以了。因为后续生产者的速率回归正常,消费者可以逐步消费完积压的消息。如果是常态化的生产者速率大于消费者,那么说明容量预估就不对。这时候就要调整集群规模,并且增加消费者。典型的就是,Kafka增加新的Partition。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐