1. JAR dependencies

<!-- Spark; the artifact suffix must be the Scala binary version (e.g. 2.11) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<!-- elasticsearch -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.8.21</version>
</dependency>

<!-- hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.3</version>
</dependency>
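
For reference, a minimal <properties> block that pins the placeholders above. The exact versions are assumptions; any Spark 2.x / Scala 2.11 combination supported by elasticsearch-hadoop 6.8.21 should work:

<properties>
    <!-- Scala binary version used as the artifact suffix (assumed) -->
    <scala.version>2.11</scala.version>
    <!-- assumed Spark 2.x release; adjust to your cluster -->
    <spark.version>2.3.2</spark.version>
</properties>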

2. Code

package study

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.elasticsearch.spark.sql.DefaultSource15

import scala.collection.mutable

object SparkToEs {

  def main(args: Array[String]): Unit = {
    initEsProperties(false)
  }

  def initEsProperties(useKerberos: Boolean): Unit = {
    // build the SparkConf and SparkSession
    val conf = new SparkConf().setAppName("df").setMaster("local[2]")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val DOC_ID = "a1" // DataFrame column used as the document _id
    val esConfig = new mutable.HashMap[String, String]
    var host = ""
    esConfig += ("es.resource" -> "cool_es_test/cool_es_test_tb") // target index/type
    esConfig += ("es.write.operation" -> "index")

    val httpUrls = "http://192.168.1.151:9200"
    if (httpUrls.contains(",")) {
      // multiple nodes: pass the URL list through as-is
      esConfig += ("es.nodes.wan.only" -> "true")
      esConfig += ("es.nodes" -> httpUrls)
      host = httpUrls.split(":")(1).substring(2) // substring(2) strips the leading "//"; takes the first host only
    } else {
      // single node: split "http://host:port" into host and port
      esConfig += ("es.nodes.wan.only" -> "true")
      esConfig += ("es.nodes" -> httpUrls.split(":")(1).substring(2))
      esConfig += ("es.port" -> httpUrls.split(":")(2).replace("/", ""))
      host = httpUrls.split(":")(1).substring(2)
    }
    esConfig += ("es.mapping.id" -> DOC_ID)

    if (useKerberos) {
      val KEYTAB = "D:\\workspace\\demo\\src\\main\\resources\\kerberos\\XXXXX.keytab"
      val PRINCIPAL = "XXXXX"
      val KRB5CONF = "D:\\workspace\\demo\\src\\main\\resources\\kerberos\\krb5.conf"

      val esPrincipal = "HTTP/" + host + "@EXAMPLE.COM"
      esConfig += ("es.security.authentication" -> "kerberos")
      esConfig += ("es.net.spnego.auth.elasticsearch.principal" -> esPrincipal)
      esConfig += ("es.security.user.provider.class" -> "com.cool.kerberos.KHadoopUserProvider")
      esConfig += ("es.spark.principal" -> PRINCIPAL)
      // SparkFiles.get() on the executor resolves by file name, so pass the
      // name of the keytab as distributed by addFile, not the driver-local path
      esConfig += ("es.spark.keytab" -> new java.io.File(KEYTAB).getName)

      // ship the keytab to every executor's working directory
      spark.sparkContext.addFile(KEYTAB)

      System.setProperty("http.auth.preference", "Kerberos")
      System.setProperty("java.security.krb5.conf", KRB5CONF)
      System.setProperty("sun.security.krb5.debug", "false")
      System.setProperty("sun.security.spnego.debug", "false")
    }

    // elasticsearch-hadoop's Spark SQL data source
    val ds: DefaultSource15 = new DefaultSource15
    val someData = Seq(Row("2022-06-23 11:", 6, 10.6, "2022-10-27", "13:11:22", "2021-10-27T14:33:33"))
    val someSchema = List(
      StructField("a1", StringType, true),
      StructField("a2", IntegerType, true),
      StructField("a3", DoubleType, true),
      StructField("a4", StringType, true),
      StructField("a5", StringType, true),
      StructField("a6", StringType, true)
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(someSchema))
    df.show()

    val writeModeMap = Map("append" -> SaveMode.Append, "overwrite" -> SaveMode.Overwrite)
    // write the DataFrame to ES through the data source's createRelation
    ds.createRelation(spark.sqlContext, writeModeMap.getOrElse("append", SaveMode.Append), esConfig.toMap, df)
  }
}
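
Instantiating DefaultSource15 directly works, but the same write can also be expressed through the standard DataFrame writer API, which elasticsearch-hadoop exposes under the org.elasticsearch.spark.sql format. A minimal sketch, reusing the esConfig built above:

// equivalent write via the DataFrame writer API
df.write
  .format("org.elasticsearch.spark.sql")
  .options(esConfig.toMap)
  .mode(SaveMode.Append)
  .save("cool_es_test/cool_es_test_tb")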

3. When Spark connects to ES, every executor must hold valid Kerberos credentials for the connection; the provider below performs the login on each executor from the keytab distributed via sparkContext.addFile

package com.cool.kerberos;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkFiles;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.mr.security.HadoopUser;
import org.elasticsearch.hadoop.security.User;
import org.elasticsearch.hadoop.security.UserProvider;

import java.io.File;
import java.io.IOException;

/**
 * A rewrite of elasticsearch-hadoop's HadoopUserProvider.
 *
 * Change: when the UGI is fetched, check whether a Kerberos login already
 * exists; if not, log in automatically from the keytab and return the UGI,
 * so that every executor node logs in on demand.
 * Usage: configure es.security.user.provider.class in the ES settings, e.g.
 * esConfig += ("es.security.user.provider.class" -> "com.cool.kerberos.KHadoopUserProvider")
 */
public class KHadoopUserProvider extends UserProvider {
    public static final String SPARK_ES_PRINCIPAL = "es.spark.principal";
    public static final String SPARK_ES_KEYTAB = "es.spark.keytab";

    public KHadoopUserProvider() {
        super();
    }

    public static KHadoopUserProvider create(Settings settings) {
        KHadoopUserProvider provider = new KHadoopUserProvider();
        provider.setSettings(settings);
        return provider;
    }


    @Override
    public User getUser() {
        try {
            UserGroupInformation ugi = UserGroupInformation.getLoginUser();
            if (ugi == null || !ugi.hasKerberosCredentials()) {
                // no live Kerberos login on this executor yet: log in from the keytab
                ugi = doLogin(getSettings());
            }
            return new HadoopUser(ugi, getSettings());
        } catch (IOException e) {
            throw new EsHadoopException("Could not retrieve the current user", e);
        }
    }

    private UserGroupInformation doLogin(Settings settings) throws IOException {
        Configuration krb_conf = new Configuration();
        krb_conf.set("hadoop.security.authentication", "kerberos");
        krb_conf.set(ConfigurationOptions.ES_SECURITY_AUTHENTICATION, "kerberos");
        String keytab = settings.getProperty(SPARK_ES_KEYTAB);
        String principal = settings.getProperty(SPARK_ES_PRINCIPAL);
        System.out.println("keytab: " + keytab + "; principal: " + principal);

        if (principal == null || keytab == null) {
            throw new RuntimeException("Missing principal or keytab path; please configure "
                    + SPARK_ES_KEYTAB + " and " + SPARK_ES_PRINCIPAL);
        }

        // resolve the keytab that the driver shipped via sparkContext.addFile
        String keytabPath = SparkFiles.get(keytab);
        if (!new File(keytabPath).exists()) {
            throw new RuntimeException("Executor login failed: keytab file (" + keytabPath
                    + ") does not exist; distribute it to the executors with sparkContext.addFile");
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        if (!new File("/etc/krb5.conf").exists()) {
            throw new RuntimeException("Executor login failed: /etc/krb5.conf does not exist");
        }

        UserGroupInformation.setConfiguration(krb_conf);
        // log in from the keytab and pin the result as the process-wide login user
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
        UserGroupInformation.setLoginUser(ugi);
        
        return ugi;
    }
}

4. Problems encountered during Kerberos authentication and how to fix them

Problem 1:

Caused by: GSSException: No valid credentials provided (Mechanism level: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)

Fix 1: hard-code _HOST as the ES host name; the limitation is that only a single ES host/IP can be configured
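
In the code above, that means building the principal from one concrete host name instead of a pattern; the host and realm below are placeholders:

// hypothetical: principal pinned to a single ES node
esConfig += ("es.net.spnego.auth.elasticsearch.principal" -> "HTTP/es-node1.example.com@EXAMPLE.COM")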

Problem 2:

org.elasticsearch.hadoop.rest.EsHadoopTransportException: Missing required negotiation token

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings) - all nodes failed; tried [[hostname]]

Fix 2: use the officially recommended principal format, HTTP/_HOST@REALM.NAME.HERE
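
With the _HOST pattern the connector substitutes the host name of whichever node it is talking to, so a multi-node es.nodes list works; the realm below is a placeholder:

// _HOST is expanded per node by elasticsearch-hadoop; EXAMPLE.COM is a placeholder realm
esConfig += ("es.net.spnego.auth.elasticsearch.principal" -> "HTTP/_HOST@EXAMPLE.COM")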

Problem 3: clock synchronization (replay detection); plain Java clients connecting to ES sometimes hit the same error

Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://test.com:9200], URI [/_bulk?timeout=1m], status line [HTTP/1.1 401 Unauthorized]
Caused by: ElasticsearchException[Elasticsearch exception [type=g_s_s_exception, reason=Failure unspecified at GSS-API level (Mechanism level: Request is a replay (34))]]; nested: ElasticsearchException[Elasticsearch exception [type=krb_ap_err_exception, reason=Request is a replay (34)]]

Fix 3: add the JVM option -Dsun.security.krb5.rcache=none to Elasticsearch's JVM
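
Assuming a standard Elasticsearch layout, one way to apply this is to append the flag to config/jvm.options on every ES node and restart. Note this disables the server-side Kerberos replay cache, so keeping the cluster's clocks in sync via NTP remains the real fix:

# config/jvm.options (one flag per line)
-Dsun.security.krb5.rcache=none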

5. References

Kerberos | Elasticsearch for Apache Hadoop [master] | Elastic

https://community.cloudera.com/t5/Support-Questions/Solr-quot-Request-is-a-replay-quot-Ambari-Infra-Solr-2-5/m-p/212870

https://web.mit.edu/kerberos/krb5-devel/doc/basic/rcache_def.html

http://www.srcmini.com/20576.html
