初次写入分区数据

spark写入hive分区表时,如果数据表事先不存在,可以选择手动建表,可以使用以下代码写入数据,会自动创建数据表:

df = spark.createDataFrame([(1, "alice", "20220412"), (2, "bob", "20220412")], ["id", "name", "date"])
df.show()
df.write.format("orc").mode("overwrite").partitionBy("date").saveAsTable("test.test_part")

此时去读取数据表内容,就能看到date分区为20220412的数据。

非初次写入数据

DSL方式

而在第二次写入数据表时,上面的写入方式就不能使用了,因为它会直接将整个表覆盖写入。例如你执行如下代码:

df = spark.createDataFrame([(1, "alice", "20220413"), (2, "bob", "20220413")], ["id", "name", "date"])
df.show()
df.write.format("orc").mode("overwrite").partitionBy("date").saveAsTable("test.test_part")

执行完之后查看数据,会发现之前插入的20220412分区的数据不见了。这是因为这种方式写入数据的时候,会将整个数据表删除,重新建一张数据表,再把新的数据写进去。

针对这种情况,我们可以采用以下的方式:

df = spark.createDataFrame([(1, "alice", "20220414"), (2, "bob", "20220414")], ["id", "name", "date"])
df.show()
df.write.format("orc").mode("overwrite").insertInto("test.test_part")

但是这种写入方式要想达到不覆盖整张数据表的目的,需要配置以下参数:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

关于这个参数可参考:sparkHivePartitionOverwrite

执行完上述代码以后,我们查看数据表test.test_part,会看到20220413和20220414分区的数据都在。
注意:dataframe数据的列顺序不能发生改变,因为insertInto无法识别列名,无法根据列名来写入数据。

SQL方式

当然了,有些同学更习惯用sparksql的方式,那么也可以采用如下的方式写入数据:

df = spark.createDataFrame([(1, "alice", "20220415"), (2, "bob", "20220415")], ["id", "name", "date"])
df.show()
df.drop("date").createOrReplaceTempView("test_temp")

spark.sql("insert overwrite table test.test_part partition(date='20220415') select * from test_temp")

这种方式写入数据需要在中间创建一个临时视图,同时是不需要设置spark.sql.sources.partitionOverwriteMode参数的。
需要注意的是,在这种方式中,数据分区列不能包含在数据中,因为在sql语句中已经指定了分区列和分区值。同时需要注意数据列的顺序不能发生改变

判断数据表是否存在

对于第一次写入数据,我们可以手动在hive中创建数据表,也可以选择直接用文中的方法写入数据,让spark自动为我们创建数据表。但是对于第二次写入数据来说,数据表必须要提前存在,如果数据表事先不存在,那么就会报错提醒数据表找不到。
那么这里就涉及到一个条件语句:判断数据表是否存在,从而决定选用哪种写入数据的方式。
一开始笔者选择的是先获取数据表列表,然后判断某个表是否在这个列表中,方式如下:

tablelist = [i.name for i in spark.catalog.listTable("test")]

这种方式是从指定数据库中获取所有表名,然后判断表是否在列表中,逻辑上问题不大,执行速度也挺快的。但是这是建立在之前hive元数据存放在mysql数据库的前提下,最近项目有一些改动,将元数据从mysql迁移到了glue(亚马逊提供的元数据管理工具)中,在任务执行中发现,元数据的读取特别耗时。尤其是上述一个简单的读取表名列表就耗时一分钟以上。
于是在搜索了一番之后,笔者将上述代码改为了以下的方式:

is_exist = spark.sql("show tables in test like 'test_part'")
print(is_exist.count() == 0)

这种方式是采用sparksql的方式,在读取的时候并未将所有表均读出来,而是通过字符串匹配获取匹配到的数据表名。在执行速度上也大大提升,从之前一分半钟提升到了一秒以内出结果。这一改动使得整个项目几百个任务的执行时间节省了好几个小时。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐