Kafka配置安全认证

提示:为了对数据的安全考虑,在对kafka进行读取数据时需要添加安全认证,在摸索了大量博主的博客后,自己终于把这安全认证给安排了,废话不多说,往下走。


文章目录

 


一、环境

        我使用的kakfa版本为 kafka_2.13-2.8.0.tgz,并且我没有安装zookeeper,使用的是kafka自带的zookeeper。

 

二、修改kafka/config下配置文件

        为了不对已配置好的源文件进行修改,因此各位可以重新拷贝一份kafka的安装代码进行修改,本人是直接复制一份需要修改的文件,然后再在该文件中修改及测试。

1.添加kafka_server_jaas.conf

       在 kafka/config 目录下, 添加 kafka_server_jaas.conf 文件,并将以下内容添加到文件中。

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};

       在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,在上面配置中,admin用户为broker间的通讯使用的账号和密码。

       user_userName定义了所有连接到 broker和 broker验证的所有的客户端连接包括其他 broker的用户密码,user_admin 表示的是用户名为admin,后面的双引号中表示为密码,在这里设置了两个账号,一个为admin,一个为alice。在使用生产者和消费者连接kakfa的topic时,需要拿用户名和密码与这个做验证。

 

2.添加kafka_client_jaas.conf

       在 kafka/config 目录下, 添加 kafka_client_jaas.conf 文件,并将以下内容添加到文件中。

KafkaClient {
 	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="alice"
	password="alice";
};

        在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户使用,通俗来讲,该配置为生产者或者消费者连接kafka时需要用到的用户名和密码。

      若与1中设置的用户名和密码不一致,则会出现以下情况,表示用户名或密码不一致,连接失败:

 

3.添加kafka_zoo_jaas.conf

       在 kafka/config 目录下, 添加 kafka_client_jaas.conf 文件,并将以下内容添加到文件中。

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="admin";
};

       该配置为zookeeper集群之间连接时需要的用户名和密码。

 

4.producer-jaas.properties、consumer-jaas.properties

       将 producer.properties 和 consumer.properties 重新复制了一份并重命名,并在该两配置文件中添加以下两行代码,表示生产者和消费者需要使用安全认证。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 

5、zookeeper-jaas.properties

       将 zookeeper.properties 文件拷贝一份并重命名,在文件中添加如下代码,开启验证。

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl

 

6.server-jaas.properties

       将 server.properties 文件拷贝一份并重命名,在文件中添加如下代码,开启验证,一定要记住同样字段的名字不能存在两份,不然会报错。

host.name=ip
listeners=SASL_PLAINTEXT://ip:端口
advertised.listeners=SASL_PLAINTEXT://ip:端口
security.inter.broker.protocol=SASL_PLAINTEXT    
sasl.enabled.mechanisms=PLAIN    
sasl.mechanism.inter.broker.protocol=PLAIN    
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin

 

三、修改kafka/bin下配置文件

        切换到 bin 目录下。

1.kafka-console-consumer-jaas.sh、kafka-console-producer-jaas.sh

       将 kafka-console-consumer.sh 和 kafka-console-producer.sh 文件拷贝一份并重命名,在 KAFKA_HEAP_OPTS 属性处添加以下参数,参数指定的路径需要注意,别跟着我写,按你们自己的来。

-Djava.security.auth.login.config=/usr/kafka/config/kafka_client_jaas.conf

结果如下:

 

2.kafka-server-start-jaas.sh

       将 kafka-server-start.sh 文件拷贝一份并重命名,在 KAFKA_HEAP_OPTS 属性处添加以下参数,注意这里指定的是server这个配置文件。

-Djava.security.auth.login.config=/usr/kafka/config/kafka_server_jaas.conf

结果如下:

 

3.zookeeper-server-start-jaas.sh

       将 zookeeper-server-start.sh 文件拷贝一份并重命名,在 KAFKA_HEAP_OPTS 属性处添加以下参数,注意这里指定的是server这个配置文件。

-Djava.security.auth.login.config=/usr/kafka/config/kafka_zoo_jaas.conf

结果如下:

 

四、启动测试

1.启动zookeeper环境

./bin/zookeeper-server-start-jaas.sh ./config/zookeeper-jaas.properties

2.启动kafka环境

./bin/kafka-server-start-jaas.sh ./config/server-jaas.properties

3.启动生产者

./bin/kafka-console-producer-jaas.sh --broker-list ip:9092 --topic test --producer.config ./config/producer-jaas.properties

 

4.启动消费者

./bin/kafka-console-consumer-jaas.sh --bootstrap-server ip:9092 --topic test --from-beginning --consumer.config ./config/consumer-jaas.properties

 

成功。

 

若此时修改kafka_client_jaas.conf 文件中的用户名或者密码。

再次通过生产者或者消费者访问时会报错:

 

 

五、其它

1.为用户 alice 在 test(topic)上添加读写的权限

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read  --operation Write --topic test

 

2.为账号为admin密码为admin的用户创建证书

./bin/kafka-configs.sh --zookeeper ip:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin

 

六、Java代码测试

       本人主要是用来在java上访问kafka时能用到这个安全认证功能,关键代码如下。

// 配置文件读取,读取访问kafka需要的账号和密码
System.setProperty("java.security.auth.login.config", "/etc/auditsys/soar/kafka_client_jaas.conf");

Properties properties = new Properties();

properties.setProperty("zookeeper.connect", "ip:2181");
properties.setProperty("bootstrap.servers", "ip:9092");
properties.setProperty("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "PLAIN");

FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer("test", new JSONKeyValueDeserializationSchema(true),
                properties);

DataStream<ObjectNode> sourceStream = env.addSource(myConsumer);

sourceStream......(其它的流式处理)

env.execute("read from kafka");

 

 


总结

       有什么问题还望大家指正,谢谢。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐