1引言

数据中台因业务需要将要引入开源框架kafka。

本文主要内容为kafka接入数据中台的技术验证预案和实施建设预案。

2验证预案

2.1性能验证

验证kafka的高吞吐率,主要验证producer和consumer的发送和消费吞吐率。

使用kafka manager 监控。

发送量测试

测试环境设定:broker集群(3节点),topic(1个),partition(3节点*2),replication(3)。

1、在borker不变的情况下,producer数量变化,测试多个producer发送消息的吞吐性能。

2、在borker不变的情况下,producer数量不变,消息数量变化,测试多个producer发送消息的吞吐性能。

3、在borker不变的情况下,producer数量不变,消息大小变化,测试多个producer发送消息的吞吐性能。

4、在borker不变的情况下,producer数量不变,partition数量变化,测试多个producer发送消息的吞吐性能。

5、在borker不变的情况下,producer数量不变,replication变化,测试多个producer发送消息的吞吐性能。

6、同时挂载producer和consumer,测试发送消息和消费消息的吞吐性能

7、在borker数量变化的情况下,测试发送消息和消费消息的吞吐性能。

结论预设:随着producer数量的增加,每秒总的消息量会增加。随着消息变大,每秒总的消息量会增加。partition越多,在某阈值范围内,吞吐率提高。replication越大,吞吐率下降。

消费量测试

测试环境设定:broker集群(3节点),topic(1个),partition(3节点*2),replication(3)      。

1、在borker不变的情况下,consumer数量变化,测试多个consumer接收消息的消费性能。

2、在borker不变的情况下,consumer数量不变,消息数量变化,测试多个consumer接收消息的吞吐性能。

3、在borker不变的情况下,consumer数量不变,消息大小变化,测试多个consumer接收消息的吞吐性能。

4、在borker不变的情况下,consumer数量不变,partition数量变化,测试多个consumer接收消息的吞吐性能。

5、在borker不变的情况下,consumer数量不变,replication变化,测试多个consumer接收消息的吞吐性能。

6、同时挂载producer和consumer,测试发送消息和消费消息的吞吐性能

7、在borker数量变化的情况下,测试发送消息和消费消息的吞吐性能。

结论预设:随着consumer数量的增加,消息消费的速度会越快。

broker容量测试

关闭consumer,通过producer发送海量数据到broker,测试broker的容量承载能力。

异步消息测试

配置消息异步发送,按照吞吐率测试方案测试producer和consumer的吞吐性能。

配置消息异步发送,查看数据丢失情况。

同步消息测试

配置消息同步发送,按照吞吐率测试方案测试producer和consumer的吞吐性能。

配置消息同步发送,查看数据丢失情况。

消息重发测试

测试broker进程异常状态下,producer和consumer的消息重发机制。

offset相关测试

修改offset后,会产生消息重复、丢失、乱序等情况,具体进行测试。

partition方式测试

设定partition分配的方式,随机或者hash,来测试具体情况

压缩比测试

分别设定不同的压缩算法,GZIP, Snappy, LZ4,再次进行上述吞吐率的测试。

zookeeper下线测试

zookeeper节点下线后,测试kafka集群的稳定情况。

网络带宽占用测试

容错测试

满负荷测试

加大producer和consumer的数量及数据量,直至主机资源消耗的70%以上,观察运行时间长度。

崩溃测试

加大producer和consumer的数量及数据量,直至主机资源耗尽。

断网断电测试

断网断电测试broker的数据丢失情况。

2.2功能验证

验证kafka提供的各种功能。

生产消息

编写producer发送消息。

消费消息

编写consumer消费消息。

主题管理

测试topic的创建、更新、删除等管理功能。

分区管理

测试partition的添加、删除等管理功能。

测试topic与多个partition关联后的管理功能。

日志管理

测试集群报错后,kafka提供的日志报错功能。

集群监控

测试kafka manager对集群metrics的监控功能,集群状态、topic、partition、replica、consumer group、log等。

2.3集群验证

对kafka集群进行架构验证

新增节点

新增节点后,kafka集群的运行情况。

删除节点

删除节点后,kafka集群的运行情况。

zookeeper下线

删除zookeeper节点后,kafka集群的运行情况。

断网测试

断网后,kafka集群的运行情况。

断网再接入网络后,kafka集群的运行情况。

断电测试

断电重启后,kafka集群的运行情况。

3实施预案

kafka接入数据中台,需要从如下方面准备:

架构设计

 

如上图所示,kafka按分布式集群方案部署后,需要在应用系统端定制producer来采集数据,在数据中台端定制consumer来往中台写入数据。

容量规划

从数据流转、存储的角度,搞清楚数据采集端的数据容量、broker集群的数据容量、数据消费端的数据容量。

首先需要搞清楚如下情况:

有哪些数据要接入

数据所在物理位置

数据存储的应用软件

每个producer的部署位置

每个producer的部署环境

每个producer采集数据的安全权限

每个producer采集数据的频度

每个producer的资源消耗(主机、带宽)

每个consumer group个数

每个consumer group中consumer的个数

每个consumer连接中台的技术接口

每个consumer访问中台的权限

每个consumer的部署位置

每个consumer对中台资源的消耗

其他

总之要汇总得出如下结论:

producer的数量

producer的位置

producer的日数据总量

producer的并发量

producer的并发数据量

producer资源配置(cpu、内存、硬盘、带宽)

borker集群个数

broker集群资源配置(cpu、内存、硬盘、带宽)

broker日数据量峰值

broker日存储大数据

topic数量

topic配置

topic关联partition的数量

replica配置量

consumer group的数量

每个consumer group中consumer的数量

每个consumer对接中台的接口

每个consumer对接中台权限

consumer日消费数据总量

consumer消费数据的并发量

consumer消费数据的并发数据量

consumer资源配置(cpu、内存、硬盘、带宽)

其他

数据采集

在应用系统端,定制并部署producer,用来采集数据。

producer对接应用系统。

producer采集应用数据到broker。

producer在应用系统测部署运维。

broker部署

依据容量规划,规划broker集群的部署位置、资源配置、主机规划、带宽要求等。

写入中台

在数据中台端,定制并部署consumer,往中台写入数据。

consumer对接数据中台。

consumer消费数据到数据中台。

consumer在数据中台测部署运维。

技术培训

需要协调应用系统开发商和数据中台开发商,共同分析需求,定制producer和consumer。

对接应用系统开发商,编写producer。

对接数据中台开发商,编写consumer。

日常运维

运维团队通过kafka manager运维kafka集群,改造manager,能够自动预警、报警。

运维团队直接运维producer。

运维团队直接运维consumer。

附:产品分析

概念分析

产品定义

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala 语言编写。目前是Apache组织的开源项目。

kafka主要概念组件如下:

1、 broker:Kafka服务器,负责消息存储和转发。

2、 topic:消息主题,Kafka按照topic来分类消息。

3、 partition:topic的分区,一个topic可以包含多个partition,topic消息保存在各个partition 上。

4、 offset:消息在日志中的位置,可以理解是消息在partition 上的偏移量,也是代表该消息的唯一序号。

5、 Producer:消息生产者。

6、 Consumer:消息消费者。

7、 Consumer Group:消费者分组,每个Consumer 必须属于一个group。

8、 Zookeeper:保存着集群broker、topic、partition 等meta 数据;另外还负责broker 故障发现,partition leader 选举,负载均衡等功能。

工作机制

Kafka基于分布式高可用的borker提供强大的数据缓冲能力,producer对接数据源采集数据,并将数据发送到broker上,数据就在borker上缓存下来,直到consumer消费完数据为止。

Kafka基于分布式架构提供高容错功能,基于offset记录已经处理过的数据,消息写入topic有顺序,consumer记录自己的offset,topic数据存储在分区上,分区有副本,分布在不同节点上。

Kafka使用producer写入数据时,启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

Kafka提供consumer group概念。某些topic拥有数百万甚至数千万的消息量,如果仅仅靠单个消费者消费,那么消费速度会非常慢,所以我们需要使用消费组功能,同一个消费组的多个消费者就能分布到多个物理机器上以加速消费。每个消费者组都会有一个独一无二的消费者组id来标记自己。每一个消费者group可能有一个或者多个消费者,对于当前消费组来说,topic中每条数据只要被消费组内任何一个消费者消费一次,那么这条数据就可以认定被当前消费组消费成功。

Kafka虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万。Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。

磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。

Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。

 通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后数据文件建立了索引文件,就是文件系统上index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作并行度。

消费组有如下三个特征:

1、每个消费组有一个或者多个消费者。

2、每个消费组拥有一个唯一性的标识id。

3、消费组在消费topic的时候,topic的每个partition只能分配给一个消费者。

Kafka的producer和consumer通常需要定制,所以对接各种数据源不灵活、不方便。

Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。相比于使用JVM或in-memory cache等数据结构,利用操作系统的Page Cache更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache做了大量优化,提供了 write-behind、read-ahead以及flush等多种机制。再者即使服务进程重启,系统缓存依然不会消失,避免了in-process cache重建缓存的过程。

      通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。 linux操作系统 “零拷贝” 机制使用了sendfile方法,允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区,这样避免重新复制数据 。通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。零拷贝并非指一次拷贝都没有,而是避免了在内核空间和用户空间之间的拷贝。

Kafka数据读写是批量的而不是单条的。除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。

      在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量CPU资源,不过对于kafka而言,网络IO更应该需要考虑。如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩

Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议。Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

数据存储

partition 的数据文件(offset,MessageSize,data)

partition 中的每条Message 包含了以下三个属性:offset,MessageSize,data,其中offset 表示Message 在这个partition 中的偏移量,offset 不是该Message 在partition 数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition 中的一条Message,可以认为offset 是partition 中Message 的id;MessageSize 表示消息内容data 的大小;data 为Message 的具体内容。

数据文件分段segment(顺序读写、分段命令、二分查找)

partition 物理上由多个segment 文件组成,每个segment 大小相等,顺序读写。每个segment数据文件以该段中最小的offset 命名,文件扩展名为。log。这样在查找指定offset 的Message 的时候,用二分查找就可以定位到该Message 在哪个segment 数据文件中。

数据文件索引(分段索引、稀疏存储

Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为。index。index 文件中并没有为数据文件中的每条Message 建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。

生产者

负载均衡(partition 会均衡分布到不同broker 上)

由于消息topic由多个partition 组成,且partition 会均衡分布到不同broker 上,因此,为了有效利用broker 集群的性能,提高消息的吞吐量,producer 可以通过随机或者hash 等方式,将消息平均发送到多个partition 上,以实现负载均衡。

批量发送

是提高消息吞吐量重要的方式,Producer 端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给broker,从而大大减少broker 存储消息的IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。

压缩(GZIP 或Snappy)

Producer 端可以通过GZIP 或Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会耗掉部分CPU 资源)。

消费者

Consumer Group

同一Consumer Group 中的多个Consumer 实例,不同时消费同一个partition,等效于队列模式。partition 内消息是有序的,Consumer 通过pull 方式消费消息。Kafka不删除已消费的消息对于partition,顺序读写磁盘数据,以时间复杂度O(1)方式提供消息持久化能力。

与flume

在生产环境中,一般将Flume和Kafka搭配使用,Flume采集kafka缓冲,来完成实时流式的数据处理。

1、生产环境中,往往是读取日志进行分析,而这往往是多数据源的,如果Kafka构建多个生产者使用文件流的方式向主题写入数据再供消费者消费的话,无疑非常的不方便。

       2、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。

       3、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。

       4、Kafka与Flume都可以通过配置保证数据不丢失。但是,Flume不会复制消息,因此即使使用可靠的文件渠道,当Flume进程宕机后,你就无法访问这些消息了。当然Flume进程重启,从磁盘上恢复之前状态后,可以继续对消息进行处理。因此如果对 HA高可用性具有很高要求,建议使用Kafka;

       另外Flume常用的场景是:直接将数据写入存储,如hadoop或habase中。Flume对HDFS/HBase具有更好的优化,同时它也集成了Hadoop安全组件。如果数据需要被多个应用程序处理,建议Kafka;如果数据主要是用于Hadoop,建议Flume;

       另外如果希望将Kafka上的数据导入Hadoop,可以启动一个内置Kafka源与Hadoop槽的Flume进程。这样就不需要去实现自定义的消费者,同时还可以得到Flume对HDFS/HBase优化带来的好处。

集群安装

规划:

cancer01       1

cancer02       2

cancer03       3

说明1:在cancer01主机上配置好kafka目录后,复制到其他主机再修改下即可。

说明2:每台主机上都要安装zookeeper,配置好zookeeper集群。

解压:

tar -xzvf kafka_2.13-2.7.0.tgz

mv kafka_2.13-2.7.0 /usr/local/kafka

更改所有者

chown -R hadoop:hadoop /usr/local/kafka

配置环境

vim /etc/profile

export KAFKA_HOME=/usr/local/kafka

export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

修改配置文件

(broker.id和host.name每台主机不相同)

vim /usr/local/kafka/conf/server.properties

#broker的全局唯一编号,不能重复

broker.id=1

#用来监听链接的端口,producer或consumer将在此端口建立连接

port=9092

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的线程数量

num.io.threads=8

#topic在当前broker上的分片个数

num.partitions=3

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接受套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka消息存放的路径

log.dirs=/usr/local/kafka/logs

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘

log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发flush到磁盘

log.flush.interval.ms=3000

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#滚动生成新的segment文件的最大时间

log.roll.hours=168

#日志文件中每个segment的大小,默认为1G

log.segment.bytes=1073741824

#周期性检查文件大小的时间

log.retention.check.interval.ms=300000

#日志清理是否打开

log.cleaner.enable=true

#broker需要使用zookeeper保存meta数据

zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181

#zookeeper链接超时时间

zookeeper.connection.timeout.ms=6000

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除

delete.topic.enable=true

#此处的host.name为本机IP(重要),如果不改,则客户端会抛出错误!

host.name=192.168.31.101

advertised.host.name=192.168.31.101

advertised.listeners=PLAINTEXT://192.168.31.101:9092

default.replication.factor=3

auto.create.topics.enable=true

message.max.byte=5242880

replica.fetch.max.bytes=5242880

说明:listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异常.因为在没有配置advertised.host.name的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置hosts,所以自然是连接不上这个hostname的。当使用java客户端访问远程的kafka时,一定要把集群中所有的端口打开,否则会连接超时

vim /usr/local/kafka/conf/producer.properties

metadata.broker.list=192.168.31.101:9092,192.168.31.102:9092,192.168.31.103:9092

vim /usr/local/kafka/conf/consumer.properties

zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181

分发到其他主机

scp -r /usr/local/kafka cancer02:/usr/local/

scp -r /usr/local/kafka cancer03:/usr/local/

修改其他主机配置

vim /etc/profile

vim /usr/local/kafka/conf/server.properties

启动zookeeper

zkServer.sh start

zkServer.sh status

启动kafka

kafka-server-start.sh $KAFKA_HOME/config/server.properties 1>/dev/null 2>&1 &

kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

或者

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 1>/dev/null 2>&1 &

kafka-server-start.sh -daemon ../config/server.properties 1>/dev/null 2>&1 &

或者

nohup kafka-server-start.sh ../config/server.properties &>> /usr/local/kafka/kafka.log &

验证

创建topic

kafka-topics.sh --create --zookeeper cancer01:2181 --replication-factor 1 --partitions 1 --topictest

查看topic是否创建成功

kafka-topics.sh --list -zookeeper cancer01:2181

产品使用

创建一个名为“test”的Topic,只有一个分区和备份

kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topictest

kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --replication-factor 3 --partitions 3 --topictest

命令解析:

--create:           指定创建topic动作

--topic:            指定新建topic的名称

--zookeeper:     指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样

--config:           指定当前topic上有效的参数值

--partitions:指定当前创建的kafka分区数量,默认为1个

--replication-factor:指定每个分区的复制因子个数,默认1个

查看已创建的topic信息

kafka-topics.sh --list --zookeeper localhost:2181

查看topic描述信息

kafka-topics.sh --describe –zookeeper localhost:2181 --topictest

命令解析:

--describe: 指定是展示详细信息命令

--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样

--topic:指定需要展示数据的topic名称

为topic增加副本

kafka-reassign-partitions.sh -zookeeper cancer01:2181,cancer02:2181,cancer03:2181 -reassignment-json-file json/partitions-to-move.json -execute 

为topic增加partition

kafka-topics.sh –zookeeper cancer01:2181,cancer02:2181,cancer03:2181 –alter –partitions 20 –topictest

修改Topic信息

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topictest --config max.message.bytes=128000

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topictest --delete-config max.message.bytes

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topictest --partitions 10

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topictest --partitions 3 ## Kafka分区数量只允许增加,不允许减少

删除Topic

默认情况下Kafka的Topic是没法直接删除的,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:

       方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可

      方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除

kafka-topics.sh --delete --topictest --zookeeper cancer01:2181,cancer02:2181,cancer03:2181

kafka-run-class.sh kafka.admin.DeleteTopicCommand --topictest --zookeeper cancer01:2181,cancer02:2181,cancer03:2181

你可以通过命令:./bin/kafka-topics --zookeeper 【zookeeper server】 --list   来查看所有topic

此时你若想真正删除它,可以如下操作:

(1)登录zookeeper客户端:命令:./bin/zookeeper-client

(2)找到topic所在的目录:ls /brokers/topics

(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topicname】即可,此时topic被彻底删除.

另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topicname】,

如果你删除了此处的topic,那么marked for deletion 标记消失

发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群.每一行是一条消息.

kafka-console-producer.sh --broker-list cancer01:9092,cancer02:9092,cancer03:9092 --topictest

消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来.

kafka-console-consumer.sh --bootstrap-server cancer01:9092,cancer02:9092,cancer03:9092 --topictest --from-beginning

(--from-beginning如果去掉不会出现消费者启动之前的消息)

下线broker

kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60

shutdown broker

查看consumer组内消费的offset

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --group test --topic test

创建topic,备份3份

kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --replication-factor 3 --partitions 1 --topicmy-replicated-topic

查看topic运行情况

kafka-topics.sh --describe --zookeeper localhost:2181 --topicmy-replicated-topic

//所有分区的摘要

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3    Configs:

#提供一个分区信息,因为我们只有一个分区,所以只有一行.

Topic: my-replicated-topic   Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

#“leader”:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的.

#“replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示.

#“isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader

其中Replicas和Isr中的1,2,0就对应着3个broker他们的broker.id属性

干掉leader,测试集群容错

  首先查询谁是leader

kafka-topics.sh --describe --zookeeper localhost:2181 --topicmy-replicated-topic

//所有分区的摘要

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3    Configs:

//提供一个分区信息,因为我们只有一个分区,所以只有一行.

Topic: my-replicated-topic   Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

可以看到Leader的broker.id为1,找到对应的Broker

[root@administrator bin]# jps -m

5130 Kafka../config/server.properties

4861 QuorumPeerMain ../config/zookeeper.properties

1231 Bootstrap start start

7420 Kafka../config/server-2.properties

7111 Kafka../config/server-1.properties

9139 Jps -m

查询到Leader的PID(Kafka../config/server-1.properties)为7111,杀掉该进程

//杀掉该进程

kill -9 7111

//再查询一下,确认新的Leader已经产生,新的Leader为broker.id=0

./kafka-topics.sh --describe --zookeeper localhost:2181 --topicmy-replicated-topic

Topic:my-replicated-topic      PartitionCount:1        ReplicationFactor:3    Configs:

//备份节点之一成为新的leader,而broker1已经不在同步备份集合里了

Topic: my-replicated-topic     Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2

再次消费消息,确认消息没有丢失

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topicmy-replicated-topic

cluster message 1

cluster message 2

消息依然存在,故障转移成功!!

Producer生产者

发送消息的方式,只管发送,不管结果:只调用接口发送消息到 Kafka服务器,但不管成功写入与否.由于 Kafka是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息

同步发送:调用 send() 方法返回一个 Future 对象,我们可以使用它的 get() 方法来判断消息发送成功与否

异步发送:调用 send() 时提供一个回调方法,当接收到 broker 结果后回调此方法

public class MyProducer {

    private static KafkaProducer<String, String> producer;

    //初始化

    static {

        Properties properties = new Properties();

        //kafka启动,生产者建立连接broker的地址

        properties.put("bootstrap.servers", "127.0.0.1:9092");

        //kafka序列化方式

        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //自定义分区分配器

        properties.put("partitioner.class", "com.imooc.kafka.CustomPartitioner");

        producer = new KafkaProducer<>(properties);

    }

    /**

     * 创建topic:.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181

     * --replication-factor 1 --partitions 1 --topickafka-study

     * 创建消费者:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092

     * --topicimooc-kafka-study --from-beginning

     */

    //发送消息,发送完后不做处理

    private static void sendMessageForgetResult() {

        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study", "name", "ForgetResult");

        producer.send(record);

        producer.close();

    }

    //发送同步消息,获取发送的消息

    private static void sendMessageSync() throws Exception {

        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study", "name", "sync");

        RecordMetadata result = producer.send(record).get();

        System.out.println(result.topic());//imooc-kafka-study

        System.out.println(result.partition());//分区为0

        System.out.println(result.offset());//已发送一条消息,此时偏移量+1

        producer.close();

    }

    /**

     * 创建topic:.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181

     * --replication-factor 1 --partitions 3 --topickafka-study-x

     * 创建消费者:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092

     * --topickafka-study-x --from-beginning

     */

    private static void sendMessageCallback() {

        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study-x", "name", "callback");

        producer.send(record, new MyProducerCallback());

        //发送多条消息

        record = new ProducerRecord<>("kafka-study-x", "name-x", "callback");

        producer.send(record, new MyProducerCallback());

        producer.close();

    }

    //发送异步消息

    //场景:每条消息发送有延迟,多条消息发送,无需同步等待,可以执行其他操作,程序会自动异步调用

    private static class MyProducerCallback implements Callback {

        @Override

        public void onCompletion(RecordMetadata recordMetadata, Exception e) {

            if (e != null) {

                e.printStackTrace();

                return;

            }

            System.out.println("*** MyProducerCallback ***");

            System.out.println(recordMetadata.topic());

            System.out.println(recordMetadata.partition());

            System.out.println(recordMetadata.offset());

        }

    }

    public static void main(String[] args) throws Exception {

        //sendMessageForgetResult();

        //sendMessageSync();

        sendMessageCallback();

    }

}

自定义分区分配器:决定消息存放在哪个分区..默认分配器使用轮询存放,轮到已满分区将会写入失败.

public class CustomPartitioner implements Partitioner {

    @Override

    public int partition(String topic, Object key, byte[] keyBytes,

                         Object value, byte[] valueBytes, Cluster cluster) {

        //获取topic所有分区

        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);

        int numPartitions = partitionInfos.size();

        //消息必须有key

        if (null == keyBytes || !(key instanceof String)) {

            throw new InvalidRecordException("kafkamessage must have key");

        }

        //如果只有一个分区,即0号分区

        if (numPartitions == 1) {return 0;}

        //如果key为name,发送至最后一个分区

        if (key.equals("name")) {return numPartitions - 1;}

        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);

    }

    @Override

    public void close() {}

    @Override

    public void configure(Map<String, ?> map) {}

}

Kafka消费者(组)

* 自动提交位移 * 手动同步提交当前位移 * 手动异步提交当前位移 * 手动异步提交当前位移带回调 * 混合同步与异步提交位移

public class MyConsumer {

    private static KafkaConsumer<String, String> consumer;

    private static Properties properties;

    //初始化

    static {

        properties = new Properties();

        //建立连接broker的地址

        properties.put("bootstrap.servers", "127.0.0.1:9092");

        //kafka反序列化

        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //指定消费者组

        properties.put("group.id", "KafkaStudy");

    }

    //自动提交位移:由consume自动管理提交

    private static void generalConsumeMessageAutoCommit() {

        //配置

        properties.put("enable.auto.commit", true);

        consumer = new KafkaConsumer<>(properties);

        //指定topic

        consumer.subscribe(Collections.singleton("kafka-study-x"));

        try {

            while (true) {

                boolean flag = true;

                //拉取信息,超时时间100ms

                ConsumerRecords<String, String> records = consumer.poll(100);

                //遍历打印消息

                for (ConsumerRecord<String, String> record : records) {

                    System.out.println(String.format(

                            "topic= %s, partition = %s, key = %s, value = %s",

                            record.topic(), record.partition(), record.key(), record.value()

                    ));

                    //消息发送完成

                    if (record.value().equals("done")) { flag = false; }

                }

                if (!flag) { break; }

            }

        } finally {

            consumer.close();

        }

    }

    //手动同步提交当前位移,根据需求提交,但容易发送阻塞,提交失败会进行重试直到抛出异常

    private static void generalConsumeMessageSyncCommit() {

        properties.put("auto.commit.offset", false);

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("kafka-study-x"));

        while (true) {

            boolean flag = true;

            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {

                System.out.println(String.format(

                        "topic= %s, partition = %s, key = %s, value = %s",

                        record.topic(), record.partition(), record.key(), record.value()

                ));

                if (record.value().equals("done")) { flag = false; }

            }

            try {

                //手动同步提交

                consumer.commitSync();

            } catch (CommitFailedException ex) {

                System.out.println("commit failed error: " + ex.getMessage());

            }

            if (!flag) { break; }

        }

    }

    //手动异步提交当前位移,提交速度快,但失败不会记录

    private static void generalConsumeMessageAsyncCommit() {

        properties.put("auto.commit.offset", false);

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("kafka-study-x"));

        while (true) {

            boolean flag = true;

            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {

                System.out.println(String.format(

                        "topic= %s, partition = %s, key = %s, value = %s",

                        record.topic(), record.partition(), record.key(), record.value()

                ));

                if (record.value().equals("done")) { flag = false; }

            }

            //手动异步提交

            consumer.commitAsync();

            if (!flag) { break; }

        }

    }

    //手动异步提交当前位移带回调

    private static void generalConsumeMessageAsyncCommitWithCallback() {

        properties.put("auto.commit.offset", false);

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("kafka-study-x"));

        while (true) {

            boolean flag = true;

            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {

                System.out.println(String.format(

                        "topic= %s, partition = %s, key = %s, value = %s",

                        record.topic(), record.partition(), record.key(), record.value()

                ));

                if (record.value().equals("done")) { flag = false; }

            }

            //使用java8函数式编程

            consumer.commitAsync((map, e) -> {

                if (e != null) {

                    System.out.println("commit failed for offsets: " + e.getMessage());

                }

            });

            if (!flag) { break; }

        }

    }

    //混合同步与异步提交位移

    @SuppressWarnings("all")

    private static void mixSyncAndAsyncCommit() {

        properties.put("auto.commit.offset", false);

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("kafka-study-x"));

        try {

            while (true) {

                //boolean flag = true;

                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {

                    System.out.println(String.format(

                            "topic= %s, partition = %s, key = %s, " + "value = %s",

                            record.topic(), record.partition(),

                            record.key(), record.value()

                    ));

                    //if (record.value().equals("done")) { flag = false; }

                }

                //手动异步提交,保证性能

                consumer.commitAsync();

                //if (!flag) { break; }

            }

        } catch (Exception ex) {

            System.out.println("commit async error: " + ex.getMessage());

        } finally {

            try {

                //异步提交失败,再尝试手动同步提交

                consumer.commitSync();

            } finally {

                consumer.close();

            }

        }

    }

    public static void main(String[] args) {

        //自动提交位移

        generalConsumeMessageAutoCommit();

        //手动同步提交当前位移

        //generalConsumeMessageSyncCommit();

        //手动异步提交当前位移

        //generalConsumeMessageAsyncCommit();

        //手动异步提交当前位移带回调

        //generalConsumeMessageAsyncCommitWithCallback()

        //混合同步与异步提交位移

        //mixSyncAndAsyncCommit();

    }

}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐