kafka集群搭建

注:使用VMware 作为虚拟机 ,SecureCRT连接虚拟机。

1、环境:3台centos虚拟机,且在同一网关

1、ip地址设置

2、创建3台虚拟机

1、为了演示出效果。我重新创建了3台虚拟机。并配置其id分别为

  192.168.25.121   192.168.25.122  192.168.25.123

2、如何配置,下面介绍

​ ①,使用屌丝程序员方式,直接修改配置文件。(我懒得下图形化插件了)

  
  [root@centos1 ~]# cd /etc/sysconfig/network-scripts/
  [root@centos1 network-scripts]# ls
  ifcfg-ens33  ifdown-ppp       ifup-eth     ifup-sit
  ifcfg-lo     ifdown-routes    ifup-ippp    ifup-Team
  ifdown       ifdown-sit       ifup-ipv6    ifup-TeamPort
  ifdown-bnep  ifdown-Team      ifup-isdn    ifup-tunnel
  ifdown-eth   ifdown-TeamPort  ifup-plip    ifup-wireless
  ifdown-ippp  ifdown-tunnel    ifup-plusb   init.ipv6-global
  ifdown-ipv6  ifup             ifup-post    network-functions
  ifdown-isdn  ifup-aliases     ifup-ppp     network-functions-ipv6
  ifdown-post  ifup-bnep        ifup-routes

​ ②,修改配置文件ifcfg-ens33

  
  [root@centos1 network-scripts]# vi ifcfg-ens33 

TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=static
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=ens33
UUID=071e00df-5506-40c6-9373-4efb96c4d3aa
DEVICE=ens33
ONBOOT=yes
IPADDR=192.168.25.121
NETMASK=255.255.255.0
BROADCAST=192.168.25.255
GATEWAY=192.168.25.2
DNS1=8.8.8.8

配置链接外部网

 

 

依次修改3台 ip分别为 192.168.25.121 192.168.25.122 192.168.25.123.

在本地cmd测试是否能连通

 

Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群,而zookeeper是依赖jdk的所以还需要下载jdk。

2、安装jdk 搜索qq1173185448联系我下载

①下载jdk的包,上传到虚拟机。

打开SecureCRT登录虚拟机。按住Alt+p进入文件上传界面

 

直接拖动jar包 ,进入即可。默认下载到当前用户家目录位置。

 

②配置jdk环境变量

​ 确定要安装java的位置。我习惯放在/opt/目录下

  
  [root@centos1 ~]# cd /opt/
  [root@centos1 opt]# ll
  总用量 0
  [root@centos1 opt]# mkdir java
  [root@centos1 opt]# ll
  总用量 0
  drwxr-xr-x. 2 root root 6 8月  17 09:31 java
  [root@centos1 opt]# mv /root/jdk-8u181-linux-x64.tar.gz .
  [root@centos1 opt]# ll
  总用量 181296
  drwxr-xr-x. 2 root root         6 8月  17 09:31 java
  -rw-r--r--. 1 root root 185646832 7月  25 11:49 jdk-8u181-linux-x64.tar.gz
  [root@centos1 opt]# tar -xzf jdk-8u181-linux-x64.tar.gz 
  #设置环境变量 在文件末尾追加
  [root@centos1 opt]# vi /etc/profile 
  (自己本地jdk的位置)
  JAVA_HOME=/opt/jdk1.8.0_181 
  JRE_HOME=$JAVA_HOME/jre
  PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
  CLASSPATH=:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib/dt.jar
  export JAVA_HOME JRE_HOME PATH CLASSPATH
  #使得环境变量生效
  [root@centos1 opt]# source /etc/profile
  #测试  出现版本号就成功了
  [root@centos1 opt]# java -version
  java version "1.8.0_181"  
  Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
  Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)

同理 配置好3台虚拟机的jdk。

3、Zookeeper集群搭建

①、下载zookeeper的包,搜索qq1173185448联系我下载。使用SecureCRT进行上传

​ 打开SecureCRT登录虚拟机。按住Alt+p进入文件上传界面

直接拖动jar包 ,进入即可。默认下载到当前用户家目录位置。

 

②、解压zookeeper (我仍然放在/opt/目录下面)

  
  [root@centos1 opt]# mkdir zookeeper             //项目目录
  [root@centos1 opt]# mkdir zookeeper/zkdata      //存放快照日志
  [root@centos1 opt]# mkdir zookeeper/zkdatalog   //存放事务日志
  [root@centos1 opt]# cd /opt/zookeeper/
  [root@centos1 zookeeper]# mv /root/zookeeper-3.4.10.tar.gz .    //移动到当前目录
  [root@centos1 zookeeper]# tar -xzf zookeeper-3.4.10.tar.gz      //解压

③、配置zookeeper配置文件

  
  [root@centos1 zookeeper]# ll
  总用量 4
  drwxr-xr-x. 10 1001 1001 4096 3月  23 2017 zookeeper-3.4.10
  [root@centos1 zookeeper]# cd zookeeper-3.4.10/conf/
  [root@centos1 conf]# ll
  总用量 12
  -rw-rw-r--. 1 1001 1001  535 3月  23 2017 configuration.xsl
  -rw-rw-r--. 1 1001 1001 2161 3月  23 2017 log4j.properties
  -rw-rw-r--. 1 1001 1001  922 3月  23 2017 zoo_sample.cfg

zoo_sample.cfg  这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。

  
  [root@centos1 conf]# cp zoo_sample.cfg zoo.cfg
  [root@centos1 conf]# ll
  总用量 16
  -rw-rw-r--. 1 1001 1001  535 3月  23 2017 configuration.xsl
  -rw-rw-r--. 1 1001 1001 2161 3月  23 2017 log4j.properties
  -rw-r--r--. 1 root root  922 8月  17 09:50 zoo.cfg //配置这个文件
  -rw-rw-r--. 1 1001 1001  922 3月  23 2017 zoo_sample.cfg
  [root@centos1 conf]# vi zoo.cfg 
  tickTime=2000
  initLimit=10
  syncLimit=5
  dataDir=/opt/zookeeper/zkdata
  dataLogDir=/opt/zookeeper/zkdatalog
  clientPort=12181
  server.1=192.168.25.121:12888:13888
  server.2=192.168.25.122:12888:13888
  server.3=192.168.25.123:12888:13888

配置文件解释

  
  #tickTime:
  这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  #initLimit:
  这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
  #syncLimit:
  这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
  #dataDir:
  快照日志的存储路径
  #dataLogDir:
  事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
  #clientPort:
  这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

④创建myid

  
  [root@centos1 zookeeper]# echo "1" > /opt/zookeeper/zkdata/myid

myid文件和server.myid  在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。

⑤同理配置好3台。 myid 为

  
  echo "n" > /opt/zookeeper/zkdata/myid   (第几台 n就写几)

⑥启动zookeeper

  
  1、启动
  [root@centos1 zookeeper]# cd /opt/zookeeper/zookeeper-3.4.10/bin/  进入bin目录
  [root@centos1 bin]# ./zkServer.sh start   (3台服务器都要启动)
  ZooKeeper JMX enabled by default
  Using config: /opt/zookeeper/zookeeper-3.4.10/bin/../conf/zoo.cfg
  Starting zookeeper ... STARTED
  [root@centos1 bin]# 
  2、查看状态
  [root@centos2 bin]# ./zkServer.sh status
  ZooKeeper JMX enabled by default
  Using config: /opt/zookeeper/zookeeper-3.4.10/bin/../conf/zoo.cfg
  Mode: leader          //leader为领导者,follower为追随者 。

4、kafka集群搭建

①、下载jar,上传到服务器(同 jdk与zookeeper一样。)

  
  [root@centos1 opt]# mkdir kafka
  [root@centos1 opt]# cd kafka/
  [root@centos1 kafka]# mkdir kafkalogs
  [root@centos1 kafka]# ll
  总用量 0
  drwxr-xr-x 2 root root 6 8月  17 21:31 kafkalogs
  [root@centos2 kafka]# mv /root/kafka_2.11-0.9.0.1.tgz .
  [root@centos2 kafka]# tar -xzf kafka_2.11-0.9.0.1.tgz
  [root@centos2 kafka]# cd kafka_2.11-0.9.0.1/config/
  [root@centos1 config]# ll
  总用量 64
  -rw-r--r-- 1 root root  906 2月  12 2016 connect-console-sink.properties
  -rw-r--r-- 1 root root  909 2月  12 2016 connect-console-source.properties
  -rw-r--r-- 1 root root 2110 2月  12 2016 connect-distributed.properties
  -rw-r--r-- 1 root root  922 2月  12 2016 connect-file-sink.properties
  -rw-r--r-- 1 root root  920 2月  12 2016 connect-file-source.properties
  -rw-r--r-- 1 root root 1074 2月  12 2016 connect-log4j.properties
  -rw-r--r-- 1 root root 2055 2月  12 2016 connect-standalone.properties
  -rw-r--r-- 1 root root 1199 2月  12 2016 consumer.properties
  -rw-r--r-- 1 root root 4369 2月  12 2016 log4j.properties
  -rw-r--r-- 1 root root 2228 2月  12 2016 producer.properties
  -rw-r--r-- 1 root root 5296 2月  12 2016 server.properties
  -rw-r--r-- 1 root root 3325 2月  12 2016 test-log4j.properties
  -rw-r--r-- 1 root root 1032 2月  12 2016 tools-log4j.properties
  -rw-r--r-- 1 root root 1023 2月  12 2016 zookeeper.properties

②、修改配置文件

关注:server.properties 这个文件即可,我们可以发现在目录下:

有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群

  
  修改配置文件
  [root@centos1 config]# vi config/server.properties 
  broker.id=1  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样。我们第一台写1,第二台2
  port=19092 #当前kafka对外提供服务的端口默认是9092
  host.name=192.168.25.121 #本机id。
  num.network.threads=3 #这个是borker进行网络处理的线程数
  num.io.threads=8 #这个是borker进行I/O处理的线程数
  log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
  socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
  socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
  socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
  num.partitions=1 #默认的分区数,一个topic默认1个分区数
  log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
  message.max.byte=5242880  #消息保存的最大值5M
  default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
  replica.fetch.max.bytes=5242880  #取消息的最大直接数
  log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
  log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
  log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
  zookeeper.connect=192.168.25.121:12181,192.168.25.122:12181,192.168.25.123:12181 #设置zookeeper的集群ip

③启动

  
  1、#从后台启动Kafka集群(3台都需要启动)
  cd/opt/kafka/kafka_2.11-0.9.0.1//bin #进入到kafka的bin目录 
  ./kafka-server-start.sh -daemon ../config/server.properties
  #检查是否启动
  #执行命令jps
  20348 Jps
  4233 QuorumPeerMain //zookeeper集群
  18991 Kafka         //kafka服务启动
  2、创建topic 在任意一台服务器都可以。在 kafka目录的bin目录下面
  ./kafka-topics.sh --create --zookeeper 192.168.25.121:12181 --replication-factor 2 --partitions 1 --topic xuexiangyi
  3、在第二台创建发布者
  ./kafka-console-producer.sh --broker-list 192.168.25.122:19092 --topic xuexiangyi
  4、在第三台创建订阅者
  ./kafka-console-consumer.sh --zookeeper 192.168.25.121:12181,192.168.25.122:12181,192.168.25.123:12181 --topic xuexiangyi --from-beginning
  5、测试结果如下图

 

 

查找kafka偏移量

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐