kafka集群的安装参见https://blog.csdn.net/shy_snow/article/details/124580111

kafka集群开启SASL/Kerberos安全认证

原理简介

Kafka使用JAAS(Java认证和授权服务)进行SASL(基于网络连接的安全认证机制)配置。

JAAS :全称Java Authentication and Authorization Service(JAVA的认证和授权API),它让你能够将一些标准的安全机制,例如Kerberos等通过一种通用的可配置的方式集成到系统当中去。主要类:
LoginContext:为了实现用户鉴别,建立相应的环境,从配置文件中导入规则;
LoginModule :确认用户的合法性,并分配访问权限principal给subject;
Principal:表示具有访问权限的一个唯一实体,比如用户名,身份证号等。
CallbackHandler:回调处理器,负责与用户(代码拥有者和执行者)交互,确认其身份的合法性;
Subject:表示登陆处理的目标,即一个被鉴别的用户。并可关联一个或多个pirncipal;

SASL:全称Simple Authentication and Security Layer,是一种用来扩充C/S模式验证能力的安全认证机制。
SASL规范了client与server之间的应答过程以及传输内容的编码方法,决定服务器本身如何存储客户端的身份证书以及如何核验客户端提供的密码,SASL对基于连接的协议添加用户认证支持。 SASL支持kerberos机制。

简单介绍了JAAS和SASL是什么,很多细节不深入探讨了,主要想说的是如何使用。
通过三个步骤即可实现JAAS安全认证:
(1) 配置jaas相关信息
(2) 开启SASL/Kerberos认证
(3) 通过脚本将配置信息传递给java虚拟机,这样java的JAAS服务才能根据配置进行认证和授权服务。
不论是zookeeper集群还是kafka集群都是如此。
在这里插入图片描述

zookeeper集群开启SASL/Kerberos认证

(0)为zookeeper生成kerberos的keytab文件

#登录到kdc服务器
# 创建用户
kadmin.local -q "add_principal -randkey zookeeper/host2612@HADOOP.COM"
kadmin.local -q "add_principal -randkey zookeeper/host2613@HADOOP.COM"
kadmin.local -q "add_principal -randkey zookeeper/host2614@HADOOP.COM"
kadmin.local -q "add_principal -randkey zookeeper/host2615@HADOOP.COM"
kadmin.local -q "add_principal -randkey zookeeper/host2616@HADOOP.COM"
kadmin.local -q "add_principal -randkey zookeeper/host2617@HADOOP.COM"
kadmin.local -q "add_principal -randkey zookeeper/host2619@HADOOP.COM"

# 导出keytab文件 将多个principal放到一个票据文件中
kadmin.local -q "xst -k /etc/keytab/zookeeper.keytab zookeeper/host2612 zookeeper/host2613 zookeeper/host2614 zookeeper/host2615 zookeeper/host2616 zookeeper/host2617 zookeeper/host2619"
# 将keytab复制zookeeper各节点机器上

(1) 新增jaas.conf配置jaas认证信息(Server,Client)

Server{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/opt/MPP/kafka_2.11-2.1.1/config/kerberos/zookeeper.keytab"
principal="zookeeper/host2612@HADOOP.COM"
userTicketCache=false;
};

Client{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/opt/MPP/kafka_2.11-2.1.1/config/kerberos/zookeeper.keytab"
principal="zookeeper/host2612@HADOOP.COM"
userTicketCache=false;
};

说明:com.sun.security.auth.module.Krb5LoginModule这个是类名,是kerberos对JAAS中的LoginModule的实现;required表示必须进行校验;其他几个是kerberos相关参数;后面会将该配置文件地址配置到jvm参数,其中的信息会被初始化到LoginContext上下文对象中。另外注意格式以及末尾的;分号。
(2)zoo.cfg开启sasl/kerberos认证

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
kerberos.removeHostFromPrincipal=true 
kerberos.removeRealmFromPrincipal=true
jaasLoginRenew=3600000

(3)java.env配置jaas认证文件地址等jvm参数值,传入java虚拟机中

export JVMFLAGS="-Xms128m -Xmx512m -Djava.security.krb5.conf=/opt/MPP/kafka_2.11-2.1.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/MPP/apache-zookeeper-3.6.3-bin/conf/jaas.conf"
export CLIENT_JVMFLAGS="-Djava.security.krb5.conf=/opt/MPP/kafka_2.11-2.1.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/MPP/apache-zookeeper-3.6.3-bin/conf/jaas.conf"

说明: 将jaas配置文件地址作为jvm参数传到jvm中。
最后重启各zookeeper节点。

Kafka开启Sasl/Kerberos安全认证

#生成keytab文件

# 创建服务端的keytab,格式:principal/hostname@realm
# 【注意】kafka-server只要与sasl.kerberos.service.name一致即可。
kadmin.local -q "add_principal -randkey kafka-server/host2612@HADOOP.COM"
kadmin.local -q "add_principal -randkey kafka-server/host2613@HADOOP.COM"
kadmin.local -q "add_principal -randkey kafka-server/host2614@HADOOP.COM"
kadmin.local -q "add_principal -randkey kafka-server/host2615@HADOOP.COM"
kadmin.local -q "add_principal -randkey kafka-server/host2616@HADOOP.COM"
kadmin.local -q "add_principal -randkey kafka-server/host2617@HADOOP.COM"
kadmin.local -q "add_principal -randkey kafka-server/host2619@HADOOP.COM"

#生成keytab文件
kadmin.local -q "xst -k /etc/keytab/kafka-server12.keytab kafka-server/host2612@HADOOP.COM kafka-server/host2613 kafka-server/host2614 kafka-server/host2615 kafka-server/host2616 kafka-server/host2617 kafka-server/host2619"

kafka服务端配置:

(1)kafka-server-jaas.conf配置jaas认证信息(KafkaServer,Client)

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/opt/MPP/kafka_2.11-2.1.1/config/kerberos/kafka-server12.keytab"
   storeKey=true
   useTicketCache=false
   //不同的主机,需修改为本机的主机名
   principal="kafka-server/host2612@HADOOP.COM";  
};
# KafkaClient 是kafka客户端连接,我们这里没有选择配置在一起

# 【注意】Client是用来连接zookeeper集群的jaas/Kerberos信息
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/opt/MPP/kafka_2.11-2.1.1/config/kerberos/zookeeper.keytab"
   storeKey=true
   useTicketCache=false
   principal="zookeeper/host2612@HADOOP.COM";
};

(2)server.properties开启sasl/kerberos认证

vi /opt/MPP/kafka_2.11-2.1.1/config/server.properties
# 增加或修改以下内容
# ip各节点不同
listeners=SASL_PLAINTEXT://192.168.26.12:9092
advertised.listeners=SASL_PLAINTEXT://192.168.26.12:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
# sasl.kerberos.service.name要和principal的名称相同
sasl.kerberos.service.name=kafka-server

(3)kafka-run-class.sh将KAFKA_SASL_OPTS设置传入jvm参数中
kafka-run-class.sh是各脚本都会引用的,直接设置在这里,启动时参数信息会被传入jvm中。
在这里插入图片描述
修改完成后重启各kafka节点

kafka客户端配置:

(1)kafka-server-jaas.conf配置jaas认证信息(KafkaServer,Client)

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/opt/MPP/kafka_2.11-2.1.1/config/kerberos/kafka-server12.keytab"
   storeKey=true
   useTicketCache=false
   //不同的主机,需修改为本机的主机名
   principal="kafka-server/host2612@HADOOP.COM";  
};

(2)consumer.properties, producer.properties开启sasl/kerberos认证

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
#service.name要和kerberos中的principal一致
sasl.kerberos.service.name=kafka-server

(3)kafka-topics.sh、kafka-console-producer.sh、kafka-console-consumer.sh增加jvm参数引用jaas配置文件

#增加一行将jaas配置文件地址设置到jvm参数上
export KAFKA_OPTS=" -Djava.security.krb5.conf=/opt/MPP/kafka_2.11-2.1.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/MPP/kafka_2.11-.1.1/config/kerberos/kafka_client_jaas.conf"

测试

#停止
 $KAFKA_HOME/bin/kafka-server-stop.sh

# 启动  
 $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &

#查看topic
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper 192.168.26.12:2181,192.168.26.13:2181,192.168.26.14:2181/kafka --list

#创建topic
$KAFKA_HOME/bin/kafka-topics-sasl.sh --zookeeper 192.168.26.12:2181,192.168.26.13:2181,192.168.26.14:2181/kafka --create --replication-factor 1 --partitions 1 --topic firsttopic

#生产消息  带sasl
$KAFKA_HOME/bin/kafka-console-producer.sh   --broker-list 192.168.26.12:9092 --producer.config $KAFKA_HOME/config/producer.properties --topic firsttopic 

#消费消息
$KAFKA_HOME/bin/kafka-console-consumer.sh  --consumer.config $KAFKA_HOME/config/consumer.properties  --bootstrap-server 192.168.26.12:9092 --from-beginning --topic firsttopic
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐