Kafka配置安全认证
Kafka配置安全认证提示:本人提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档文章目录系列文章目录前言一、pandas是什么?二、使用步骤1.引入库2.读入数据总结前言提示:这里可以添加本文要记录的大概内容:例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。提示:以下是本篇文章正文内容,下面案例可供参考一、pa
Kafka配置安全认证
提示:为了对数据的安全考虑,在对kafka进行读取数据时需要添加安全认证,在摸索了大量博主的博客后,自己终于把这安全认证给安排了,废话不多说,往下走。
文章目录
4.producer-jaas.properties、consumer-jaas.properties
1.kafka-console-consumer-jaas.sh、kafka-console-producer-jaas.sh
一、环境
我使用的kakfa版本为 kafka_2.13-2.8.0.tgz,并且我没有安装zookeeper,使用的是kafka自带的zookeeper。
二、修改kafka/config下配置文件
为了不对已配置好的源文件进行修改,因此各位可以重新拷贝一份kafka的安装代码进行修改,本人是直接复制一份需要修改的文件,然后再在该文件中修改及测试。
1.添加kafka_server_jaas.conf
在 kafka/config 目录下, 添加 kafka_server_jaas.conf 文件,并将以下内容添加到文件中。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,在上面配置中,admin用户为broker间的通讯使用的账号和密码。
user_userName定义了所有连接到 broker和 broker验证的所有的客户端连接包括其他 broker的用户密码,user_admin 表示的是用户名为admin,后面的双引号中表示为密码,在这里设置了两个账号,一个为admin,一个为alice。在使用生产者和消费者连接kakfa的topic时,需要拿用户名和密码与这个做验证。
2.添加kafka_client_jaas.conf
在 kafka/config 目录下, 添加 kafka_client_jaas.conf 文件,并将以下内容添加到文件中。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice";
};
在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户使用,通俗来讲,该配置为生产者或者消费者连接kafka时需要用到的用户名和密码。
若与1中设置的用户名和密码不一致,则会出现以下情况,表示用户名或密码不一致,连接失败:
3.添加kafka_zoo_jaas.conf
在 kafka/config 目录下, 添加 kafka_client_jaas.conf 文件,并将以下内容添加到文件中。
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="admin";
};
该配置为zookeeper集群之间连接时需要的用户名和密码。
4.producer-jaas.properties、consumer-jaas.properties
将 producer.properties 和 consumer.properties 重新复制了一份并重命名,并在该两配置文件中添加以下两行代码,表示生产者和消费者需要使用安全认证。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
5、zookeeper-jaas.properties
将 zookeeper.properties 文件拷贝一份并重命名,在文件中添加如下代码,开启验证。
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
6.server-jaas.properties
将 server.properties 文件拷贝一份并重命名,在文件中添加如下代码,开启验证,一定要记住同样字段的名字不能存在两份,不然会报错。
host.name=ip
listeners=SASL_PLAINTEXT://ip:端口
advertised.listeners=SASL_PLAINTEXT://ip:端口
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin
三、修改kafka/bin下配置文件
切换到 bin 目录下。
1.kafka-console-consumer-jaas.sh、kafka-console-producer-jaas.sh
将 kafka-console-consumer.sh 和 kafka-console-producer.sh 文件拷贝一份并重命名,在 KAFKA_HEAP_OPTS 属性处添加以下参数,参数指定的路径需要注意,别跟着我写,按你们自己的来。
-Djava.security.auth.login.config=/usr/kafka/config/kafka_client_jaas.conf
结果如下:
2.kafka-server-start-jaas.sh
将 kafka-server-start.sh 文件拷贝一份并重命名,在 KAFKA_HEAP_OPTS 属性处添加以下参数,注意这里指定的是server这个配置文件。
-Djava.security.auth.login.config=/usr/kafka/config/kafka_server_jaas.conf
结果如下:
3.zookeeper-server-start-jaas.sh
将 zookeeper-server-start.sh 文件拷贝一份并重命名,在 KAFKA_HEAP_OPTS 属性处添加以下参数,注意这里指定的是server这个配置文件。
-Djava.security.auth.login.config=/usr/kafka/config/kafka_zoo_jaas.conf
结果如下:
四、启动测试
1.启动zookeeper环境
./bin/zookeeper-server-start-jaas.sh ./config/zookeeper-jaas.properties
2.启动kafka环境
./bin/kafka-server-start-jaas.sh ./config/server-jaas.properties
3.启动生产者
./bin/kafka-console-producer-jaas.sh --broker-list ip:9092 --topic test --producer.config ./config/producer-jaas.properties
4.启动消费者
./bin/kafka-console-consumer-jaas.sh --bootstrap-server ip:9092 --topic test --from-beginning --consumer.config ./config/consumer-jaas.properties
成功。
若此时修改kafka_client_jaas.conf 文件中的用户名或者密码。
再次通过生产者或者消费者访问时会报错:
五、其它
1.为用户 alice 在 test(topic)上添加读写的权限
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read --operation Write --topic test
2.为账号为admin密码为admin的用户创建证书
./bin/kafka-configs.sh --zookeeper ip:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
六、Java代码测试
本人主要是用来在java上访问kafka时能用到这个安全认证功能,关键代码如下。
// 配置文件读取,读取访问kafka需要的账号和密码
System.setProperty("java.security.auth.login.config", "/etc/auditsys/soar/kafka_client_jaas.conf");
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "ip:2181");
properties.setProperty("bootstrap.servers", "ip:9092");
properties.setProperty("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");
FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer("test", new JSONKeyValueDeserializationSchema(true),
properties);
DataStream<ObjectNode> sourceStream = env.addSource(myConsumer);
sourceStream......(其它的流式处理)
env.execute("read from kafka");
总结
有什么问题还望大家指正,谢谢。
更多推荐
所有评论(0)