A complaint comes up regularly: when writing to a Hive partitioned table with Spark, the intent was to overwrite a single partition, but incorrect code ended up overwriting every partition in the table. This article tests that scenario.

1. Test Conclusions


  1. The following parameters must be set: "spark.sql.sources.partitionOverwriteMode" = "dynamic" and "hive.exec.dynamic.partition.mode" = "nonstrict".

  2. saveAsTable overwrites the entire table; use insertInto instead (see the test source code below for details).

  3. insertInto requires the DataFrame's column order to match the column order of the Hive table (see the sketch after the table below).

Operation | Effect
--------- | ------
df1.write.mode("overwrite").partitionBy("year").saveAsTable(tableName) | All partitions overwritten
df1.write.mode("overwrite").format("Hive").partitionBy("year").saveAsTable(tableName) | All partitions overwritten
df1.write.option("partitionOverwriteMode", "dynamic").mode("append").insertInto(tableName) | Dynamic partitioning; appends where the partition exists
df1.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").insertInto(tableName) | Dynamic partitioning; overwrites where the partition exists
INSERT OVERWRITE TABLE $tableName partition (year) SELECT * FROM tmp | Dynamic partitioning; overwrites where the partition exists
INSERT INTO TABLE $tableName partition (year) SELECT * FROM tmp | Dynamic partitioning; appends where the partition exists
df1.write.mode("overwrite").insertInto(tableName) | Dynamic partitioning; overwrites where the partition exists
df1.write.mode("append").insertInto(tableName) | Dynamic partitioning; appends where the partition exists
INSERT OVERWRITE TABLE $tableName partition (year=2024) SELECT * FROM tmp | Overwrites only the specified partition
INSERT INTO TABLE $tableName partition (year=2024) SELECT * FROM tmp | Appends to the specified partition
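
The recommended path, as a minimal spark-shell-style sketch (the table and column names follow the test code in section 2; the data row is a placeholder):

import org.apache.spark.sql.SparkSession

// A minimal sketch of the recommended write path; test_partition and the
// columns id, name, age, year follow the test code in section 2.
val spark = SparkSession.builder()
  .appName("PartitionOverwriteSketch")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // overwrite per partition
  .config("hive.exec.dynamic.partition.mode", "nonstrict")       // allow fully dynamic partitions
  .enableHiveSupport()
  .getOrCreate()

val df1 = spark.createDataFrame(Seq(("011", "李太白", 21, "2018")))
  .toDF("id", "name", "age", "year")

// insertInto matches columns by position, not by name, so realign the
// DataFrame to the Hive table's column order before writing.
df1.select("id", "name", "age", "year")
  .write
  .mode("overwrite") // only the partitions present in df1 are replaced
  .insertInto("test_partition")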

1.1. Parameter Details

1.1.1. hive.exec.dynamic.partition.mode

hive.exec.dynamic.partition.mode: the default is strict, which forbids making all partition columns dynamic. This guards against the case where a user intends to create dynamic partitions only at the sub-partition level but forgets to give the parent partition column a value, causing a single DML statement to create a large number of new partitions (and corresponding folders) in a short time and hurting system performance. Setting it to nonstrict allows every partition column to be dynamic.

insert overwrite table partition_test partition(stat_date='20110728',province) select member_id,name,province from partition_test_input where stat_date='20110728';

Here stat_date is a static partition column and province is a dynamic partition column. The SELECT clause must list the dynamic partition columns in partition order; static partition columns are not listed.

All rows with stat_date='20110728' are then routed, by their province value, into different subdirectories under /user/hive/warehouse/partition_test/stat_date=20110728/. If a province sub-partition does not yet exist, it is created automatically, which is convenient and avoids the risks of manually maintaining the mapping between inserted data and partitions.

Note that dynamic partitioning does not allow a dynamic parent partition with a static child partition, since that would require creating the partition defined by the static child column under every parent partition:

hive> insert overwrite table partition_test partition(stat_date,province='gd')  select member_id,name,province from partition_test_input where province='gd';
       FAILED: Error in semantic analysis: Line 1:48 Dynamic partition cannot be the parent of a static partition 'gd'

The related parameters are:

  • hive.exec.dynamic.partition=true: enables dynamic partitioning
  • hive.exec.max.dynamic.partitions.pernode (default 100): the maximum number of dynamic partitions each mapper/reducer node may create; exceeding it raises an error
  • hive.exec.max.dynamic.partitions (default 1000): the maximum total number of partitions a single DML statement may create
  • hive.exec.max.created.files (default 100000): the maximum number of files created by all mappers/reducers in a MapReduce job
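
In a Spark session with Hive support, these can be adjusted like any other Hive parameter. A quick sketch, where the limit values are placeholders rather than tuning advice:

// Placeholders, not tuning advice: raise the dynamic-partition limits
// for the current session only (spark is an active SparkSession).
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("SET hive.exec.max.dynamic.partitions.pernode=200")
spark.sql("SET hive.exec.max.dynamic.partitions=2000")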

1.1.2. spark.sql.sources.partitionOverwriteMode

From the Spark documentation:

When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification (e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrites those partitions that have data written into them at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode. This can also be set as an output option for a data source using key partitionOverwriteMode (which takes precedence over this setting), e.g. dataframe.write.option("partitionOverwriteMode", "dynamic").save(path).

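To make the precedence rule concrete, a sketch reusing the spark session and df1 from the test code in section 2:

// The session default asks for static overwrite...
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")

// ...but the per-write option takes precedence, so this still replaces
// only the partitions that df1 actually writes into.
df1.write
  .option("partitionOverwriteMode", "dynamic")
  .mode("overwrite")
  .insertInto("test_partition")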

Related parameters:

spark.sql.sources.partitionColumnTypeInference.enabled: when true, the data types of partition columns are inferred automatically.

spark.sql.statistics.fallBackToHdfs: when true, falls back to HDFS if table statistics are not available from table metadata. This is useful in determining whether a table is small enough to use a broadcast join. The flag is effective only for non-partitioned Hive tables. For non-partitioned data source tables, statistics are recalculated automatically if unavailable. For partitioned data source tables and partitioned Hive tables, 'spark.sql.defaultSizeInBytes' is used when statistics are unavailable.
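
Both are ordinary SQL configs and can be toggled per session; for example:

// Session-level toggles for the two configs described above
// (spark is an active SparkSession).
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")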

1.2. Commands for Inspecting Hive Partitions

  1. show partitions test_partition;
  2. desc test_partition partition (year=2018);
  3. desc extended test_partition partition (year=2018);
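
The same commands can also be issued through spark.sql, for example:

// Inspect partitions from a Spark session with Hive support enabled.
spark.sql("show partitions test_partition").show(false)
spark.sql("desc test_partition partition (year=2018)").show(false)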

2. Test Source Code

import org.apache.spark.sql.SparkSession

object SparkHiveTest {

  def main(args: Array[String]): Unit = {
    testHivePartitionOverwrite()
  }

  def testHivePartitionOverwrite(): Unit = {
    val spark = SparkSession
      .builder()
      .appName("testHivePartitionOverwrite")
      .master("local[2]")
      // Replace with your own Hive metastore service's thrift URL
      .config("hive.metastore.uris", "thrift://172.25.20.170:9083")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    import spark.sql

    val data = Array(
      ("001", "张三", 21, "2017"), ("002", "李四", 18, "2018"),
      ("001", "王五", 21, "2019"), ("002", "赵六", 18, "2020"),
      ("001", "王二麻子", 21, "2021"), ("002", "马顺子", 18, "2022")
    )

    val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
    // Register a temporary view
    df.createOrReplaceTempView("temp_table")

    val tableName = "test_partition"
    // Switch to the target Hive database
    sql("use default")
    // 1. Create the partitioned table and write the initial data
    df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)
    spark.table(tableName).show()

    val data1 = Array(("011", "李太白", 21, "2018"), ("011", "李商隐", 21, "2018"), ("011", "李清照", 21, "2023"))
    val df1 = spark.createDataFrame(data1).toDF("id", "name", "age", "year")
    // Both of the following overwrite the ENTIRE table:
    //df1.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)
    //df1.write.mode("overwrite").format("Hive").partitionBy("year").saveAsTable(tableName)
//    df1.write.option("partitionOverwriteMode", "static").mode("overwrite").insertInto(tableName)
//    df1.write.option("partitionOverwriteMode", "dynamic").mode("append").insertInto(tableName)

    // Dynamic partitioning: existing partitions are overwritten; with
    // mode("append") they would be appended to instead.
    //        df1.write.mode("overwrite").insertInto(tableName)
    // SQL variant: existing partitions are overwritten; with INSERT INTO
    // they would be appended to instead.
    df1.createOrReplaceTempView("tmp")
    val sqlStr =
      s"""
         |INSERT OVERWRITE TABLE $tableName partition (year)
         |SELECT * FROM tmp
      """.stripMargin
    sql(sqlStr)

    // Write into a specific partition
    //    val data2: Array[(String, String, Int)] = Array()
    //    val df2 = spark.createDataFrame(data2).toDF("id", "name", "age")
    //    df2.createOrReplaceTempView("tmp")
    //    val sqlStr =
    //      s"""
    //         |INSERT INTO TABLE $tableName partition (year=2026)
    //         |SELECT * FROM tmp
    //          """.stripMargin
    //    spark.sql(sqlStr)

    // Print the table contents
    //    spark.table(tableName).show()
    sql("select * from test_partition").foreach(println(_))

    spark.stop()
  }
}

References

https://www.cnblogs.com/tree1123/p/13440952.html

https://www.cnblogs.com/liqiu/p/4095654.html

http://spark.apache.org/docs/2.4.7/configuration.html#spark-sql

https://spark.apache.org/docs/latest/configuration.html

 

 
