一、材料准备

免费下载地址:https://download.csdn.net/download/weixin_40496191/85320269

  1. 三台虚拟机:192.168.248.10、192.168.248.11、192.168.248.12
  2. 虚拟机需要开放接口:
    2.1 zookeeper:2181、2888、3888
    2.2 kafka:9092
  3. zookeeper安装包:apache-zookeeper-3.6.3-bin.tar.gz
  4. zookeeper客户端:ZooInspector.zip
  5. jdk安装包:jdk-8u291-linux-x64.tar.gz
  6. kafka安装包:kafka_2.12-2.8.1.tgz

二、网络配置

如果是通过ip访问,则需要保证三台网络互通,并且需要配置ip+主机名。因为大数据服务间的访问很多都需要寻找域名,这里配置ip+主机名相当于一个域名查找功能。如果不配置,访问将会很慢。

  1. 查看主机名:hostname
  2. 修改主机名:hostnamectl set-hostname 主机名
  3. 配置ip+主机名:vi /etc/hosts
    192.168.248.10 kafka1
    192.168.248.11 kafka2
    192.168.248.12 kafka3
    

三、安装jdk8

  1. 上传、解压到/opt/jdk:jdk-8u291-linux-x64.tar.gz

  2. 配置环境变量:vi /etc/profile,在最后一行添加:

    JAVA_HOME=/opt/jdk/jdk1.8.0_2911
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=:$JAVA_HOME/lib
    export JAVA_HOME PATH CLASSPATH
    
  3. 刷新配置:source /etc/profile

  4. 测试:java -version

四、安装zookeeper集群

  1. 上传、解压到/opt/zookeeper:apache-zookeeper-3.6.3-bin.tar.gz

  2. 创建zookeeper数据文件夹:mkdir /opt/zookeeper/tmp(自定义)

  3. 进入配置文件目录:cd /opt/zookeeper/apache-zookeeper-3.6.3-bin/conf

  4. 将“zoo_sample.cfg”重命名为“zoo.cfg”,并打开配置:

    4.1 打开zoo.cfg,修改数据文件夹地址:dataDir=/opt/zookeeper/tmp

    4.2 打开zoo.cfg,如果需要变更zookeeper端口号,可修改:clientPort=端口号,默认2181

    4.3 打开zoo.cfg,修改8080端口:admin.serverPort=8888(无用端口,默认8080)

    4.4 打开zoo.cfg,添加集群配置

    #ip+数据同步+通信端口,保证zookeeper、数据同步、通信端口三个端口不同
    server.1=192.168.248.10:2888:3888
    server.2=192.168.248.11:2888:3888
    server.3=192.168.248.12:2888:3888
    
  5. 进入zookeeper数据文件目录:cd /opt/zookeeper/tmp

  6. 新建myid并且写入对应的myid,myid为第4步对应的数字(server.数字)

    服务器192.168.248.10:echo 1 > myid
    服务器192.168.248.11:echo 2 > myid
    服务器192.168.248.12:echo 3 > myid    
    
  7. 启动:cd /opt/zookeeper/apache-zookeeper-3.6.3-bin/bin --> ./zkServer.sh start

  8. 查看主从状态:./zkServer.sh status
    在这里插入图片描述在这里插入图片描述

五、安装kafka集群

  1. 上传、解压到/opt/kafka:kafka_2.12-2.8.1.tgz
  2. 修改配置文件:vi /opt/kafka/kafka_2.12-2.8.1/config/server.properties
    2.1 修改节点标识:broker.id=1(2,3)
    2.2 配置可删除:delete.topic.enable=true
    2.3 日志路径:log.dirs=/opt/kafka/tmp
    2.4 配置端口:port=9092(自定义,默认9092)
    2.5 配置不可自动生成主题:auto.create.topics.enable=false
    2.6 配置可被外部访问: listeners=PLAINTEXT://0.0.0.0:9092 (跟端口一致)advertised.listeners=PLAINTEXT://192.168.248.10:9092(跟端口一致)
    2.7 配置清除数据日志文件策略: log.cleanup.policy=delete
    2.8 配置zk:zookeeper.connect=192.168.248.10:2181,192.168.248.11:2181,192.168.248.12:2181
  3. 创建日志文件夹:mkdir /opt/kafka/log
  4. 进入目录: cd /opt/kafka/kafka_2.12-2.8.1/bin
  5. 启动:nohup ./kafka-server-start.sh /opt/kafka/kafka_2.12-2.8.1/config/server.properties >>/opt/kafka/log/kafkaLog
  6. 查看端口:netstat-luntp。ps:如果第一次启动失败,尝试再次启动。因为zookeeper可能还没启动完毕,这时候启动kafka就会报错找不到zookeeper的服务。
    在这里插入图片描述
  7. 可以使用ZooInspector工具查看是否集群成功,解压双击:zookeeper-dev-ZooInspector.jar
    在这里插入图片描述

六、kafka集群配置ACL权限

该步骤主要是为了实现kafka主题的权限监控。具体可了解:kafka acl权限

  1. cd /opt/kafka/kafka_2.12-2.8.1/bin/

  2. 添加admin用户:./kafka-configs.sh --zookeeper 192.168.248.10:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

  3. 查看admin用户:./kafka-configs.sh --zookeeper 192.168.248.10:2181 --describe --entity-type users --entity-name admin
    ps:因为是集群,所以一台添加,三台共享用户

  4. /opt/kafka/kafka_2.12-2.8.1/config底下添加kafka-broker-jaas.conf文件

    KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
    };
    
  5. 配置文件:/opt/kafka/kafka_2.12-2.8.1/config/server.properties

    5.1 添加项

    # 启用ACL
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    # 设置本例中admin为超级用户
    super.users=User:admin
    # 启用SCRAM机制,采用SCRAM-SHA-512算法
    sasl.enabled.mechanisms=SCRAM-SHA-512
    # 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    # broker间通讯使用PLAINTEXT,本例中不演示SSL配置
    security.inter.broker.protocol=SASL_PLAINTEXT
    

    5.2 修改项

    # 配置listeners使用SASL_PLAINTEXT
    listeners=SASL_PLAINTEXT://192.168.248.10:9092
    # 配置advertised.listeners
    advertised.listeners=SASL_PLAINTEXT://192.168.248.10:9092
    
  6. 修改启动配置:/opt/kafka/kafka_2.12-2.8.1/bin/kafka-server-start.sh

    脚本最后一行配置环境变量如下:exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka/kafka_2.12-2.8.1/config/kafka-broker-jaas.conf kafka.Kafka "$@"

  7. 重启kafka

七、相关脚本编写

  1. zookeeper启动脚本

    1.1 cd /opt --> vi zookeeper.sh

    cd /opt/zookeeper/apache-zookeeper-3.6.3-bin/bin 
    ./zkServer.sh start
    

    1.2 zookeeper脚本赋权:chmod u+x zookeeper.sh
    1.3 启动脚本测试:./zookeeper.sh
    1.4 查看端口:netstat -luntp
    在这里插入图片描述

  2. kafka启动脚本
    2.1 cd /opt --> vi kafka.sh

    cd /opt/kafka/kafka_2.12-2.8.1/bin
    nohup ./kafka-server-start.sh  /opt/kafka/kafka_2.12-2.8.1/config/server.properties >>/opt/kafka/log/kafkaLog
    

    2.2 kafka脚本赋权:chmod u+x kafka.sh
    2.3 启动脚本测试:./kafka.sh
    2.4 查看端口:netstat -luntp
    在这里插入图片描述

安装完成,如遇到bug获安装问题,可在下方留言,看到都会及时回答!!!

八、相关查询指令

  1. 主题操作

    创建主题

    ./kafka-topics.sh --bootstrap-server localhost:9092  --create  --topic test
    

    指定主题的分区数、副本数创建主题

    ./kafka-topics.sh --bootstrap-server localhost:9092  --create   --replication-factor 1 --partitions 1 --topic test
    

    查询所有主题

    ./kafka-topics.sh --list --bootstrap-server localhost:9092
    

    查询主题特定信息

    ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test 
    

    修改主题分区(只能从小往大改)

    ./kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --partitions 2	
    

    删除主题

    ./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
    
  2. 消息发送、消费操作

    发消息

    ./kafka-console-producer.sh broker-list --bootstrap-server localhost:9092  --topic test 
    

    消费消息(每次从头开始)

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

    消费消息(从最新开始,即只会监听正在发的)

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
    

    指定消费组消费消息

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testgroup --topic test
    

    查看所有消费组

    ./kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --list
    

    查看消费组详情

    ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup
    

九、acl权限下相关查询指令

  1. 添加配置文件供客户端使用:vi /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf文件

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ffcsict" password="ffcsict1234!#%&";
    
  2. 查看主题(kafka-topics.sh脚本通用)

    ./kafka-topics.sh --list --bootstrap-server 192.168.248.10:9092 --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    
  3. 查看所有消费组(kafka-consumer-groups.sh脚本通用)

    ./kafka-consumer-groups.sh  --bootstrap-server 192.168.248.10:9092 --list  --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    

    十、kafka集群新增节点

  4. 新重复第五步和第六步,需要注意的几个配置:vi /opt/kafka/kafka_2.12-3.4.0/config/server.properties

    broker.id=新id
    listeners=SASL_PLAINTEXT://服务器ip:9092
    advertised.listeners=SASL_PLAINTEXT://服务器ip:9092
    
  5. 直接启动,可以使用指令查看,将ip改为新节点

    ./kafka-topics.sh --list --bootstrap-server 192.168.248.11:9092 --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    
  6. 创建迁移文件:vi /opt/topic.json

    {
            "topics": [{"topic": "test"},
                       {"topic": "test1"},
                       {"topic": "test3"}],
            "version": 1
    }
    
  7. 生成迁移计划,

    ./kafka-reassign-partitions.sh --bootstrap-server 192.168.248.11:9092  --topics-to-move-json-file /opt/topic.json --broker-list "0,1" --generate  --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf --generate > move_20241014.json
    

    打开生成的文件,删除json外的多余文字

  8. 执行迁移计划

    ./kafka-reassign-partitions.sh  --bootstrap-server 192.168.248.11:9092  --reassignment-json-file ./move_20241014.json --execute   --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    

    结果如下

    [root@kafka1 bin]# ./kafka-reassign-partitions.sh  --bootstrap-server 192.168.248.11:9092  --reassignment-json-file ./move_20241014.json --execute   --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"test1","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test1","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"test1","partition":3,"replicas":[1],"log_dirs":["any"]},{"topic":"test1","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"test3","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"test3","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"test3","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test3","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"test3","partition":4,"replicas":[1],"log_dirs":["any"]}]}
    
  9. 验证

    ps:也可以查看分区情况

    ./kafka-reassign-partitions.sh  --bootstrap-server 192.168.248.11:9092  --reassignment-json-file ./move_20241014.json --verify --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    

    结果如下

    [root@kafka1 bin]# ./kafka-reassign-partitions.sh  --bootstrap-server 192.168.248.11:9092  --reassignment-json-file ./move_20241014.json --verify --command-config /opt/kafka/kafka_2.12-3.4.0/config/kafka-broker-client-jaas.conf
    Status of partition reassignment:
    Reassignment of partition test-0 is completed.
    Reassignment of partition test-1 is completed.
    Reassignment of partition test1-0 is completed.
    Reassignment of partition test1-1 is completed.
    Reassignment of partition test1-2 is completed.
    Reassignment of partition test1-3 is completed.
    Reassignment of partition test1-4 is completed.
    Reassignment of partition test3-0 is completed.
    Reassignment of partition test3-1 is completed.
    Reassignment of partition test3-2 is completed.
    Reassignment of partition test3-3 is completed.
    Reassignment of partition test3-4 is completed.
    
    Clearing broker-level throttles on brokers 0,1
    Clearing topic-level throttles on topics test,test1,test3
    
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐