在这里插入图片描述

1.概述

转载并且补充:Flink小坑之kerberos动态认证

2.官网

这段内容摘抄自官网:Kerberos Authentication Setup and Configuration

配置:Kerberos-based Authentication / Authorization

Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置的 Kafka 安装进行身份验证。只需在 flink-conf.yaml 中配置 Flink。像这样为 Kafka 启用 Kerberos 身份验证:

  1. 通过设置以下内容配置 Kerberos 票据
  • security.kerberos.login.use-ticket-cache:默认情况下,这个值是 true,Flink 将尝试在 kinit 管理的票据缓存中使用 Kerberos 票据。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 连接器时,使用票据缓存的 Kerberos 授权将不起作用。使用 Mesos 进行部署时也是如此,因为 Mesos 部署不支持使用票据缓存进行授权。
  • security.kerberos.login.keytabsecurity.kerberos.login.principal:要使用 Kerberos keytabs,需为这两个属性设置值。
  1. KafkaClient 追加到 security.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 票据提供给 Kafka 登录上下文以用于 Kafka 身份验证。

一旦启用了基于 Kerberos 的 Flink 安全性后,只需在提供的属性配置中包含以下两个设置(通过传递给内部 Kafka 客户端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a进行身份验证:

  • security.protocol 设置为 SASL_PLAINTEXT(默认为 NONE):用于与 Kafka broker 进行通信的协议。使用独立 Flink 部署时,也可以使用 SASL_SSL;请在此处查看如何为 SSL 配置 Kafka 客户端。
  • sasl.kerberos.service.name 设置为 kafka(默认为 kafka):此值应与用于 Kafka broker 配置的 sasl.kerberos.service.name 相匹配。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。

有关 Kerberos 安全性 Flink 配置的更多信息,请参见[这里]({{< ref “docs/deployment/config” >}}})。你也可以在[这里]({{< ref “docs/deployment/security/security-kerberos” >}})进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。

2. Kerberos认证方式

2.1 方式一(仅限于YARN)

在 YARN 模式下,可以部署一个没有 keytab 的安全 Flink 集群,只使用票据缓存(由kinit管理)。这避免了生成密钥表的复杂性,并避免将其委托给集群管理器。在这种情况下,Flink CLI 获取 Hadoop 委托令牌(用于 HDFS 和 HBase等)。主要缺点是集群必然是短暂的,因为生成的委托令牌将过期(通常在一周内)。

使用以下步骤运行安全的 Flink 集群kinit:

  1. 使用kinit命令登录。
  2. 正常部署 Flink 集群。

注:tgt有一个有效期,过期了就无法使用了,这种方式不适合长期任务。

2.2 方式二

在原生 Kubernetes、YARN 和 Mesos 模式下运行安全 Flink 集群的步骤

1.在客户端的 Flink 配置文件中添加安全相关的配置选项(见这里)。

 security.kerberos.login.keytab: /kerberos/flink.keytab
 security.kerberos.login.principal: flink
 security.kerberos.login.contexts: Client
 security.kerberos.login.use-ticket-cache: true

注意:这里的principal是不带主机名的,如果带主机名可能如下

security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /home/xx/xx.keytab
# todo: 这里需要hdfs的配置
security.kerberos.login.principal: mr/node1@域名.COM
security.kerberos.login.contexts: KafkaClient,Client
security.kerberos.login.use-ticket-cache: true

但是这个认证会有问题,因为如果你在node1提交任务的时候,会找node1所在机器的/home/xx/xx.keytab文件,然后如果有的话,那么就可以使用,但是当你提交任务到集群后,因为是分布式运行,你传入的mr/node1@域名.COM 只适合Node1的keytab,而node2,node3的keytab是分别绑定到node2和Node3的,然后你用mr/node1@域名.COM去认证,就会找不到

这里会报错 【kafka】kerberos client is being asked for a password not available to garner authentication informa

所以有以下几种方案解决

  1. 生成一个principal是不包含主机名的,所有的主机可用
  2. 每个主机固定路径下的/home/xx/xx.keytab文件包含所有主机的principal.可以使用如下命令进行添加
ktadd  /home/mr/mr.keytab mr/node1@域名.COM
ktadd  /home/mr/mr.keytab mr/node2@域名.COM
ktadd  /home/mr/mr.keytab mr/node3@域名.COM
  1. 生成一个全局可用的keytab,与2相同

但是对方一直不生成,气死,网友也说需要生成一个全局的

在这里插入图片描述

在这里插入图片描述
后来我找到另外一种方案

解决

最后解决,我们不使用他们的keytab和jaas文件,把他们jaas文件拷贝下来一份

原本的

[root@zdh2 flink]# cat /etc/zdh/kafka/conf.zdh.kafka/jaas.conf
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/mr/mr.keytab"  # 这个文件在每个服务器上不相同
  storeKey=true
  useTicketCache=false
  principal="mr/node2@ZCDDH.COM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/mr/mr.keytab"   # 这个文件在每个服务器上不相同
  storeKey=true
  useTicketCache=false
  principal="mr/node2@ZCDDH.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/home/mr/mr.keytab"  # 这个文件在每个服务器上不相同
  storeKey=true
  useTicketCache=false
  principal="mr/node2@ZCDDH.COM";
};
[root@zdh2 f

我们把zdh2服务的/home/mr/mr.keytab文件,拷贝下来,放到其他服务器的 /usr/hdp/soft/zaas/keytab/目录下

改成如下,

[root@zdh3 keytab]# pwd
/usr/hdp/soft/zaas/keytab
[root@zdh3 keytab]# cat jaas.conf 
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab"  # 对应zdh2服务器的keytab
  storeKey=true
  useTicketCache=false
  principal="mr/node2@ZCDDH.COM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab"   # 对应zdh2服务器的keytab
  storeKey=true
  useTicketCache=false
  principal="mr/node2@ZCDDH.COM";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/usr/hdp/soft/zaas/keytab/mr.keytab"  # 对应zdh2服务器的keytab
  storeKey=true
  useTicketCache=false
  principal="mr/node2@ZCDDH.COM";
};

我们的目录形式如下

[root@zdh2 keytab]# ll
total 8
-rw-r--r--. 1 root root 637 Apr 14 02:36 jaas.conf
-rwxr-xr-x. 1 root root 238 Apr 14 02:25 mr.keytab
[root@zdh2 keytab]# 


[root@zdh3 keytab]# ll
total 8
-rw-r--r--. 1 root root 637 Apr 14 02:37 jaas.conf
-rwxrwxrwx. 1 root root 238 Apr 14 02:27 mr.keytab

[root@zdh4 keytab]# ll
total 8
-rw-r--r--. 1 root root 637 Apr 14 02:37 jaas.conf
-rwxrwxrwx. 1 root root 238 Apr 14 02:27 mr.keytab

这里注意krb5.conf一般是所有服务器上一样的,如果不一样,也这样操作,拷贝一个服务器的放到一起。

这里确实有个小坑:【FLinlk】Flink小坑之kerberos动态认证

注: 这里就遇到了我们说的小坑,kerberos认证先于命令行解析,命令行定义的配置在认证阶段是无效的

2.确保密钥表文件存在于security.kerberos.login.keytab客户端节点上的指示的路径中

3.正常部署 Flink 集群。

在 YARN、Mesos 和原生 Kubernetes 模式下,keytab 会自动从客户端复制到 Flink 容器。

注: 这里就遇到了我们说的小坑,kerberos认证先于命令行解析,命令行定义的配置在认证阶段是无效的

因此,我们目前无法通过在命令行指定不同的keytab和principal覆盖文件中的配置,从而达到多用户认证,也就是目前我们只能通过flink用户来向YARN提交作业,没有了用户区分,我们就无法进行权限控制。

那如何解决这个问题,从而达到每个用户都能以自己名义去提交作业呢,这里我们只要解决命令行解析先于kerberos认证就可以了(修改源码)

2.2.1 .暴力解析器

Flink自带的命令行解析器,如果我们借助自身命令行解析器(减少改动也就减少了bug)。 我们可能同时需要兼容所有的解析器,并且在新添加解析器时也增加了限制,可能在某个解析器没有对应参数,解析就不生效。

因此,我们采用最简单最暴力的办法,自己定义一个解析器,解析命令行的参数来覆盖配置文件里定义参数。

具体步骤如下:

1.自定义一个命令行解析器。

// 位于org.apache.flink.client.cli.CliFrontendParser下
 public static class ExtendedGnuParser extends GnuParser {
         private final boolean ignoreUnrecognizedOption;
 ​
         public ExtendedGnuParser(boolean ignoreUnrecognizedOption) {
             // GnuParser、DefaultParser在遇到未定义的参数时都会抛出异常,这里是为了进行兼容
             this.ignoreUnrecognizedOption = ignoreUnrecognizedOption;
         }
 ​
         protected void processOption(String arg, ListIterator<String> iter) throws ParseException {
             boolean hasOption = this.getOptions().hasOption(arg);
             if (hasOption || !this.ignoreUnrecognizedOption) {
                 super.processOption(arg, iter);
             }}
     }

定义命令行参数并解析, 同时覆盖配置文件参数。

 // 位于org.apache.flink.client.cli.CliFrontend
 // 其this.configuration为从配置文件中读取的配置。 
     public Configuration compatCommandLineSecurityConfiguration(String[] args)
             throws CliArgsException, FlinkException, ParseException {
         // 定义一个自定义都参数解析Options。
         Options securityOptions = new Options()
                 .addOption(Option.builder("D")
                 .argName("property=value")
                 .numberOfArgs(2)
                 .valueSeparator('=')
                 .desc("Allows specifying multiple generic configuration options.")
                 .build());// 解析命令行传入的参数
         CommandLine commandLine =
                 new CliFrontendParser.ExtendedGnuParser(true)
                 .parse(securityOptions, args);// 从配置文件中读取的参数。
         Configuration securityConfiguration = new Configuration(this.configuration);// 用命令行传入参数替换掉配置文件中的security参数
         Properties dynamicProperties = commandLine.getOptionProperties("D");
         for(Map.Entry<Object, Object> entry: dynamicProperties.entrySet()){
             securityConfiguration.setString(
                 entry.getKey().toString(),
                 entry.getValue().toString()
             );
         }
         return securityConfiguration;
     }

修改之后我们进行编译mvn clean package -DskipTests -Dfast

编译完成后,flink根目录下会出现一个build-target的软连接,指向编译后的flink真正的安装包。

在这里插入图片描述

我们修改一下flink安装包的名称,由flink-1.13.2 修改为flink-1.13.2.1 来区分,同时上传到线上就可以了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐