软件名版本号部署路径
zookeeper3.7.0/data/zookeeper/apache-zookeeper-3.7.0-bin
kafka2.12-2.8.0/data/kafka/kafka_2.12-2.8.0

1.Zookeeper集群添加SASL

经过测试,kafka客户端不带密码可以登录 zk ,带了密码也可以,带密码但是密码是错的就不能连 zk 了,必须在zookeeper同步添加ACL账号来限制登录

1.1. 修改zoo.cfg配置文件

cat >> conf/zoo.cfg <<EOF
# kafka加密使用
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true
EOF

1.2. 新增zk_server_jaas.conf

cat > conf/zk_server_jaas.conf <<EOF
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="admin" 
        password="zookeeper-admin-pwd" 
        user_kafka="kafka-zookeeper-pwd" 
        user_producer="producer-zookeeper-pwd";
};
EOF

Server字段用于指定服务端登录配置。该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN 机制, 定义了两个用户, 用户通过usemame 和password 指定该代理与集群其他代理初始化连接的用户名和密码, 通过“ user_ "为前缀后接用户名方式创建连接代理的用户名和密码,例如, user_kafka = "kafka-zookeeper-pwd” 是指用户名为kafka, 密码为kafka-zookeeper-pwd

1.3. 向zookeeper每个节点添加Kafka认证插件

以下2个jar包全部在 kafka 中的 libs 目录下,copy 到 zookeeper 的 lib 目录下就可以

cp /data/kafka/kafka_2.12-2.8.0/libs/kafka-clients-2.8.0.jar /data/zookeeper/apache-zookeeper-3.7.0-bin/lib/
cp /data/kafka/kafka_2.12-2.8.0/libs/lz4-java-1.7.1.jar /data/zookeeper/apache-zookeeper-3.7.0-bin/lib/

1.4. 修改zkEnv.sh

cat >> bin/zkEnv.sh <<EOF
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=\$ZOOCFGDIR/zk_server_jaas.conf \$SERVER_JVMFLAGS"
EOF

配置完成后,重启zookeeper即可

2.Kafka集群添加SASL

2.1. 新增kafka_server_jaas.conf配置文件

cat > config/kafka_server_jaas.conf <<EOF
Client {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="kafka-zookeeper-pwd";
};
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="kafka-admin-pwd"
   user_admin="kafka-admin-pwd"
   user_kafka_client="kafka-server-pwd";
};
KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka_client"
   password="kafka-client-pwd";
};
EOF

Client是kafka和 zookeeper 通信用的,对应zookeeper的配置文件zk_server_jaas.conf中Server下的user_kafka的账号密码
KafkaServer用于指定服务端配置。该配置通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN 机制, 定义了两个用户, 用户通过usemame 和password 指定该代理与集群其他代理初始化连接的用户名和密码, 通过“ user_ "为前缀后接用户名方式创建连接代理的用户名和密码,例如, user_admin = "kafka-admin-pwd” 是指用户名为admin, 密码为kafka-admin-pwd,username和password的值必须在user_*中有配置,且用户名密码一致,否则kafka启动就会报错
KafkaClient用于指定kafka客户端配置,username和password必须再KafkaServer的user_*中有配置

2.2. 修改kafka-run-class.sh

export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.12-2.8.0/config/kafka_server_jaas.conf "

2.3. zookeeper添加ACL

一定要用kafka自带的zookeeper客户端,这个客户端将通过kafka用户的sasl方式登录zookeeper,启动类是org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka。单独zookeeper客户端启动类是org.apache.zookeeper.ZooKeeperMain,不能以sasl方式登录。

bin/zookeeper-shell.sh IP1:2181,IP2:2181,IP3:2181

#添加认证用户
addauth digest kafka:kafka-zookeeper-pwd
addauth digest admin:zookeeper-admin-pwd
addauth digest producer:producer-zookeeper-pwd
#配置ACL
setAcl / auth:kafka:cdrwa
#查看ACL
getAcl /

清除认证信息

vi zoo.cfg
插入:skipACL=yes
保存退出
此时已经设置为跳过权限
然后重启zookeeper
setAcl / world:anyone:cdrwa

2.4. 修改server.properties

listeners=SASL_PLAINTEXT://192.168.3.80:9092
advertised.listeners=SASL_PLAINTEXT://192.168.3.80:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
#super.users=User:*的值和kafka_server_jaas.conf中KafkaServer的username的值保持一致
super.users=User:admin
zookeeper.set.acl=true

2.7. 启动kafka

bin/kafka-server-start.sh -daemon config/server.properties

3.给kafka服务器端的kafka用户授权

注意: topic,group参数必须使用"",否则可能不生效

3.1.给用户kafka所有topic的producer权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeepe地址} --topic “" --add --allow-principal User:{用户名} --producer --group="

3.2给用户kafka所有topic的consumer权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeepe地址} --topic “" --add --allow-principal User:{用户名} --consumer–group="

3.3.给用户kafka所有topic的操作权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeepe地址} --topic “" --add --allow-principal User:{用户名} --operation all --group="

3.4查询权限

bin/kafka-acls.sh --authorizer-properties zookeeper.connect={zookeeper地址} --list --topic “{topic名称}”

4 测试kafka

4.1.显示所有topic

bin/kafka-topics.sh --list --zookeeper {zookeeper地址}

4.2.创建topic

/kafka-topics.sh --create --zookeeper {zookeeper地址} --topic {topic名称} --partitions 1 --replication-factor 1

4.3发送消息

修改producer.properties

cat > config/producer.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF

bin/kafka-console-producer.sh --topic {topic名称} --bootstrap-server {kafka地址} --producer.config config/producer.properties

4.4读取消息

修改consumer.properties

cat > config/consumer.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF

bin/kafka-console-consumer.sh --bootstrap-server {kafka地址} --topic test --from-beginning --consumer.config config/consumer.properties

4.5获取group列表

bin/kafka-consumer-groups.sh --bootstrap-server {kafka地址} --list --command-config config/consumer.properties

4.6获取group消费详情

 bin/kafka-consumer-groups.sh --bootstrap-server {kafka地址} --describe --group {groupid} --command-config config/consumer.properties

4.7删除topic

kafka-topics.sh --zookeeper {zookeeper地址} --delete --topic {topic名称}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐