前言

Kafka kerberos 认证一般分为动态认证和静态认证:

动态认证即通过在properties配置项中sasl.jaas.config认证字符串来进行,静态认证通过System.setProperty(“java.security.auth.login.config”,"")来进行,如果在同一个jvm中需要进行不同服务的kerberos认证连接时 使用kafka静态认证方式,会产生奇奇怪怪的问题,例如真实的认证用户不是当前请求传递的用户或者命名传了正确的kerberos认证信息,但是却认证失败:Could not find a ‘kafkaClient’ entry in the JAAS configuration. System property ‘java.security.auth.login.config’ is /xxxxx/xxxxx/xxxxxx_jaas.conf。


一、Kafka的认证流程源码解读

上源码 KafkaConsumer的kerberos认证在的构造方法的

ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);

这一行,进去
在这里插入图片描述
进入之后如下:
在这里插入图片描述
因此在kafka的properties中加加入下面这个配置项

properties.put("security.protocol","SASL_PLAINTEXT")

进入create方法:
在这里插入图片描述
继续进入JassContext.loadClientContext方法:
在这里插入图片描述

public static JaasContext loadClientContext(Map<String, ?> configs) {
        String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
        Password dynamicJaasConfig = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
        return load(JaasContext.Type.CLIENT, null, globalContextName, dynamicJaasConfig);
    }

再进入load方法,可以看到,上一步通过Password dynamicJaasConfig = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG)拿到的动态认证配置项作为了 是否为走动态认证逻辑的条件:

static JaasContext load(JaasContext.Type contextType, String listenerContextName,
                            String globalContextName, Password dynamicJaasConfig) {
        if (dynamicJaasConfig != null) {
            JaasConfig jaasConfig = new JaasConfig(globalContextName, dynamicJaasConfig.value());
            AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName);
            if (contextModules == null || contextModules.length == 0)
                throw new IllegalArgumentException("JAAS config property does not contain any login modules");
            else if (contextModules.length != 1)
                throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module");
            return new JaasContext(globalContextName, contextType, jaasConfig, dynamicJaasConfig);
        } else
            return defaultContext(contextType, listenerContextName, globalContextName);
    }

else中的逻辑 其实就是所说的静态认证

二、具体kerberos使用步骤

1.动态认证关键代码

代码如下(示例):

        System.setProperty("java.security.krb5.conf", "/xxxx/krb5.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/xxx/xxx.keytab\" principal=\"xxx@xxxx.COM\";");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
}

2.静态认证关键代码

代码如下(示例):

System.setProperty("java.security.auth.login.config", "/xxxxx/kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "/xxxxx/krb5.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");

三、静态认证&动态认证如何选择

, 进入defaultContext方法,静态认证中有一行非常关键的代码:
在这里插入图片描述
这个代码其实是有线程安全问题的,因为这里是静态获取,如果configuration对象在别的地方已经被初始化过,那么会直接返回,如果为空才会读取System.getProperty("")的值去读取文件并初始化一个新的
在这里插入图片描述
所以如果你,代码看到这里你就应该明白了 静态认证其实可以说是jvm全局共享的且默认不会刷新,那么如果你是常驻进程并且kafka客户端创建之后会缓存 比较适合静态认证方式,如果你的kerberos是多用户模式那就要选择动态认证方式,动态认证每次都会加载kafka properties里面的kerberos认证信息,并且不会在系统缓存

如果在多种数据源客户端共存的jvm中,很多都需要做kerberos认证,例如:hive、zookeeper、hbase、kafka、hdfs,这时如果你的kafka使用System.setProperty()的方式进行kerberos认证则会出现 接收请求的server端打印出的principal跟你客户端请求的不是同一个用户,或者是直接认证失败,但自己jaas文件内容已经路径传的明明对的。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐