【Spark+ES】Performance Comparison of Several Approaches to Reading Elasticsearch from Spark
I. Test Environment
Environment:
Spark: 2.2.0
Elasticsearch: 7.14.0
Main Maven dependency:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.14.0</version>
</dependency>
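For reference, the same dependency in sbt form (the article's build is Maven; an sbt build is an assumption here):

```scala
// build.sbt equivalent of the Maven dependency above (assumes an sbt build)
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "7.14.0"
```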
Data:
Document count: 56,126
Document format:
{
  "_index" : "testes",
  "_type" : "_doc",
  "_id" : "W5pGpHsBmFEDko1-WBb7",
  "_score" : 1.0,
  "_source" : {
    "classId" : 1001,
    "name" : "zhangsan",
    "gender" : "F",
    "age" : 20,
    "birthday" : "1995-12-11 12:12:13"
  }
}
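The article does not show how the test index was populated. A minimal sketch of one way to do it with the same connector follows; the generated field values are illustrative, not the actual test data:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object PopulateTestIndex {
  case class Student(classId: Int, name: String, gender: String, age: Int, birthday: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("populateTestEs").setMaster("local")
    conf.set("es.nodes", "127.0.0.1")
    conf.set("es.port", "9200")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)

    // Generate documents shaped like the sample above and index them.
    val docs = sc.parallelize(1 to 56126).map { i =>
      Student(1000 + i % 10, s"name$i", if (i % 2 == 0) "F" else "M",
        18 + i % 10, "1995-12-11 12:12:13")
    }
    EsSpark.saveToEs(docs, "testes/_doc")  // write to index "testes", type "_doc"
    sc.stop()
  }
}
```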
II. Test
1. Code
package com.dt.spark.Test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL

object EsSpad {
  case class Student(classId: Int, name: String, gender: String, age: Int, birthday: String)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("saveToEsTest")
      .setMaster("local")
    sparkConf.set("es.nodes", "127.0.0.1")           // ES node address
    sparkConf.set("es.port", "9200")                 // ES HTTP port
    sparkConf.set("es.index.auto.create", "true")    // create the index on write if it does not exist
    sparkConf.set("es.write.operation", "index")     // default write operation
    sparkConf.set("es.nodes.wan.only", "true")       // connect only to the declared nodes (no cluster discovery)
    sparkConf.set("es.mapping.date.rich", "false")   // read date fields as plain strings rather than date objects
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Note: DataFrame/RDD creation is lazy, so each timed span below measures
    // plan/RDD construction; the count() inside the println triggers the actual read.

    /**
     * Approach 1: DataFrame reader plus a Spark-side filter
     */
    val t1: Long = System.currentTimeMillis()
    val df01: DataFrame = spark.read.format("es").load("testes/_doc")
      .filter(col("name") === "zhangsan")
    val t2: Long = System.currentTimeMillis()
    println("======================: Approach 1 time: " + (t2 - t1) + "ms, rows returned: " + df01.count())

    /**
     * Approach 2: DataFrame reader plus Spark SQL on a temp view
     */
    val t11: Long = System.currentTimeMillis()
    val df05: DataFrame = spark.read.format("es").load("testes/_doc")
    df05.createTempView("student")
    val df06: DataFrame = spark.sql("select * from student where name = 'zhangsan'")
    val t12: Long = System.currentTimeMillis()
    println("======================: Approach 2 time: " + (t12 - t11) + "ms, rows returned: " + df06.count())

    // Native ES query used by approaches 3 and 4; it is passed to Elasticsearch as-is.
    val query: String =
      """
        |{
        |  "query": {
        |    "match": {
        |      "name": "zhangsan"
        |    }
        |  }
        |}
        |""".stripMargin

    /**
     * Approach 3: EsSparkSQL.esDF with the native query
     */
    val t3: Long = System.currentTimeMillis()
    val df02: DataFrame = EsSparkSQL.esDF(spark, "testes/_doc", query)
    val t4: Long = System.currentTimeMillis()
    println("======================: Approach 3 time: " + (t4 - t3) + "ms, rows returned: " + df02.count())
    //df02.show(10)

    /**
     * Approach 4: EsSpark.esRDD with the native query
     */
    val t5: Long = System.currentTimeMillis()
    val data: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, "testes/_doc", query)
    val t6: Long = System.currentTimeMillis()
    println("======================: Approach 4 time: " + (t6 - t5) + "ms, rows returned: " + data.count())

    // Convert the RDD to a DataFrame (method 1): build Rows and apply an explicit schema.
    val t7: Long = System.currentTimeMillis()
    val value: RDD[collection.Map[String, AnyRef]] = data.map(_._2)  // keep the _source map, drop the _id
    val rdd: RDD[Row] = value.map(m => Row(m("classId").toString.toInt, m("name").toString,
      m("gender").toString, m("age").toString.toInt, m("birthday").toString))
    val schema: StructType = StructType(Array(
      StructField("classId", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true),
      StructField("birthday", StringType, true)))
    val df03: DataFrame = spark.createDataFrame(rdd, schema)
    val t8: Long = System.currentTimeMillis()
    println("======================: Approach 4, RDD to DF (method 1) time: " + (t8 - t7) + "ms, rows returned: " + df03.count())
    //df03.show(10)

    // Convert the RDD to a DataFrame (method 2): map to a case class, then toDF().
    val t9: Long = System.currentTimeMillis()
    val value1: RDD[collection.Map[String, AnyRef]] = data.map(_._2)
    val df04: DataFrame = value1.map(m => Student(m("classId").toString.toInt, m("name").toString,
      m("gender").toString, m("age").toString.toInt, m("birthday").toString)).toDF()
    val t10: Long = System.currentTimeMillis()
    println("======================: Approach 4, RDD to DF (method 2) time: " + (t10 - t9) + "ms, rows returned: " + df04.count())
    //df04.show(10)
  }
}
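Approaches 1 and 2 express the filter in Spark (the connector can push simple SQL filters down to ES by default), while approaches 3 and 4 hand a native ES query to the connector explicitly. The DataFrame reader can also take the same native query directly via the es.query option, keeping the DataFrame API while filtering inside Elasticsearch; a minimal sketch reusing the query string from the code above:

```scala
// Push the native ES query down through the DataFrame reader via "es.query";
// only matching documents are returned by Elasticsearch.
val dfPushdown: DataFrame = spark.read.format("es")
  .option("es.query", query)
  .load("testes/_doc")
println("pushdown rows returned: " + dfPushdown.count())
```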
2. Output
======================: Approach 1 time: 1190ms, rows returned: 8018
======================: Approach 2 time: 174ms, rows returned: 8018
======================: Approach 3 time: 39ms, rows returned: 8018
======================: Approach 4 time: 1ms, rows returned: 8018
======================: Approach 4, RDD to DF (method 1) time: 19ms, rows returned: 8018
======================: Approach 4, RDD to DF (method 2) time: 63ms, rows returned: 8018
3. Timing Comparison
| Approach | Query time (ms) | Rank (slowest first) |
| --- | --- | --- |
| Approach 1 | 1190 | 1 |
| Approach 2 | 174 | 2 |
| Approach 3 | 39 | 4 |
| Approach 4 (method 1) | 19 | 5 |
| Approach 4 (method 2) | 63 | 3 |
Comparing the results above, Approach 4 (method 1) and Approach 3 come out best; in Spark + ES query development, prefer these two approaches.
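One caveat when reproducing these numbers: since Spark evaluation is lazy, the spans timed above cover DataFrame/RDD construction, while the count() that actually pulls data from ES runs outside the timed window. A minimal sketch of an end-to-end measurement, assuming it is placed inside the main method above (the timeQuery helper name is illustrative):

```scala
// Hypothetical helper: time a query end to end by forcing the action inside the timed span.
def timeQuery[T](label: String)(block: => T): T = {
  val start = System.currentTimeMillis()
  val result = block  // block should end with an action such as count()
  println(s"$label: ${System.currentTimeMillis() - start}ms")
  result
}

// Usage: count() now runs inside the measured window.
val rows: Long = timeQuery("Approach 3, end to end") {
  EsSparkSQL.esDF(spark, "testes/_doc", query).count()
}
```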