简介

数据缓冲队列。同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景

特性:

高吞吐量:kafka每秒可以处理几十万条消息。
可扩展性:kafka集群支持热扩展- 持久性、
可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写

它主要包括以下组件

话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是向topic的。)
生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务).
消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。
服务代理(Broker):已发布的消息保存在一组服务器中它们被称为代理(Broker)或Kafka集群
partition(区):每个 topic 包含一个或多个 partition。
replication:partition 的副本,保障 partition 的高可用。
leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
follower:replica 中的一个角色,从 leader 中复制数据。
zookeeper:kafka 通过 zookeeper 来存储集群的信息。

Zookeeper: 

ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、分布式同步等。Kafka的运行依赖ZooKeeper。 也是java微服务里面使用的一个注册中心服务
ZooKeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。
在Kafka中,一个topic会被分成多个区并被分到多个broker上,分区的信息以及broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中,

集群部署:

环境:三台Linux

1.dns域名解析

# vim /etc/hosts

192.168.31.208 es01       
192.168.31.209 es02        
192.168.31.210 es03 

2.安装jdk8

yum install -y java-1.8.0-openjdk

3.安装配置ZK

Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额外下载ZK程序。

(1)安装

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

# tar xzvf kafka_2.12-2.8.0.tgz -C /usr/local/

# mv /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka/

(2)配置

注释掉所有内容 (三台机器)

# sed ‐i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties

修改配置文件  es01

vim /usr/local/kafka/config/zookeeper.properties        #添加如下配置

dataDir=/opt/data/zookeeper/data # 需要创建,所有节点一致
dataLogDir=/opt/data/zookeeper/logs # 需要创建,所有节点一致
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
# 以下 IP 信息根据自己服务器的 IP 进行修改
server.1=192.168.31.208:2888:3888 //kafka集群IP:Port
server.2=192.168.31.209:2888:3888
server.3=192.168.31.210:2888:3888

创建data、log目录

# mkdir ‐p /opt/data/zookeeper/{data,logs}

创建myid文件 es01

# echo 1 > /opt/data/zookeeper/data/myid

es02,es03配置文件的修改相同,但是对应的myid不同

el02的是2        el03的是3

配置项含义:

dataDir ZK数据存放目录。
dataLogDir ZK日志存放目录。
clientPort 客户端连接ZK服务的端口。
tickTime ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit 允许follower连接并同步到Leader的初始化连接时间,当初始化连接时间超过该值,则表示连接失败。
syncLimit Leader与Follower之间发送消息时如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
server.1=192.168.31.208:2888:3888 2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。

4.配置Kafka

注释掉配置文件中的所有内容

# sed ‐i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties

修改kafka的配置文件

# vim /usr/local/kafka/config/server.properties        #在最后添加

el01,el02,el03的不同是,broker.id对应的是myid,        listeners当中的ip地址是本机对应的ip

broker.id=1 #改
listeners=PLAINTEXT://192.168.31.208:9092 #改
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.31.208:2181,192.168.31.209:2181,192.168.31.210:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

创建对应的目录

# mkdir ‐p /opt/data/kafka/logs

配置项含义:

listeners=PLAINTEXT://192.168.19.22:9092    监听地址
num.network.threads    broker 处理消息的最大线程数,一般情况下不需要去修改
num.io.threads    broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
socket.send.buffer.bytes    socket的发送缓冲区
socket.receive.buffer.bytes        socket的接收缓冲区
socket.request.max.bytes    socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.dirs 日志文件目录
num.partitions
num.recovery.threads.per.data.dir
offsets.topic.replication.factor
log.cleanup.policy = delete
日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被
topic创建时的指定参数覆盖
log.cleanup.interval.mins=1
指定日志每隔多久检查看是否可以被删除,默认1分钟
log.retention.hours    数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据。log.retention.bytes和log.retention.minutes或者log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.segment.bytes    topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms    文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
zookeeper.connect        ZK主机地址,如果zookeeper是集群则以逗号隔开。
zookeeper.connection.timeout.ms        连接到Zookeeper的超时时间。

5.其他节点配置

只需把配置好的安装包直接分发到其他节点,修改 Kafka的broker.id和 listeners就可以了。

6.启动ZK集群

启动在三个节点依次执行:

        nohup &   配对使用,把执行的内容丢到后台

[root@es01 ~]# cd /usr/local/kafka
[root@es01 kafka]# nohup bin/zookeeper‐server‐start.sh config/zookeeper.properties &

查看端口

[root@mes‐1 ~]# netstat ‐lntp | grep 2181
tcp6 0 0 :::2181 :::* LISTEN 1226/java

7.启动Kafka

启动在三个节点依次执行:

[root@es01 ~]# cd /usr/local/kafka
[root@es01 kafka]# nohup bin/kafka‐server‐start.sh config/server.properties &

验证

在es01上创建topic

[root@es02 kafka]# bin/kafka‐topics.sh ‐‐create ‐‐zookeeper localhost:2181 ‐‐replication‐factor 1‐‐partitions 1 ‐‐topic testtopic

Created topic "testtopic".

参数解释: 

–zookeeper指定zookeeper的地址和端口,
–partitions指定partition的数量,
–replication‐factor指定数据副本的数量

在es02或者es03上查询es01的topic

[root@es03 kafka]# bin/kafka‐topics.sh ‐‐zookeeper 192.168.19.20:2181 ‐‐list
testtopic

在es01上模拟消息生产和消费

[root@es01 kafka]# bin/kafka‐console‐producer.sh ‐‐broker‐list 192.168.19.20:9092 ‐‐topic  testtopic
>hello
>你好呀

es02接收信息

[root@es02 kafka]# bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.19.21:9092   ‐‐topic  testtopic ‐‐from‐beginning
hello
你好呀

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐