目录

一、测试环境

二、测试

1、代码

2、输出

3、用时比较


一、测试环境

环境:

spark:2.2.0
Elasticsearch:7.14.0

主要maven:

       <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>7.14.0</version>
        </dependency> 

数据:

    数据量:56126
    数据格式:
    {
        "_index" : "testes",
        "_type" : "_doc",
        "_id" : "W5pGpHsBmFEDko1-WBb7",
        "_score" : 1.0,
        "_source" : {
          "classId" : 1001,
          "name" : "zhangsan",
          "gender" : "F",
          "age" : 20,
          "birthday" : "1995-12-11 12:12:13"
        }
      }

二、测试

1、代码

package com.dt.spark.Test

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL

object EsSpad {

  case class Student(classId: Int, name: String, gender: String, age: Int, birthday: String)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("saveToEsTest")
      .setMaster("local")
    sparkConf.set("es.nodes", "127.0.0.1")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.write.operation", "index")
    sparkConf.set("es.nodes.wan.only", "true")
    sparkConf.set("es.mapping.date.rich", "false")

    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    import org.apache.spark.sql.functions._

    /**
      * 方案一
      */
    var t1: Long = System.currentTimeMillis()
    val df01: DataFrame = spark.read.format("es").load(s"testes/_doc")
      .filter(col("name") === "zhangsan")
    var t2: Long = System.currentTimeMillis()
    println("======================:方案一时间:" + (t2 - t1) + "ms 查询总条数:" + df01.count())

    /**
      * 方案二
      */
    var t11: Long = System.currentTimeMillis()
    val df05: DataFrame = spark.read.format("es").load(s"testes/_doc")
    df05.createTempView("student")
    val df06: DataFrame = spark.sql("select * from student where name='zhangsan'")
    var t12: Long = System.currentTimeMillis()
    println("======================:方案二时间:" + (t12 - t11) + "ms 查询总条数:" + df06.count())

    val query: String =
      s"""
        {
         |  "query":{
         |    "match":{
         |      "name":"zhangsan"
         |    }
         |  }
         |}
         |""".stripMargin
    /**
      * 方案三
      */
    var t3: Long = System.currentTimeMillis()
    val df02: DataFrame = EsSparkSQL.esDF(spark, s"testes/_doc", query)
    var t4: Long = System.currentTimeMillis()
    var tt = (t4 - t3)
    println("======================:方案三时间:" + (t4 - t3) + "ms 查询总条数:" + df02.count())
    //df02.show(10)

    /**
      * 方案四
      */
    var t5: Long = System.currentTimeMillis()
    val data: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, s"testes/_doc", query)
    var t6: Long = System.currentTimeMillis()
    println("======================:方案四时间:" + (t6 - t5) + "ms 查询总条数:" + data.count())

    //将Rdd转化为DF(方法一)
    var t7: Long = System.currentTimeMillis()
    val value: RDD[collection.Map[String, AnyRef]] = data.map(row => {
      row._2
    })
    val rdd: RDD[Row] = value.map(m => Row(m("classId").toString.toInt, m("name").toString, m("gender").toString, m("age").toString.toInt, m("birthday").toString))
    val schema: StructType = StructType(Array(StructField("classId", IntegerType, true), StructField("name", StringType, true), StructField("gender", StringType, true)
      , StructField("age", IntegerType, true), StructField("birthday", StringType, true)))
    val df03: DataFrame = spark.createDataFrame(rdd, schema)
    var t8: Long = System.currentTimeMillis()
    println("======================:方案四rdd转df(方法1)时间:" + (t8 - t7) + "ms 查询总条数:" + df03.count())
    //df03.show(10)

    //将Rdd转化为DF(方法二)
    var t9: Long = System.currentTimeMillis()
    val value1: RDD[collection.Map[String, AnyRef]] = data.map(row => {
      row._2
    })
    val df04: DataFrame = value1.map(m => Student(m("classId").toString.toInt, m("name").toString, m("gender").toString, m("age").toString.toInt, m("birthday").toString))
      .toDF()
    var t10: Long = System.currentTimeMillis()
    println("======================:方案四rdd转df(方法2)时间:" + (t10 - t9) + "ms 查询总条数:" + df04.count())
    //df04.show(10)

  }
}

2、输出

======================:方案一时间:1190ms 查询总条数:8018
======================:方案二时间:174ms 查询总条数:8018
======================:方案三时间:39ms 查询总条数:8018
======================:方案四时间:1ms 查询总条数:8018
======================:方案四rdd转df(方法1)时间:19ms 查询总条数:8018
======================:方案四rdd转df(方法2)时间:63ms 查询总条数:8018

3、用时比较

方案查询分析时间(ms)用时排名
方案一11901
方案二1742
方案三394
方案四(1)205
方案四(2)643

综上比较方案四(1)方案三方案最好,在spark+es的查询开发中优先选择这两种方案;

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐