一、认识kafka

1.1、kafka的定义

  • kafka是一个分布式的,基于 发布/订阅模式 的消息队列(Message Queue), 主要应用于大数据实时处理领域

1.2、消息队列

1.2.1、传统的消息队列应用场景

在这里插入图片描述

  • 同步处理示例:
    • 用户提交注册—>信息写入数据库 —>调用发送消息的接口—>发送消息给用户—>页面响应注册成功
    • 这种模式处理效率过低,响应慢
  • 异步处理示例:
    • 用户提交注册—>信息写入数据库,同时页面响应注册成功,并将发送短信的请求写入MQ队列 —> 之后调用发送消息的接口,发送消息给用户
    • 这种将一条线的流程,分开处理,能够有效的规避高峰期对服务器的负荷,降低QPS

1.2.2、使用消息队列的好处

  • 1、解耦:允许程序独立的扩展或者修改两边的处理过程,只要确保他们遵循同样的接口约束
  • 2、可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉了,加入队列的消息仍可以在系统恢复后被处理
  • 3、缓冲:有利于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况(主要是处理生产大于消费的情况)
  • 4、灵活性 & 峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但这样的突发的流量并不常见。如果以峰值时期对资源的使用情况来购买服务器,那么显然会造成很大的浪费。使用消息队列能够是关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而完全崩溃
  • 5、异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供异步处理机制,允许用户把一个消息放入到队列中,但是并不立即处理它,等需要的时候再去处理它。
    1.2.3、消息队列的两种模式
  • 1、点对点模式(一对一,消费者主动拉取数据,消息收到后清除队列中消息,也就是这消息只能给一个消费者)
    • 消息生产者生产消息发送到Queue中,然后消费者从Queue取出并消息数据,消息被消费以后,Queue中不再由存储,所以消费者不能看消费到已经被消费的消息
    • Queue支持存在多个消费者,但对于一个消息而言,只会有一个消费者可以消费
  • 2、发布/订阅模式(一对多,消费者消费数据之后不会消除消息)
    • 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic中的消息会被所有订阅者消费(消息保留是有期限的,可以设置)
    • 发布/订阅模式 同样也可以分为两种:
      • 一种是由队列主动向所有订阅者推送消息(弊端:比如有两个订阅者,性能分别是30M/s和100M/s,而推送的速率为50M/s,这样对于30M/s来说负载过高,对于100M/s来说又是浪费)
      • 另一种是由订阅者主动拉取消息数据(弊端:订阅者需要不断的向队列询问是否有新消息,然后进行拉取,维持这个长轮询,需要占用资源)

点对点模式图
在这里插入图片描述
发布/订阅模式图
在这里插入图片描述

1.3、kafka架构

架构图
在这里插入图片描述

  • kafka分布式集群
    • broke:kafka集群是由多个broker(可以看做kafka节点) 组成
    • topic主题的作用是对消息进行分类(比如天猫的消息归类到topicA,淘宝的消息归类到topicB)
    • 一个topic可以有多个Partition分区,作用是达到负载均衡的效果,提高并发度
    • Patition分区又有 leader 和 follower (实现主备的效果,follower是leader的备份,follower只有在leader挂掉后才工作),所有Partition是成双出现的
    • Consumer消费者,某一个分区Patition只能被同一个消费组Consumer Group中的一个消费者消费(即一个消费组中的不同消费者不能同时消费同一个分区中的数据,A和B两个消费者在一个消费组中,A在消费分区0,那么B就不能消费分区0中的数据了)
    • Consumer Group消费组的作用是提高消费能力,这个组里有多个消费者共同协助消费数据,可以把一个消费组看作是一个消费者(所以,当消费组中的消费者和分区数相等的时候,并发性能是最高的)
    • zookpeer注册信息:帮助kafaka集群存储信息,管理整个集群,是kafaka正常运行的重要组件(0.9版本前zookpeer同样保存consumer消费消息,记录消费者消费到哪一条消息,保证高可用。0.9版本后,消费消息(offset)就直接保存在kafka中,有一个本地系统topic,专门来保存)
    • kafka默认保存7天的数据(168小时)
组件功能
Producer(消息生产者)就是向 kafka broker 发消息的客户端
Consumer(消息消费者)向 kafka broker 取消息的客户端
Consumer Group (CG 消费者组)由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
Broker(kafka节点)一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
Topic(主题)可以理解为一个队列,生产者和消费者面向的都是一个 topic
Partition(分区)为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
Replica (副本)为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
leader(主分区)每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
follower(从分区)每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

二、安装部署kafka

集群规划

  • 10.4.7.11:zk、kafka
  • 10.4.7.12:zk、kafka
  • 10.4.7.13:zk、kafka
    官方地址:http://kafka.apache.org/downloads.html

1、安装java环境

# 删除旧的jdk
[root@localhost ~]# rpm -qa| grep openjdk|xargs rpm -e --nodeps

# 安装java
[root@localhost ~]# ansible all -m shell -a "mkdir -p /usr/java"
[root@localhost ~]# ansible all -m shell -a 'rpm -qa| grep openjdk|xargs rpm -e --nodeps'
[root@localhost ~]# ansible all -m copy -a "src=jdk-8u291-linux-x64.tar.gz dest=jdk-8u291-linux-x64.tar.gz"
[root@localhost ~]# ansible all -m shell -a "tar xf jdk-8u291-linux-x64.tar.gz -C /usr/java"
[root@localhost ~]# ansible all -m shell -a "ln -s /usr/java/jdk1.8.0_291/ /usr/java/jdk"

# 在3个节点都添加环境变量
[root@localhost ~]# echo '
#JAVA HOME
export JAVA_HOME=/usr/java/jdk
export PATH=$JAVA_HOME/bin:$JAVA_HOME/bin:$PATH
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar' >>/etc/profile
[root@localhost ~]# source /etc/profile

2、安装zookeeper集群

  • 下载地址:https://archive.apache.org/dist/zookeeper/
[root@localhost ~]# wget 'https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz'

[root@localhost ~]# ansible all -m copy -a "src=zookeeper-3.4.14.tar.gz dest=~/zookeeper-3.4.14.tar.gz"
[root@localhost ~]# ansible all -m shell -a "tar xf zookeeper-3.4.14.tar.gz -C /opt"
[root@localhost ~]# ansible all -m shell -a "ln -s /opt/zookeeper-3.4.14/ /opt/zookeeper"

创建日志目录和数据存放目录

# 创建日志目录和数据存放目录
[root@localhost ~]# ansible all -m shell -a "mkdir -pv /data/zookeeper/data /data/zookeeper/logs"

创建zk的配置文件

[root@localhost ~]# vi /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=10.4.7.11:2888:3888        # 这里使用的是dns解析后的域名,也可以用ip地址
server.2=10.4.7.12:2888:3888
server.3=10.4.7.13:2888:3888

[root@localhost ~]# ansible all -m copy -a "src=/opt/zookeeper/conf/zoo.cfg dest=/opt/zookeeper/conf/zoo.cfg"

配置zk集群的myid文件(每个节点id不能一样)

# 在10.4.7.11[root@localhost ~]# echo 1 > /data/zookeeper/data/myid

# 在10.4.7.12[root@localhost ~]# echo 2 > /data/zookeeper/data/myid

# 在10.4.7.13[root@localhost ~]# echo 3 > /data/zookeeper/data/myid

# 也可以用这条for循环批量执行
[root@master01 ~]# num=1;for i in 10.4.7.1{1..3};do ssh $i "echo $num >/data/zookeeper/data/myid";let num++;done

启动zookeeper

[root@localhost ~]# /opt/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

# 查看zookper进程(默认监听2181端口)
[root@localhost ~]# ps aux |grep zook
root      58520  1.1  1.3 3836376 52768 pts/0   Sl   02:06   0:00 /usr/java/jdk/bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /opt/zookeeper/bin/../zookeeper-server/target/classes:/opt/zookeeper/bin/../build/classes:/opt/zookeeper/bin/../zookeeper-server/target/lib/*.jar:/opt/zookeeper/bin/../build/lib/*.jar:/opt/zookeeper/bin/../lib/slf4j-log4j12-1.7.25.jar:/opt/zookeeper/bin/../lib/slf4j-api-1.7.25.jar:/opt/zookeeper/bin/../lib/netty-3.10.6.Final.jar:/opt/zookeeper/bin/../lib/log4j-1.2.17.jar:/opt/zookeeper/bin/../lib/jline-0.9.94.jar:/opt/zookeeper/bin/../lib/audience-annotations-0.5.0.jar:/opt/zookeeper/bin/../zookeeper-3.4.14.jar:/opt/zookeeper/bin/../zookeeper-server/src/main/resources/lib/*.jar:/opt/zookeeper/bin/../conf::/usr/java/jdk/lib:/usr/java/jdk/lib/tools.jar:/usr/java/jdk/lib:/usr/java/jdk/lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg
root      59006  0.0  0.0 112708   976 pts/0    S+   02:07   0:00 grep --color=auto zook


zookpeer查看集群状态

[root@localhost ~]# /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: follower   # 这一台是slave

3、安装kafka集群

  • kafka集群是依赖zookeeper集群的。
  • 下载地址:https://archive.apache.org/dist/
[root@localhost ~]# wget 'https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz'

[root@localhost ~]# ansible all -m shell -a "mkdir /opt/module"
[root@localhost ~]# tar xf kafka_2.11-0.11.0.0.tgz -C /opt/module/
[root@localhost ~]# cd /opt/module/
[root@localhost module]# mv kafka_2.11-0.11.0.0 kafka

修改 server.properties 配置文件

[root@localhost module]# cd kafka/
[root@localhost kafka]# cd config/

[root@localhost config]# vim server.properties
……省略部分
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0                     # 这个broker.id每台kafka节点都不能重复,且要是整数

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true         # 将这条注释去掉,默认创建的topic不可以删除的,我们去掉注释后才可以删除topic

#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.4.7.11:9092      # 默认监听的端口是9092,这里最好加上本机的ip,否则在创建topic副本时容易出错

# A comma seperated list of directories under which to store log files
log.dirs=/opt/module/kafka/logs  # 这个不是日志目录,是kafka暂存数据的目录

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168         # 暂存数据保留时间,默认是7# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824   # 一个log数据文件的最大的大小,超过这个大小,将会新建一个

# root directory for all kafka znodes.
zookeeper.connect=10.4.7.11:2181,10.4.7.12:2181,10.4.7.13:2181         # 连接zookpeer集群地址信息


# 创建数据及日志的存放目录
[root@localhost config]# mkdir /opt/module/kafka/logs

设置环境变量

[root@localhost config]# echo '
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile

[root@localhost config]# source /etc/profile

将10.4.7.11上的kafka目录分发给10.4.7.12和10.4.7.13

[root@localhost config]# for i in 10.4.7.{12..13};do scp -pr /opt/module/kafka $i:/opt/module/;done

修改10.4.7.12和10.4.7.13的 server.properties 配置文件中的 broker.id

# 在10.4.7.12上修改
[root@localhost ~]# vim /opt/module/kafka/config/server.properties
broker.id=1

# 在10.4.7.13上修改
[root@localhost ~]# vim /opt/module/kafka/config/server.properties
broker.id=2

先启动zookpeer,再启动kafka

# 启动命令的 -daemon 参数,作用是后台运行
[root@localhost ~]# for host in 10.4.7.{11..13};do \
ssh root@${host} " /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties";done

# 查看运行的java进程
[root@localhost ~]# jps
69681 Jps
95160 jenkins.war
65338 QuorumPeerMain

关闭kafka和zookeeper的命令

[root@localhost ~]# /opt/zookeeper/bin/zkServer.sh stop

[root@localhost ~]# kafka-server-stop.sh

三、常用kafka命令

参考地址:https://www.cnblogs.com/chen-chen-chen/p/12176980.html

  • 创建topic命令: kafka-topics.sh --create --zookeeper <zookeeper地址:端口> --partitions <分区数> --replication-factor <副本数> --topic <topic名称>
  • 删除topic: kafka-topics.sh --zookeeper <zookeeper地址:端口> --delete --topic <topic名称>
  • 查看topic:kafka-topics.sh --zookeeper <zookeeper地址:端口> --list

选项说明:

--topic 定义 topic名
--replication-factor 定义副本数
--partitions 定义分区数

1、创建和删除和查看topic

# 创建topic
[root@localhost config]# kafka-topics.sh  --create \
--zookeeper 10.4.7.11:2181 \             # zookpeer地址
--topic test-1 \                          # 定义topic名称
--partitions 2 \                         # 指定分区数
--replication-factor 2                   # 指定副本数,注意:副本数不能大于kafka的节点数(是每个分区备份数,并且每个分区的备份分布在不同节点)


# 查看topic列表
[root@localhost config]# kafka-topics.sh --zookeeper 10.4.7.11:2181 --list
test-1

# 查看某个topic的详情
[root@localhost ~]# kafka-topics.sh --zookeeper 10.4.7.11:2181 --describe --topic test-1
# topic名称     分区总数                 副本总数
Topic:test-1    PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: test-1   Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: test-1   Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1

# 删除topic
[root@master01 ~]# kafka-topics.sh \
--zookeeper 192.168.2.203:2181 \
--delete \                             # 指定动作为delete
--topic first                          # 指定topic名称
  • FAQ:

    • 问题描述:使用 kafka-topics.sh --create --zookeeper 10.4.7.11:2181 --topics first --partitions 2 --replication-factor 2 报如下错误"ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 2 larger than available brokers: 1"
    • 问题原因:kafka集群为构建成功,指定的副本数大于集群节点数(broke的个数)
    • 解决办法:检查 server.properties 配置文件中 zookeeper.connect 是否配置正确
  • 补充知识:当我们创建topic后,会在logs数据目录下生成"topic名称-分区id"为格式的文件,如下所示

# 在10.4.7.1110.4.7.13上分别有备份,因为副本数选的是2
[root@master01 bin]# ansible all -m shell -a "ls -l /opt/module/kafka/logs/ |grep test"
10.4.7.11 | CHANGED | rc=0 >>
drwxr-xr-x 2 root root   161 Aug  5 13:14 test-1-1
10.4.7.12 | CHANGED | rc=0 >>
drwxr-xr-x 2 root root   141 Aug  5 13:16 test-1-0
drwxr-xr-x 2 root root   141 Aug  5 13:16 test-1-1
10.4.7.13 | CHANGED | rc=0 >>
drwxr-xr-x 2 root root   141 Aug  5 13:15 test-1-0

在这里插入图片描述

2、生产和消费kafka消息

  • 我们打开两个终端进行测试
    • 一个终端执行生产者命令,进行推送消息
    • 另一个终端执行消费者命令,进行接收消息
# 生产者终端,发出hello world 消息
[root@master01 bin]# kafka-console-producer.sh --topic test-1 --broker-list 10.4.7.11:9092,10.4.7.12:9092,10.4.7.13:9092
>hell world

# 消费者终端,接收到了生产这个推送的消息
[root@master01 ~]# kafka-console-consumer.sh --topic test-1 --bootstrap-server 10.4.7.11:9092 
hello world
  • 我们再开一个终端(上述是两个终端实时推送和接收的,这里模拟异步)
    • 需要加上 --from-beginning 参数,从头开始接收数据
# 加上 --from-beginning 参数,成功接收到消息
[root@localhost ~]# kafka-console-consumer.sh --topic test-1 --bootstrap-server 10.4.7.11:9092 --from-beginning
hell world


  • 新版本的kafka消息是存储在kafka本地 (系统topic ) 中,因此在logs数据目录下可以查看到系统主题"__consumer_offsets",系统主题默认有50个分区用于暂存数据
    在这里插入图片描述

3、修改分区数

  • 使用 --alter 参数,修改topic分区数
[root@localhost ~]# kafka-topics.sh --zookeeper 10.4.7.11:2181 --alter --topic first --partitions 6

4、查看所有消费组

# 查看所有的消费者组id
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 10.4.7.11:9092 --list
console-consumer-31961

# 查看消费组中的 offset
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server 10.4.7.11:9092 --describe --group console-consumer-31961
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

Consumer group 'console-consumer-31961' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test01                         1          1               1               0          -                                                 -                              -
test01                         0          1               1               0          -                                                 -                              -

修改kafka数据存放时间

//查看zk的连接路径
cat /home/admin/kafka-config/server.properties | grep "zookeeper.connect="
//修改默认时间
sh /home/admin/KafkaProxy/bin/kafka-configs.sh -zookeeper {zk的连接路径} --alter --add-config retention.ms={新设置的值,单位ms,1天为86400000}  --entity-name {Topic名称} --entity-type topics
//示例
sh /home/admin/KafkaProxy/bin/kafka-configs.sh -zookeeper 10.180.15.152:2188,10.180.15.151:2188,10.180.15.150:2188/kafka-private-clusters/cluster-private-paas-default --alter --add-config retention.ms=86400000  --entity-name test --entity-type topics
Completed Updating config for entity: topic 'test'.

四、kafka数据与日志分离

  • 这个步骤其实是在安装是,在配置文件中就应该修改的
  • kafka启动时会自动在工作kafka目录下创建logs目录来存放日志
  • 如果我们将 log.dirs 参数的路径也写成kafka工作目录下logs,那么日志目录就会和数据目录存放在一起
  • 因此,数据与日志分离,其实就是让数据目录和日志目录路径不要相同即可
    1、首先我们先关掉kafka,并删除logs数据目录
[root@localhost ~]# ansible all -m shell -a " /opt/module/kafka/bin/kafka-server-stop.sh"

[root@localhost ~]# ansible all -m shell -a "rm -rf /opt/module/kafka/logs"

2、还要清除zookeeper中的数据,模拟成新安装的kafka

# 进入zookeeper
[root@localhost ~]# /opt/zookeeper/bin/zkCli.sh

# 查看zk中的数据(除了zookeeper外,全是kafka的数据)
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config]

# 删除zk中的数据(用rmr来删除,这里就不一一列出了,除了zookeeper,全删了)
[zk: localhost:2181(CONNECTED) 16] rmr  /consumers

# 最后再查看,只剩下zookeeper了
[zk: localhost:2181(CONNECTED) 17] ls /
[zookeeper]

********** 还有暴力一点的办法,直接删除zk的数据目录 *****************
# 首先关掉zk
[root@localhost ~]# ansible all -m shell -a "/opt/zookeeper/bin/zkServer.sh stop"
# 删除version-2目录
[root@localhost ~]# ansible all -m shell -a "rm -rf /data/zookeeper/logs/version-2 /data/zookeeper/data/version-2"

# 再次启动zk
[root@localhost ~]# ansible all -m shell -a "/opt/zookeeper/bin/zkServer.sh start"

3、修改kafka配置文件 server.properties

# 修改数据目录路径为/opt/module/kafka/data
[root@localhost ~]# vim /opt/module/kafka/config/server.properties
log.dirs=/opt/module/kafka/data         # 将log.dirs路径改成/opt/module/kafka/data

4、启动kafka

[root@localhost ~]# kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

# 启动后,发现自动创建了logs目录,以及我们指定的数据目录data
[root@localhost ~]# ls /opt/module/kafka/
bin  config  data  libs  LICENSE  logs  NOTICE  site-docs

5、创建一个topic测试

[root@master01 ~]# kafka-topics.sh --create --zookeeper 10.4.7.11:2181 --partitions 2 --replication-factor 2 --topic test01
Created topic "test01".

# 多出了新建的test01-1的分区目录
[root@master01 ~]# ll /opt/module/kafka/data/
-rw-r--r-- 1 root root   0 Aug  6 11:03 cleaner-offset-checkpoint
-rw-r--r-- 1 root root   4 Aug  6 11:08 log-start-offset-checkpoint
-rw-r--r-- 1 root root  54 Aug  6 11:03 meta.properties
-rw-r--r-- 1 root root  15 Aug  6 11:08 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  15 Aug  6 11:08 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Aug  6 11:07 test01-1


# 查看logs目录,日志文件依旧在logs目录下
[root@master01 ~]# ll /opt/module/kafka/logs/
-rw-r--r-- 1 root root  8673 Aug  6 11:08 controller.log
-rw-r--r-- 1 root root     0 Aug  6 11:03 kafka-authorizer.log
-rw-r--r-- 1 root root     0 Aug  6 11:03 kafka-request.log
-rw-r--r-- 1 root root  5984 Aug  6 11:03 kafkaServer-gc.log.0.current
-rw-r--r-- 1 root root 18391 Aug  6 11:07 kafkaServer.out
-rw-r--r-- 1 root root   172 Aug  6 11:03 log-cleaner.log
-rw-r--r-- 1 root root 18391 Aug  6 11:07 server.log
-rw-r--r-- 1 root root  6873 Aug  6 11:07 state-change.log

五、深入了解kafka

1、kafka的工作流程

在这里插入图片描述

  • Kafka 中消息是以topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
  • topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应于一个数据目录下的 log 文件,该log 文件中存储的就是producer 生产的数据。
  • Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset(偏移量),以便出错恢复时,从上次的位置继续消费。
    • 如上图,一共生产了15条消息(6+4+5),每个partition中的消息偏移量都是从0开始的(因此kafka的数据拉取时,只能保证区内有序,及在同一个分区内的数据是有序排列的,但是多个分区之间并不一定是有序的)
    • follower会主动同步leader
# log文件示例:
[root@master01 ~]# cd /opt/module/kafka/data/
[root@master01 data]# ll
total 16
-rw-r--r-- 1 root root   0 Aug  6 11:03 cleaner-offset-checkpoint
-rw-r--r-- 1 root root   4 Aug  6 14:02 log-start-offset-checkpoint
-rw-r--r-- 1 root root  54 Aug  6 11:03 meta.properties
-rw-r--r-- 1 root root  15 Aug  6 14:02 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  15 Aug  6 14:02 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Aug  6 11:07 test01-1

# 进入到 test01-1分区文件中
[root@master01 data]# cd test01-1/
# 可以看到两个重要文件,一个是".log"结尾的文件,是存放实际数据的,一个是".index"结尾的文件
[root@master01 test01-1]# ll
-rw-r--r-- 1 root root 10485760 Aug  6 11:07 00000000000000000000.index
-rw-r--r-- 1 root root        0 Aug  6 11:07 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Aug  6 11:07 00000000000000000000.timeindex
-rw-r--r-- 1 root root        0 Aug  6 11:07 leader-epoch-checkpoint

  • 如上所示,有两个重要数据文件:
    • 一个是".log"结尾的文件00000000000000000000.log,是存放实际数据的(但是这个文件只存数据,因此需要index文件来快速索引),一个分区中会有多个.log数据文件,也叫segment(因为在配置文件中限定了一个sgement的大小,超出这个大小就会新建一个.log文件)。
    • 一个是".index"结尾的文件,存储message的offset和offset之前message的字节数大小,帮助快速定位数据,如下图示例

      在这里插入图片描述
      由于生产者生产的消息会不断追加到log 文件末尾,为防止log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个partition 分为多个segment。
      每个segment对应两个文件——“.index”文件和“.log”文件。
      这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,first 这个topic 有三个分区,则其对应的文件夹为first-
      0,first-1,first-2

2、kafka的生产者

2.1、分区策略
  • 分区的原因:
    • 1、方便在集群中扩展,每个 partition 可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群可以适应任何大小的数据
    • 2、可以提高并发,因为可以以Partition 为单位进行读写。
  • 分区的原则:
    • 我们需要将 producer 发送的数据封装成一个 ProducerRecord对象

      代码使用如下图:在这里插入图片描述

    • 1、指明 partition 的情况下,直接将指明的值直接作为 partition 值
    • 2、没有指明 partition 值,但是有key的情况下(我们传入的数据时键值对形式的数据),将key的hash值与topic的partition数进行取余得到 partition 值
    • 3、既没有partition值又没有key的情况下(传入的数据不是键值对形式),第一次调用时随机生成一个整数(后面每次调用都会在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的 round-robin 轮询算法
2.2、生产者数据可靠性的保证
  • 为保证 producer 发送的数据能够可靠的发送到指定的topic,topic的每个partition收到发送的数据后,都需要向 producer 发送ack(acknowledgement确认收到 ),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据

    在这里插入图片描述

2.2.1、副本数据同步策略
方案优点缺点
半数以上完成同步,就发送ack延迟低选举新的leader时,容忍n台节点故障,那么就需要存在2n+1个节点(n个节点故障,那么剩余的n+1超半数的节点中,肯定会有一台完全备份成功的)
全部完成同步,才发送ack选举新的leader时,容忍n台节点故障,那么就需要n+1台副本总数(因为是完全同步,因此n+1台中可能会有一台完成同步了)延迟高
  • Kafka 选择了第二种方案(全部同步完成),原因如下:
    • 1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1
      个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
    • 2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。
2.2.2、ISR同步副本策略
  • ISR同步副本集合的目的在于防止leader挂掉后,选取新的leader,尽量不丢失数据(在新版本的kfaka中,采取将与leader同步时间较低的follower加入到ISR集合,作为leader备选。在老版本中,还有将同步的数据条数多的follower加入ISR的策略,在新版本.0.9之后中被剔除了)
  • 由于 kafka 选择了全部同步完成后 发送ack的副本策略,就会存在这样一个问题。:
    • 设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,而不发送 ack的问题。
  • 解决上述弊端的办法:
    • Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合(leader只需要和ISR集合中的follower实现完全同步即可发送ack,leader故障了也会优先从ISR集合中选取)。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间 未 向 leader 同 步 数 据 ,则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
  • 我们在查看topic的描述信息时,可以看到 Isr信息
[root@localhost ~]# kafka-topics.sh --zookeeper 10.4.7.11:2181 --describe --topic test01
Topic:test01    PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: test01   Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: test01   Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1

2.2.3、ack参数配置

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,
所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。

  • acks参数配置:
    • acks 为 0 :producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到数据还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
    • acks 为 1 :producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack(只等待leader写完,不等待follower),如果在 follower 同步成功之前 leader 故障,那么将会丢失数据;
    • acks 为 -1 或 all :producer 等待 broker 的 ack,partition 的 leader 和 follower(指的是ISR中的follower)全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。

acks = -1 造成数据重复的案例(如图所示):

  • leader收到数据并且follower完成同步,但是在leader还没发ack是发生故障,会选取一台follower代替leader。而其实producer没有收到ack,会再次发送之前的数据过来,造成了数据重复。
    在这里插入图片描述
2.2.4、故障处理细节(保证生产和接收的数据一致性)
  • LEO(Log End Offset):指的是每个副本最大的 offset;
  • HW(High Watermark):指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。(已经完成数据同步的最大offset)
    在这里插入图片描述
  • 1、follower 故障:
    follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • 2、leader 故障
    leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

2.3、Exactly Once 语义

首先了解下 “At Least Once语义” 和 “At Most Once语义”:

  • At Least Once 语义(至少一次):将服务器的ACK级别设置为-1,可以保证producer 到server之间的数据不丢失,即为At Least Once语义

  • At Most Once语义(至多一次):将服务器ACK级别设置为0,可以保证生产者每条消息只被发送一次,即为 At Most Once语义

  • 上面两种 语义,At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的 At Most Once 能保证数据不重复,却不能保证数据不丢失。而 Exactly Once 语义,就是为了解决上面两种语义的弊端,既能保证数据不重复,又能保证数据不丢失(应用场景:一些交易信息数据)

Exactly Once 语义

  • 引入了"幂等性" 的特性:所谓 "幂等性"就是值producer不论向server发送多少次重复的数据,server端只会持久化一条。幂等性 结合 At Least Once 语义,就构成了Exactly Once语义(即在保证数据完整的同时,去重处理)
  • At Least Once + 幂等性 = Exactly Once
  • 要启用"幂等性",只需要将producer的参数中 enable.idompotence 设置为true即可。kafka 的幂等性 实现的其实就是将下游需要做去重的处理放在了数据上游。开启了幂等性的producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带 Squence Number。而 Broker端会对<PID,Partition,Squence Number>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。 (缺点: PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once)

3、kafka的消费者

3.1 消费方式
  • consumer采用 pull 的方式从broker中读取数据。(如果采用push的方式很难适应消费速率不同的消费者,因为发送的速率是由broker来决定的,它的目标是尽可能以最快的速度将消息传递,这样就容易造成处理性能慢的consumer来不及处理消息。典型的表现就是拒绝服务以及网络拥塞。)
  • Pull方式的不足之处:如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
3.2、分区分配策略(针对消费组)
  • 一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
  • Kafka 有两种分配策略,一是 RoundRobin,一是 Range。
3.2.1、RoundRobin 策略
  • 会将这个 consumer group 订阅的所有 topic 当成一个整体,对这个整体中的所有partition用hash算法进行转换后排序,之后 consumer group 中的消费者一一对应,轮询读取
  • 由于 RoundRobin 策略,会将消费组订阅的所有topic中的partition打散后重新排序轮询。因此采用这种策略有一个前提条件 - - - 消费组中的所有消费者订阅的主题是一样的
    在这里插入图片描述
3.2.2、Range策略(kafka默认的策略)
  • 按照主题进行划分的策略。(比方说有3个主题topic1-3,消费组中有3个消费者,消费者A订阅了topic1和topic2,消费者B订阅了topic2和topic3。公共订阅的topic会按partition数平均分配。但是同样存在弊端,如果共同订阅的topic有很多个并且里面的partition数是奇数,那么就可能导致消费者A被分配的partition原多于消费者B : 因为是以每个topic中的partition进行平分,每个共同的topic分配给消费者A的就会多一个)

在这里插入图片描述
总结:消费组中的消费者订阅的topic相同,建议采用 RoundRobin 策略,可以尽可能的平均分配。如果消费组中的消费者订阅的topic不相同,就建议使用默认的range策略

3.2.3、offset 的维护

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
在这里插入图片描述

查看zookeeper中的记录信息(在0.9版本之前)

[root@localhost ~]# /opt/zookeeper/bin/zkCli.sh
# 查看/下所有信息
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config]

# 查看broker,可以看到ids, topics, seqid三个注册信息
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
# 查看ids,其实就是在kafka配置中的 broker.id
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]
# 查看topic,可以看到创建的topic名称
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
[test01, __consumer_offsets]

******************************************
# 查看consumer,可以看到一个消费者组,其实我们在使用消费者消费数据时未指定消费组,会自动创建一个消费组
[zk: localhost:2181(CONNECTED) 6] ls /consumers
[console-consumers-88502]

# 查看消费组信息,可以看到 ids, owners, offsets 三个信息
[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumers-88502
[ids, owners, offsets]

# 继续查看offsets,里面会有订阅的topic名称
[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumers-88502/offsets
[test01]

# 继续查看topic,可以看到分区id
[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumers-88502/offsets/test01
[0, 1]

# 使用get,可以获取到保存在zk中的偏移量信息
[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumers-88502/offsets/test01/0
1                                                      # 这里显示的就是当前的offerset
cZxid = 0x300000003e
ctime = Sat Jul 13 14:00:32 CST 2021
mZxid = 0x3000000055
mtime = Sat Jul 13 14:00:45 CST 2021
pZxid = 0x300000003e
cVersion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

查看保存在kafka本地的数据信息(0.9版本之后)

  • Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
  • 需要修改配置文件consumer.properties中 exclude.internal.topics=false参数,是的消费者可以消费系统主题中的消息
  1. 修改配置
[root@localhost ~]# echo "
# 使得消费者可以消费kafka的系统主题
exclude.internal.topics=false" >> /opt/module/kafka/config/consumer.properties
  1. 读取 offset

2.1、kafka-0.11.0.0 之前版本,现在基本上不用了

[root@localhost ~]# kafka-console-consumer.sh --topic __consumer_offsets --zookeeper 10.4.7.11:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /opt/module/kafka/config/consumer.properties --from-beginning

2.2、 kafka-0.11.0.0 之后版本:

[root@localhost ~]# kafka-console-consumer.sh --topic __consumer_offsets --zookeeper 10.4.7.11:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm atter" --consumer.config /opt/module/kafka/config/consumer.properties --from-beginning
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐