Spark连接ES实现kerberos认证
本文解决 Spark 连接 ES 进行 Kerberos 认证时遇到的三类报错：
- Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER
- Missing required negotiation token
- Mechanism level: Request is a replay (34)
1、jar包
<!-- spark -->
<!-- NOTE(review): ${scala.version} here must expand to the Scala *binary* version
     (e.g. 2.11), not a full patch version, for the artifactId to resolve — confirm
     the property definition in the parent pom. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- elasticsearch -->
<!-- elasticsearch-hadoop version should match the target ES cluster's major.minor
     version (6.8.x here) for the Kerberos/SPNEGO support used below. -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>6.8.21</version>
</dependency>
<!-- hadoop-common -->
<!-- provides UserGroupInformation, which performs the actual Kerberos login -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.3</version>
</dependency>
2、代码
package study
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.elasticsearch.spark.sql.DefaultSource15
import scala.collection.mutable
object SparkToEs {

  def main(args: Array[String]): Unit = {
    initEsProperties(false)
  }

  /**
   * Builds a local SparkSession, assembles the elasticsearch-hadoop connection
   * settings (optionally with Kerberos/SPNEGO auth), and writes a small test
   * DataFrame to the `cool_es_test` index via `DefaultSource15.createRelation`.
   *
   * @param useKerberos when true, adds the Kerberos-related es-hadoop settings
   *                    and ships the keytab to every executor via addFile, so
   *                    the custom UserProvider can log in on each executor.
   */
  def initEsProperties(useKerberos: Boolean): Unit = {
    // Build the Spark context (local mode for this demo).
    val conf = new SparkConf().setAppName("df").setMaster("local[2]")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val DOC_ID = "a1"
    val esConfig = new mutable.HashMap[String, String]
    val httpUrls = "http://192.168.1.151:9200"

    // "http://host:port" splits on ':' into ("http", "//host", "port");
    // substring(2) strips the leading "//" to leave the bare host.
    val host = httpUrls.split(":")(1).substring(2)

    esConfig += ("es.resource" -> "cool_es_test/cool_es_test_tb")
    esConfig += ("es.write.operation" -> "index")
    esConfig += ("es.nodes.wan.only" -> "true")
    if (httpUrls.contains(",")) {
      // Multiple nodes: pass the full URL list through unchanged.
      esConfig += ("es.nodes" -> httpUrls)
    } else {
      // Single node: split into host + port settings.
      esConfig += ("es.nodes" -> host)
      esConfig += ("es.port" -> httpUrls.split(":")(2).replace("/", ""))
    }
    esConfig += ("es.mapping.id" -> DOC_ID)

    if (useKerberos) {
      val KEYTAB = "D:\\workspace\\demo\\src\\main\\resources\\kerberos\\XXXXX.keytab"
      val PRINCIPAL = "XXXXX"
      val KRB5CONF = "D:\\workspace\\demo\\src\\main\\resources\\kerberos\\krb5.conf"
      // The ES server-side SPNEGO principal; _HOST-style "HTTP/<host>@REALM"
      // is what the ES cluster registered in the KDC.
      val esPrincipal = "HTTP/" + host + "@EXAMPLE.COM"
      // BUG FIX: the setting key was misspelled "es.security.authencation",
      // which elasticsearch-hadoop silently ignores — Kerberos was never enabled.
      esConfig += ("es.security.authentication" -> "kerberos")
      esConfig += ("es.net.spnego.auth.elasticsearch.principal" -> esPrincipal)
      // Custom provider that logs in from the keytab on each executor.
      esConfig += ("es.security.user.provider.class" -> "com.cool.kerberos.OHadoopUserProvider")
      esConfig += ("es.spark.principal" -> PRINCIPAL)
      esConfig += ("es.spark.keytab" -> KEYTAB)
      // Ship the keytab to every executor so SparkFiles.get can find it there.
      spark.sparkContext.addFile(KEYTAB)
      System.setProperty("http.auth.preference", "Kerberos")
      System.setProperty("java.security.krb5.conf", KRB5CONF)
      System.setProperty("sun.security.krb5.debug", "false")
      System.setProperty("sun.security.spnego.debug", "false")
    }

    // Build a small demo DataFrame and write it to ES.
    val ds: DefaultSource15 = new DefaultSource15
    val someData = Seq(Row("2022-06-23 11:", 6, 10.6, "2022-10-27", "13:11:22", "2021-10-27T14:33:33"))
    val someSchema = List(
      StructField("a1", StringType, true),
      StructField("a2", IntegerType, true),
      StructField("a3", DoubleType, true),
      StructField("a4", StringType, true),
      StructField("a5", StringType, true),
      StructField("a6", StringType, true)
    )
    val df = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(someSchema))
    df.show()
    val writeModeMap = Map("append" -> SaveMode.Append, "overwrite" -> SaveMode.Overwrite)
    ds.createRelation(spark.sqlContext, writeModeMap.getOrElse("append", SaveMode.Append), esConfig.toMap, df)
  }
}
3、spark连接es,需要保证每个executor都有对应凭证连接ES
package com.cool.kerberos;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkFiles;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.mr.security.HadoopUser;
import org.elasticsearch.hadoop.security.User;
import org.elasticsearch.hadoop.security.UserProvider;
import java.io.File;
import java.io.IOException;
/**
* 重写了HadoopUserProvider类
*
* 修改内容: 在获取ugi的时候,判断是否登录,如果没有就自动登录,并返回ugi,这样在每个executor节点中都会自动登录
* 用法: esConifg中配置 es.security.user.provider.class
* esConfig.put(ConfigurationOptions.ES_SECURITY_USER_PROVIDER_CLASS -> "com.cool.kerberos.KHadoopUserProvider"
*
*
*/
public class OHadoopUserProvider extends UserProvider {
public static final String SPARK_ES_PRINCIPAL = "es.spark.principal";
public static final String SPARK_ES_KEYTAB = "es.spark.keytab";
public KHadoopUserProvider() {
super();
}
public static KHadoopUserProvider create(Settings settings) {
KHadoopUserProvider provider = new KHadoopUserProvider();
provider.setSettings(settings);
return provider;
}
@Override
public User getUser() {
try {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
if (ugi == null || !ugi.hasKerberosCredentials()) {
ugi = doLogin(settings);
}
return new HadoopUser(ugi, getSettings());
} catch (IOException e) {
throw new EsHadoopException("Could not retrieve the current user", e);
}
}
private UserGroupInformation doLogin(Settings settings) throws IOException {
Configuration krb_conf = new Configuration();
krb_conf.set("hadoop.security.authentication", "kerberos");
krb_conf.set(ConfigurationOptions.ES_SECURITY_AUTHENTICATION, "kerberos");
String keytab = settings.getProperty(SPARK_ES_KEYTAB);
String principal = settings.getProperty(SPARK_ES_PRINCIPAL);
System.out.println("keytab: " + keytab + "; principal: " + principal);
if (principal == null || keytab == null) {
throw new RuntimeException("principal or keytabPath 参数不存在, 请配置 " + SPARK_ES_KEYTAB + " and " + SPARK_ES_PRINCIPAL);
}
String keytabPath = SparkFiles.get(keytab);
// 判断文件是否存在
if (!new File(keytabPath).exists()) {
throw new RuntimeException("executor端登陆失败, keytab文件(" + keytabPath + ")不存在, 请通过sparkContext.addFile将文件添加入executor节点");
}
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
if (!new File("/etc/krb5.conf").exists()) {
throw new RuntimeException("executor登录失败, /etc/krb5.conf 文件不存在");
}
UserGroupInformation.setConfiguration(krb_conf);
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
UserGroupInformation.setLoginUser(ugi);
return ugi;
}
}
4、kerberos认证过程中出现的问题以及处理方案
问题1:
Caused by: GSSException: No valid credentials provided (Mechanism level: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
解决1:_HOST 写死成ES 主机名,局限是:host 只能配置一台ip
问题2:
org.elasticsearch.hadoop.rest.EsHadoopTransportException: Missing required negotiation token
org.elasticsearch.hadoop.rest.EsHadoopNodesLeftException: Connection error (check network and/or proxy settings) - all nodes failed; tried[[hostname]]
解决2: 使用官方推荐配置,HTTP/_HOST@REALM.NAME.HERE
问题3:时钟同步问题;Java连接ES有时候也会出现同样错误
Caused by: org.elasticsearch.client.ResponseException: method [POST],host[http://test.com:9200],URI[/_bulk?timeout=1m],status line[HTTP/1.1 401 Unauthorized]
Caused by: ElasticsearchException[Elasticsearch exception [type=g_s_s_excpetion,reason=Failure unspecified at GSS-API level (Mechanism level: Request is a replay (34))]]; nested: ElasticsearchException[Elasticsearch exception [type=krb_ap_err_exception, reason=Request is a replay (34)]]
解决3: es的JVM 增加一个参数 -Dsun.security.krb5.rcache=none
5、参考文章
Kerberos | Elasticsearch for Apache Hadoop [master] | Elastic
https://web.mit.edu/kerberos/krb5-devel/doc/basic/rcache_def.html
更多推荐
已为社区贡献3条内容
所有评论(0)