【FLinlk】Flink小坑之kerberos动态认证
1.概述转载并且补充:Flink小坑之kerberos动态认证2.官网这段内容摘抄自官网:Kerberos Authentication Setup and Configuration配置:Kerberos-based Authentication / AuthorizationFlink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只
1.概述
转载并且补充:Flink小坑之kerberos动态认证
2.官网
这段内容摘抄自官网:Kerberos Authentication Setup and Configuration
配置:Kerberos-based Authentication / Authorization
Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml
中配置 Flink。像这样为 Kafka 启用 Kerberos 身份验证:
- 通过设置以下内容配置 Kerberos 票据
security.kerberos.login.use-ticket-cache
:默认情况下,这个值是true
,Flink 将尝试在kinit
管理的票据缓存中使用 Kerberos 票据。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时,使用票据缓存的 Kerberos 授权将不起作用。使用 Mesos 进行部署时也是如此,因为 Mesos 部署不支持使用票据缓存进行授权。security.kerberos.login.keytab
和security.kerberos.login.principal
:要使用 Kerberos keytabs,需为这两个属性设置值。
- 将
KafkaClient
追加到security.kerberos.login.contexts
:这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。
一旦启用了基于 Kerberos 的 Flink 安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部 Kafka 客户端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证:
- 将
security.protocol
设置为SASL_PLAINTEXT
(默认为NONE
):用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时,也可以使用SASL_SSL
;请在此处查看如何为 SSL 配置 Kafka 客户端。 - 将
sasl.kerberos.service.name
设置为kafka
(默认为kafka
):此值应与用于 Kafka broker 配置的sasl.kerberos.service.name
相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。
有关 Kerberos 安全性 Flink 配置的更多信息,请参见[这里]({{< ref “docs/deployment/config” >}}})。你也可以在[这里]({{< ref “docs/deployment/security/security-kerberos” >}})进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。
2. Kerberos认证方式
2.1 方式一(仅限于YARN)
在 YARN 模式下,可以部署一个没有 keytab 的安全 Flink 集群,只使用票据缓存(由kinit管理)。这避免了生成密钥表的复杂性,并避免将其委托给集群管理器。在这种情况下,Flink CLI 获取 Hadoop 委托令牌(用于 HDFS 和 HBase等)。主要缺点是集群必然是短暂的,因为生成的委托令牌将过期(通常在一周内)。
使用以下步骤运行安全的 Flink 集群kinit:
- 使用kinit命令登录。
- 正常部署 Flink 集群。
注:tgt有一个有效期,过期了就无法使用了,这种方式不适合长期任务。
2.2 方式二
在原生 Kubernetes、YARN 和 Mesos 模式下运行安全 Flink 集群的步骤
1.在客户端的 Flink 配置文件中添加安全相关的配置选项(见这里)。
security.kerberos.login.keytab: /kerberos/flink.keytab
security.kerberos.login.principal: flink
security.kerberos.login.contexts: Client
security.kerberos.login.use-ticket-cache: true
注意:这里的principal是不带主机名的,如果带主机名可能如下
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/xx/xx.keytab
# todo: 这里需要hdfs的配置
security.kerberos.login.principal: mr/node1@域名.COM
security.kerberos.login.contexts: KafkaClient,Client
security.kerberos.login.use-ticket-cache: true
但是这个认证会有问题,因为如果你在node1
提交任务的时候,会找node1
所在机器的/home/xx/xx.keytab
文件,然后如果有的话,那么就可以使用,但是当你提交任务到集群后,因为是分布式运行,你传入的mr/node1@域名.COM
只适合Node1的keytab,而node2,node3的keytab是分别绑定到node2和Node3的,然后你用mr/node1@域名.COM
去认证,就会找不到
。
这里会报错 【kafka】kerberos client is being asked for a password not available to garner authentication informa
所以有以下几种方案解决
- 生成一个principal是不包含主机名的,所有的主机可用
- 每个主机固定路径下的/home/xx/xx.keytab文件包含所有主机的principal.可以使用如下命令进行添加
ktadd /home/mr/mr.keytab mr/node1@域名.COM
ktadd /home/mr/mr.keytab mr/node2@域名.COM
ktadd /home/mr/mr.keytab mr/node3@域名.COM
- 生成一个全局可用的keytab,与2相同
但是对方一直不生成,气死,网友也说需要生成一个全局的
后来我找到另外一种方案
解决
最后解决,我们不使用他们的keytab和jaas文件,把他们jaas文件拷贝下来一份
原本的
[root@zdh2 flink]# cat /etc/zdh/kafka/conf.zdh.kafka/jaas.conf
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/mr/mr.keytab" # 这个文件在每个服务器上不相同
storeKey=true
useTicketCache=false
principal="mr/node2@ZCDDH.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/mr/mr.keytab" # 这个文件在每个服务器上不相同
storeKey=true
useTicketCache=false
principal="mr/node2@ZCDDH.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/mr/mr.keytab" # 这个文件在每个服务器上不相同
storeKey=true
useTicketCache=false
principal="mr/node2@ZCDDH.COM";
};
[root@zdh2 f
我们把zdh2服务的/home/mr/mr.keytab文件,拷贝下来,放到其他服务器的 /usr/hdp/soft/zaas/keytab/目录下
改成如下,
[root@zdh3 keytab]# pwd
/usr/hdp/soft/zaas/keytab
[root@zdh3 keytab]# cat jaas.conf
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab" # 对应zdh2服务器的keytab
storeKey=true
useTicketCache=false
principal="mr/node2@ZCDDH.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab" # 对应zdh2服务器的keytab
storeKey=true
useTicketCache=false
principal="mr/node2@ZCDDH.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab" # 对应zdh2服务器的keytab
storeKey=true
useTicketCache=false
principal="mr/node2@ZCDDH.COM";
};
我们的目录形式如下
[root@zdh2 keytab]# ll
total 8
-rw-r--r--. 1 root root 637 Apr 14 02:36 jaas.conf
-rwxr-xr-x. 1 root root 238 Apr 14 02:25 mr.keytab
[root@zdh2 keytab]#
[root@zdh3 keytab]# ll
total 8
-rw-r--r--. 1 root root 637 Apr 14 02:37 jaas.conf
-rwxrwxrwx. 1 root root 238 Apr 14 02:27 mr.keytab
[root@zdh4 keytab]# ll
total 8
-rw-r--r--. 1 root root 637 Apr 14 02:37 jaas.conf
-rwxrwxrwx. 1 root root 238 Apr 14 02:27 mr.keytab
这里注意krb5.conf一般是所有服务器上一样的,如果不一样,也这样操作,拷贝一个服务器的放到一起。
这里确实有个小坑:【FLinlk】Flink小坑之kerberos动态认证
注: 这里就遇到了我们说的小坑,kerberos认证先于命令行解析,命令行定义的配置在认证阶段是无效的
2.确保密钥表文件存在于security.kerberos.login.keytab
客户端节点上的指示的路径中
3.正常部署 Flink 集群。
在 YARN、Mesos 和原生 Kubernetes 模式下,keytab 会自动从客户端复制到 Flink 容器。
注: 这里就遇到了我们说的小坑,kerberos认证先于命令行解析,命令行定义的配置在认证阶段是无效的
因此,我们目前无法通过在命令行指定不同的keytab和principal覆盖文件中的配置,从而达到多用户认证,也就是目前我们只能通过flink用户来向YARN提交作业,没有了用户区分,我们就无法进行权限控制。
那如何解决这个问题,从而达到每个用户都能以自己名义去提交作业呢,这里我们只要解决命令行解析先于kerberos认证就可以了(修改源码)
2.2.1 .暴力解析器
Flink自带的命令行解析器,如果我们借助自身命令行解析器(减少改动也就减少了bug)。 我们可能同时需要兼容所有的解析器,并且在新添加解析器时也增加了限制,可能在某个解析器没有对应参数,解析就不生效。
因此,我们采用最简单最暴力的办法,自己定义一个解析器,解析命令行的参数来覆盖配置文件里定义参数。
具体步骤如下:
1.自定义一个命令行解析器。
// 位于org.apache.flink.client.cli.CliFrontendParser下
public static class ExtendedGnuParser extends GnuParser {
private final boolean ignoreUnrecognizedOption;
public ExtendedGnuParser(boolean ignoreUnrecognizedOption) {
// GnuParser、DefaultParser在遇到未定义的参数时都会抛出异常,这里是为了进行兼容
this.ignoreUnrecognizedOption = ignoreUnrecognizedOption;
}
protected void processOption(String arg, ListIterator<String> iter) throws ParseException {
boolean hasOption = this.getOptions().hasOption(arg);
if (hasOption || !this.ignoreUnrecognizedOption) {
super.processOption(arg, iter);
}
}
}
定义命令行参数并解析, 同时覆盖配置文件参数。
// 位于org.apache.flink.client.cli.CliFrontend
// 其this.configuration为从配置文件中读取的配置。
public Configuration compatCommandLineSecurityConfiguration(String[] args)
throws CliArgsException, FlinkException, ParseException {
// 定义一个自定义都参数解析Options。
Options securityOptions = new Options()
.addOption(Option.builder("D")
.argName("property=value")
.numberOfArgs(2)
.valueSeparator('=')
.desc("Allows specifying multiple generic configuration options.")
.build());
// 解析命令行传入的参数
CommandLine commandLine =
new CliFrontendParser.ExtendedGnuParser(true)
.parse(securityOptions, args);
// 从配置文件中读取的参数。
Configuration securityConfiguration = new Configuration(this.configuration);
// 用命令行传入参数替换掉配置文件中的security参数
Properties dynamicProperties = commandLine.getOptionProperties("D");
for(Map.Entry<Object, Object> entry: dynamicProperties.entrySet()){
securityConfiguration.setString(
entry.getKey().toString(),
entry.getValue().toString()
);
}
return securityConfiguration;
}
修改之后我们进行编译mvn clean package -DskipTests -Dfast
编译完成后,flink根目录下会出现一个build-target的软连接,指向编译后的flink真正的安装包。
我们修改一下flink安装包的名称,由flink-1.13.2 修改为flink-1.13.2.1 来区分,同时上传到线上就可以了。
更多推荐
所有评论(0)