Kafka安装教程

kafka

  1. 下载kafka安装包

    安排下载链接 kafka,注意kafka版本

  2. 上传并解压安装包

    tar -zxvf kafka_2.13-2.8.0.tgz -C /opt/apps/
    
  3. 创建kafka - logs文件存放地址

    mkdir /opt/data/kafka
    
  4. 编辑kafka配置文件,config/server.properties

    broker.id=0  			# 每台服务器上的brokerid不一样
    
    delete.topic.enable=true 		# 可以删除topic的标识
    
    log.dirs=/opt/data/kafka		# 修改kafka数据存放路径
    
    zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka		# 修改kafka依赖zk的地址并将元数据存放至kafka节点下
    
  5. 将kafka安装包分发至集群各个节点上,并修改broker.id

  6. 将kafka的安装路径写入至/etc/profile文件中,source /etc/profile让配置文件即可生效

  7. 启动kafka集群

    #!/bin/bash
    ssh hadoop01 "source /etc/profile && $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
    ssh hadoop02 "source /etc/profile && $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
    ssh hadoop03 "source /etc/profile && $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
    
  8. kafka集群命令操作

    • 查看当前集群中的topic

      kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka --list

    • 创建topic

      kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka --create --replication-factor 3 --partitions 1 --topic first

      解释:–replication-factor 3 3个副本
      –partitions 1 1个分区

    • 生产消息

      kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic first

    • 消费消息

      kafka-console-consumer.sh --bootstrap-server
      hadoop01:9092,hadoop02:9092,hadoop03:9092 --from-beginning --topic first

      解释:–from-beginning 重头开始消费

    • 查看topic详情

      kafka-topics.sh --zookeeper hadoop01:2181/kafka --describe --topic first

kafka-eagle

  1. 下载kafka-eagle安装包并解压

    下载地址 kafka-eagle,里面也有详细的安装手册 kafka-eagle-docs

    tar -zxvf kafka-eagle-bin-2.1.0.tar.gz -C /opa/apps
    mv kafka-eagle-bin-2.1.0 kafka-eagle-2.1.0
    
  2. 配置kafka-eagle配置文件

    ######################################
    # multi zookeeper & kafka cluster list
    # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
    ######################################
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka
    ######################################
    # zookeeper enable acl
    ######################################
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123
    ######################################
    # broker size online list
    ######################################
    cluster1.efak.broker.size=20
    ######################################
    # zk client thread limit
    ######################################
    kafka.zk.limit.size=16
    ######################################
    # EFAK webui port
    ######################################
    efak.webui.port=8048
    ######################################
    # EFAK enable distributed
    ######################################
    efak.distributed.enable=false
    efak.cluster.mode.status=master
    efak.worknode.master.host=localhost
    efak.worknode.port=8085
    ######################################
    # kafka jmx acl and ssl authenticate
    ######################################
    cluster1.efak.jmx.acl=false
    cluster1.efak.jmx.user=keadmin
    cluster1.efak.jmx.password=keadmin123
    cluster1.efak.jmx.ssl=false
    cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
    cluster1.efak.jmx.truststore.password=ke123456
    ######################################
    # kafka offset storage
    ######################################
    cluster1.efak.offset.storage=kafka
    cluster2.efak.offset.storage=zk
    ######################################
    # kafka jmx uri
    ######################################
    cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
    ######################################
    # kafka metrics, 15 days by default
    ######################################
    efak.metrics.charts=true
    efak.metrics.retain=15
    ######################################
    # kafka sql topic records max
    ######################################
    efak.sql.topic.records.max=5000
    efak.sql.topic.preview.records.max=10
    ######################################
    # delete kafka topic token
    ######################################
    efak.topic.token=keadmin
    ######################################
    # kafka sasl authenticate
    ######################################
    cluster1.efak.sasl.enable=false
    cluster1.efak.sasl.protocol=SASL_PLAINTEXT
    cluster1.efak.sasl.mechanism=SCRAM-SHA-256
    cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
    cluster1.efak.sasl.client.id=
    cluster1.efak.blacklist.topics=
    cluster1.efak.sasl.cgroup.enable=false
    cluster1.efak.sasl.cgroup.topics=
    cluster2.efak.sasl.enable=false
    cluster2.efak.sasl.protocol=SASL_PLAINTEXT
    cluster2.efak.sasl.mechanism=PLAIN
    cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
    cluster2.efak.sasl.client.id=
    cluster2.efak.blacklist.topics=
    cluster2.efak.sasl.cgroup.enable=false
    cluster2.efak.sasl.cgroup.topics=
    ######################################
    # kafka ssl authenticate
    ######################################
    cluster3.efak.ssl.enable=false
    cluster3.efak.ssl.protocol=SSL
    cluster3.efak.ssl.truststore.location=
    cluster3.efak.ssl.truststore.password=
    cluster3.efak.ssl.keystore.location=
    cluster3.efak.ssl.keystore.password=
    cluster3.efak.ssl.key.password=
    cluster3.efak.ssl.endpoint.identification.algorithm=https
    cluster3.efak.blacklist.topics=
    cluster3.efak.ssl.cgroup.enable=false
    cluster3.efak.ssl.cgroup.topics=
    ######################################
    # kafka sqlite jdbc driver address
    ######################################
    #efak.driver=org.sqlite.JDBC
    #efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
    #efak.username=root
    #efak.password=www.kafka-eagle.org
    ######################################
    # kafka mysql jdbc driver address
    ######################################
    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://hadoop01:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=root
    efak.password=Admin@123
    
  3. 启动kafka-eagle服务

    ./bin/ke.sh start
    
  4. 登录查看

    地址:http://192.168.3.201:8048

    用户名:admin

    密码:123456

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐