一、概述

  1. GSSAPI: 使用的Kerberos认证,可以集成目录服务,比如AD。从Kafka0.9版本开始支持
  2.  PLAIN: 使用简单用户名和密码形式。从Kafka0.10版本开始支持
  3. SCRAM: 主要解决PLAIN动态更新问题以及安全机制,从Kafka0.10.2开始支持
  4. OAUTHBEARER: 基于OAuth 2认证框架,从Kafka2.0版本开始支持

二、配置SASL/PLAIN

        

  1. kafka配置文件server.properties配置
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://<IP Address>:9092
advertised.listeners=SASL_PLAINTEXT://<IP Address>:9092

      

2. 创建kafka_server_jaas.conf文件

        这个文件是配置Broker服务端的JASS,在Kafka程序目录的config目录中创建kafka_servre_jass.conf

        

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};

        username和password配置成具体的账号和密码

        KafkaServer字段是用来配置broker间通信使用的用户名和密码以及客户端连接时需要的用户名和密码,其中username和password是broker用于初始化连接到其他的broker,kafka用户为broker间的通讯。在一个Kafka集群中,这个文件的内容要一样,集群中每个Borker去连接其他Broker的时候都使用这个文件中定义的username和password来让对方进行认证。

        Client部分是用来设置与Zookeeper的连接的,它还允许broker设置 SASL ACL 到zookeeper 节点,锁定这些节点,只有broker可以修改它。如果Zookeeper与Broker之间不设置认证,那么就可以不配置Client部分。

 3. 启动时指定安全配置

export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_server_jaas.conf"

或将以上指令写入到启动脚本中bin/kafka-server-start.sh

4.  启动kafka server

bin/kafka-server-start.sh -daemon config/server.properties

5. kafka client认证

   (1) 新建kafka_client_jaas.conf文件

     

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="admin"
  password="admin-secret";
};

username和password是真实配置的用户名密码

(2) 使用kafka自带脚本消费消息

$ export KAFKA_OPTS="-

Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"

$ ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning  --

consumer.config=config/consumer.properties --bootstrap-server=localhost:9092

(3)  使用kafka自带脚本生产消息

$ export KAFKA_OPTS="-
Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
--producer.config=config/producer.properties

6. Java程序使用

引入依赖

<!-- kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

kafka-clients 3.0版本

配置

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//配置security.protocol
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getSecurity().getProtocol());
//配置sasl.mechanism
properties.put(SaslConfigs.SASL_MECHANISM, config.getSasl().getMechanism());
//配置sasl.jaas.config
properties.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSasl().getJaasConfig());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

感谢:

https://www.cnblogs.com/rexcheny/articles/12884990.html
https://docs.vmware.com/en/VMware-Smart-Assurance/10.1.0/sa-ui-installation-config-guide-10.1.0/GUID-3E473EC3-732A-4963-81BD-13BCCD3AC700.html
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐