I. Background

A Kafka server runs on the host 192.168.248.100, and access to it needs to be secured with authentication.
This article assumes the Kerberos server (KDC) has already been set up; the setup tutorial is here:
https://blog.csdn.net/weixin_40496191/article/details/124056421

II. Integrating Kerberos with Kafka

  1. Create the kafka principal on the Kerberos server: addprinc -randkey kafka/192.168.248.100

  2. Export the keytab (the -norandkey option requires kadmin.local): ktadd -k /root/kerberos/kafka.keytab -norandkey kafka/192.168.248.100

  3. Copy the keytab to the /opt/kafka/kerberos directory on the Kafka server.

  4. Create kafka-jaas.conf under the config directory of the Kafka installation:

    KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/opt/kafka/kerberos/kafka.keytab"
    principal="kafka/192.168.248.100@HADOOP.COM";
    };
     
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=true
    keyTab="/opt/kafka/kerberos/kafka.keytab"
    principal="kafka/192.168.248.100@HADOOP.COM";
    };
    
  5. Edit config/server.properties under the Kafka installation directory:

    advertised.listeners=SASL_PLAINTEXT://192.168.248.100:9092
    listeners=SASL_PLAINTEXT://192.168.248.100:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    sasl.kerberos.service.name=kafka   
    
  6. Copy krb5.conf from the Kerberos server into the /etc directory on the Kafka server.

  7. Edit the bin/kafka-run-class.sh script and add the Kafka JVM parameters as follows:

      KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/kafka1/config/kafka-jaas.conf"
    
  8. Edit the config/producer.properties file:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    
  9. Edit the config/consumer.properties file:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    
  10. Test: start Kafka, then produce and consume from the console:
    Produce: ./kafka-console-producer.sh --broker-list 192.168.248.100:9092 --topic TOPIC1 --producer.config /opt/kafka/kafka1/config/producer.properties

    Consume: ./kafka-console-consumer.sh --bootstrap-server 192.168.248.100:9092 --topic TOPIC1 --consumer.config /opt/kafka/kafka1/config/consumer.properties

III. Integrating Kerberos with Java + Kafka

  1. Java code for Kerberos-authenticated access to Kafka (a small producer sketch follows the listing):

    package cn.Api;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DeleteTopicsResult;
    import org.apache.kafka.clients.admin.ListTopicsResult;
    import org.apache.kafka.common.KafkaFuture;
    
    import java.util.ArrayList;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author 天真热
     * @create 2022-04-06 15:09
     * @desc Kafka AdminClient operations with Kerberos (GSSAPI) authentication
     **/
    public class KerberosApi {
        public static void main(String[] args) {
            listTopic();
        }
    
        /**
         * List the topics on the broker.
         */
        public static void listTopic() {
            Properties prop = new Properties();
            System.setProperty("java.security.krb5.conf", "C:/Users/86188/Desktop/kerberos/krb5.conf");              //认证代码
            System.setProperty("java.security.auth.login.config", "C:/Users/86188/Desktop/kerberos/kafka-jaas.conf");//认证代码
            prop.put("sasl.kerberos.service.name", "kafka");     //认证代码
            prop.put("sasl.mechanism", "GSSAPI");                //认证代码
            prop.put("security.protocol", "SASL_PLAINTEXT");     //认证代码
            prop.put("bootstrap.servers", "192.168.248.100:9092");
            AdminClient admin = AdminClient.create(prop);
    
            ListTopicsResult result = admin.listTopics();
            KafkaFuture<Set<String>> future = result.names();
    
            try {
                System.out.println("==================Kafka Topics====================");
                future.get().forEach(name -> System.out.println(name));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                admin.close();
            }
    
        }
    
        /**
         * Delete a topic.
         */
        public static void deleteTopic() {
            // Delete a Kafka topic
            Properties prop = new Properties();
            System.setProperty("java.security.krb5.conf", "C:/Users/86188/Desktop/kerberos/krb5.conf");              // Kerberos authentication settings
            System.setProperty("java.security.auth.login.config", "C:/Users/86188/Desktop/kerberos/kafka-jaas.conf");// Kerberos authentication settings
            prop.put("sasl.kerberos.service.name", "kafka");     // Kerberos authentication settings
            prop.put("sasl.mechanism", "GSSAPI");                // Kerberos authentication settings
            prop.put("security.protocol", "SASL_PLAINTEXT");     // Kerberos authentication settings
            prop.put("bootstrap.servers", "192.168.248.100:9092");
            AdminClient client = AdminClient.create(prop);
            ArrayList<String> topics = new ArrayList<>();
            topics.add("test_topic");
            DeleteTopicsResult result = client.deleteTopics(topics);
            try {
                result.all().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                client.close();
            }
        }
    
    }
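
    For reference, a producer can authenticate in the same way. The sketch below is a minimal example, assuming the topic TOPIC1 from the console test above and the same local krb5.conf / kafka-jaas.conf paths; the class name KerberosProducerDemo is illustrative and not part of the original project.

    package cn.Api;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class KerberosProducerDemo {
        public static void main(String[] args) throws Exception {
            // Same Kerberos settings as in KerberosApi above (adjust the paths to your machine)
            System.setProperty("java.security.krb5.conf", "C:/Users/86188/Desktop/kerberos/krb5.conf");
            System.setProperty("java.security.auth.login.config", "C:/Users/86188/Desktop/kerberos/kafka-jaas.conf");

            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.248.100:9092");
            prop.put("security.protocol", "SASL_PLAINTEXT");
            prop.put("sasl.mechanism", "GSSAPI");
            prop.put("sasl.kerberos.service.name", "kafka");
            prop.put("key.serializer", StringSerializer.class.getName());
            prop.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
                // Send one record synchronously and print where it landed
                RecordMetadata meta = producer.send(new ProducerRecord<>("TOPIC1", "key", "hello kerberos")).get();
                System.out.println("Sent to " + meta.topic() + "-" + meta.partition() + "@" + meta.offset());
            }
        }
    }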
    
    

IV. Integrating Kerberos with Spring Boot + Kafka

  1. Copy the following three files to a directory on the server that hosts the project, and update the keytab path in kafka-jaas.conf: kafka.keytab, kafka-jaas.conf, krb5.conf
  2. Configure the project's JVM startup parameters (or set the same two system properties in code; see the sketch after the parameters):
    -Djava.security.auth.login.config=C:/Users/86188/Desktop/kerberos/kafka-jaas.conf -Djava.security.krb5.conf=C:/Users/86188/Desktop/kerberos/krb5.conf
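
    If passing -D flags is inconvenient, the same two properties can be set programmatically before the application context starts. A minimal sketch, assuming a standard Spring Boot entry point; the class name and file paths are examples, not from the original project:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class KafkaKerberosApplication {
        public static void main(String[] args) {
            // Must be set before any Kafka client is created by the application context
            System.setProperty("java.security.krb5.conf", "C:/Users/86188/Desktop/kerberos/krb5.conf");
            System.setProperty("java.security.auth.login.config", "C:/Users/86188/Desktop/kerberos/kafka-jaas.conf");
            SpringApplication.run(KafkaKerberosApplication.class, args);
        }
    }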
  3. Project yml configuration (a minimal listener/producer sketch follows the yml):
    spring:
      kafka:
        properties:
          sasl:
            mechanism: GSSAPI
            kerberos:
              service:
                name: kafka
          jaas:
            enabled: true
          security:
            protocol: SASL_PLAINTEXT
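
    With the yml above in place, producing and consuming goes through the normal Spring Kafka components. A minimal sketch, assuming the topic TOPIC1 from earlier and Spring Boot's default String (de)serializers; the class and group names are illustrative:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic1Messaging {

        private final KafkaTemplate<String, String> kafkaTemplate;

        public Topic1Messaging(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        public void send(String message) {
            // Uses the connection and SASL settings from spring.kafka.* in application.yml
            kafkaTemplate.send("TOPIC1", message);
        }

        @KafkaListener(topics = "TOPIC1", groupId = "demo-group")
        public void onMessage(String message) {
            System.out.println("Received: " + message);
        }
    }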
    