写入分区表:

准备工作:先建好分区表

 

方法一:(使用dataframe)

写数据到数据所在的位置,因为hive分区的本质就是分文件夹,先用spark把数据写到文件夹位置,然后执行sql添加分区

1.写数据到文件夹
//df为DataFrame
 df.write.mode(SaveMode.Overwrite).format("parquet")
      .partitionBy("day" , "dev_platform" ).save(outputPath)

2.寻找刚刚数据新建的分区, 并添加hive分区
    val hivePartitions: util.List[Row] = df.select($"day", $"dev_platform").distinct().collectAsList()
    hivePartitions.toArray().foreach(r => {
      val Row(day, dev_platform) = r
      spark.sql( s"ALTER TABLE collectlog.login_origin ADD IF NOT EXISTS PARTITION (day=$day, dev_platform=$dev_platform)")
    })

方法三:

spark接kafka数据,然后使用sparksql写入spark内存临时表,再使用sparksql(insert into select)的方式写入hive表

val session = SparkSession.builder().appName("WarehouseInventoryByNewMysqlSnap").enableHiveSupport().getOrCreate()	
val sc: SparkContext=session.sparkContext	
session.sql("use bi_work")	
import session.implicits._	
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))	
data.toDF().registerTempTable("table1")	
session.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")

性能测试:三台spark机器,原始配置,kafka3机器3分区,2158w数据,历时137s

 

方法四:

sparkstreaming消费kafka入hive的非分区表,然后使用sparksql把非分区表数据入到分区表里去。

 

方法五:

这种方法不需要开发代码

kafka -> flume ->hdfs落盘 -> hive

 

方法六:

一个程序实现从kafka读取数据到spark再到hive分区表,没有用到临时表

其中这两行代码

sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")

sqlContext.setConf("hive.exec.dynamic.partition","true")

保证了kafka的数据正确落到hive表正确的分区

这个方法和方法七一样,方法七是我手动实践的,方法六是网上大神写的

object KafkaToHive{
	def main(args: Array[String]){
		val sparkConf = new SparkConf().setAppName("KafkaToHive")
		val sc = new SparkContext(sparkConf)
		val ssc = new StringContext(sc,Seconds(60))
		// 创建kafka参数
		val kafkaParams = Map[String,Object](
			//ip为kafka集群ip,端口为集群端口
			"bootstrap.servers" -> "ip1:port1,ip2:port2,ip:port3",
			"group.id" -> "KafkaToHive_group1",  //自定义组名称
			"auto.offset.reset" -> "earliest",
			"enable.auto.commit" -> "false")
		val topics = Array("test1")
		val stream = KafkaUtils.createDirectStreaming[String,String](
			ssc,PreferConsistent,
			Subscribe[String,String](topics,kafkaParms)
		stream.foreachRDD(rdd=>{
			if(rdd.count>0){
				val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
				//TODO 具体处理逻辑
				//写入Hive
				//value为实际操作中的结果集,即是//TODO返回的结果集
				val subRdd = rdd.sparkContext.parallelize(value)
				val sqlContext : SQLContext = new HiveContext(rdd.sparkContext)
				sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
				sqlContext.setConf("hive.exec.dynamic.partition","true")	
                sqlContext.sql("use database1")
		     	val tempTable = sqlContext
		     	.read
		     	.format("json")
		     	.json(subRdd)
		     	.select(cols.map(new Column(_)): _*)
		     	.coalesce(1)
		     	.write
		     	.mode(SaveMode.Append)
		     	.insertInto("task_exec_time")
		        //提交offset
		       stream.asInstanceOf[CanCommitOffsets].commotAsync(offsetRanges)
		}
	})
}

方法七:

网上还有一种方式是df.write.mode(SaveMode.Overwrite).format("orc").partitionBy("day" , "dev_platform" ).insertInto("table") 

但是通过实践,insertinto分区表不能和partitionby一起用,所以正确用法是

 

df.write.mode(SaveMode.Overwrite).format("orc").insertInto("table") 

在执行这个代码之前,我先把表创建好,把用到的分区创建好

然后制造了两批数据到kafka,分别属于两个分区,然后执行代码,数据会进入相应的分区(原因是我的hive和spark修改了下面两个参数),以orc格式存储,虽然数据都在一次RDD处理内,但是却生成了多个小文件,估计是因为两个分区的数据在kafka里搀着,所以会这样

sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
sqlContext.setConf("hive.exec.dynamic.partition","true")

性能测试:

2126w数据,kafka3机器3分区,spark3个excutor,数据有两个分区各1000w,140s

 

由于是分区表,分区字段和表不一样,应该不能直接插,而且插入的格式都需要测试是否支持。

我看了一下dataframe的源码

df.write.mode(SaveMode.Overwrite).format("parquet")
      .partitionBy("day" , "dev_platform" ).save(outputPath)

在这么写入时源码写了这几种支持的格式

mode只支持以下几种

/**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `overwrite`: overwrite the existing data.
   *   - `append`: append the data.
   *   - `ignore`: ignore the operation (i.e. no-op).
   *   - `error` or `errorifexists`: default option, throw an exception at runtime.
   *
   * @since 1.4.0
   */
  def mode(saveMode: String): DataFrameWriter[T] = {...}

format仅支持这几种格式

/**
   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
   *
   * @since 1.4.0
   */
  def format(source: String): DataFrameWriter[T] = {...}

dataframe与hive相关的api:

registerTempTable(tableName:String):Unit,	
inserInto(tableName:String):Unit	
insertInto(tableName:String,overwrite:Boolean):Unit	
saveAsTable(tableName:String,source:String,mode:SaveMode,options:Map[String,String]):Unit

registerTempTable函数是创建spark临时表
insertInto函数是向表中写入数据,可以看出此函数不能指定数据库和分区等信息,不可以直接写入。
向hive数据仓库写入数据必须指定数据库,hive数据表建立可以在hive上建立,或者使用hiveContext.sql("create table .....")

 

 

 

写入非分区表:

方法一:

创建一个case类将RDD中数据类型转为case类类型,然后通过toDF转换DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName") 语句,就可以将DataFrame数据写入hive数据表中了。

case class Person(name:String,col1:Int,col2:String)	
	
val sc = new org.apache.spark.SparkContext   	
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)	
	
import hiveContext.implicits._	
hiveContext.sql("use DataBaseName")	
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))	
data.toDF().insertInto("tableName")

 

疑问:

1.如果使用kafka到spark内存表到hive这种方法,写内存表用的是createOrRepace,而spark任务是并行的,如果每个机器的处理速度不一样,会不会导致机器1先执行完一轮任务,等到机器1执行第二轮的createOrRepace时,机器2还在读老表,这样岂不是表数据会被覆盖,从而丢数据吗?

答:事实证明并不会丢数据

 

会出现的问题:

1.权限问题,spark在insert hive表select 内存表时,会出现错误,报错无法移动文件

Unable to move source hdfs://hdfs-ha/warehouse/tablespace/managed/hive/yisa_oe.db/test_vehicle/.hive-staging_hive_2021-03-02_17-16-31_106_8814286242601465788-1/-ext-10000 to destination hdfs://hdfs-ha/warehouse/tablespace/managed/hive/yisa_oe.db/test_vehicle/capture_date=2020-03-02;

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to move source hdfs://hdfs-ha/warehouse/tablespace/managed/hive/yisa_oe.db/test_vehicle/.hive-staging_hive_2021-03-02_17-16-31_106_8814286242601465788-1/-ext-10000 to destination hdfs://hdfs-ha/warehouse/tablespace/managed/hive/yisa_oe.db/test_vehicle/capture_date=2020-03-02

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: Load Data failed for hdfs://hdfs-ha/warehouse/tablespace/managed/hive/yisa_oe.db/test_vehicle/.hive-staging_hive_2021-03-02_17-16-31_106_8814286242601465788-1/-ext-10000 as the file is not owned by spark and load data is also not ran as spark

解决:

需要在spark hive-site.xml添加hive.load.data.owner=spark (具体的执行用户),我跑这个程序的时候用的是root用户,所以报错,我切换到spark用户,再执行这个程序,问题解决。

 

 

2.数据问题
问题描述:
最开始创建test_vehicle表以后,我在hive的命令行用insert values方式插入了1条数据,然后用spark用户执行方法三写入了10条数据,然后用root用户执行方法三写入了10条数据。
用hive命令行查询这个表,select count(*) from table 结果显示是1
用hive命令行查询这个表,select * from table 结果显示是21条
用spark-shell命令行查询这个表,select count(*) from table 结果显示是10
用spark-shell命令行查询这个表,select * from table 结果显示是10条,只显示了第一次spark用户插的那10条数据
本以为是权限问题,但是上HDFS文件系统看了一下,root用户和hive用户还有spark用户的用户组都是hadoop,而且分区文件夹和分区数据都对组内用户有读权限,应该不是这个问题。
目录结构是这样的:
表文件夹
       分区文件夹1
       	         _sucess
                 part-000-xxx  (root用户写的)
                 part-000-xxxxx  (spark用户写的)
       分区文件夹2(通过hive客户端插入的分区数据)
                 -ext-10000
                           0000_0

解决:

用spark和root用户分别插数据到hive,之所以用spark-shell命令查询数量不对,是因为spark-shell只加载了启动时数据。只要关掉spark-shell然后再启动一遍,就可以查到20条数据了。这里牵扯到spark session的问题,每次关闭spark-shell再打开太麻烦,可以用这个命令刷新元数据:

 

spark.catalog.refreshTable("yisa_oe.test_vehicle")

至于hive中select * 和select count(*) 数据量不一致的问题,是因为hive的一个参数设置:

 

Compute simple queries using stats only 参数设为false
hive.compute.query.using.stats=false

把这几个参数设置为false以后,hive的查询数据量就一致了。

剩下的问题就是spark查不到hive插入的数据,而hive能查spark插入的数据了

我认为是文件夹的原因,然后再spark已有数据分区上,用hive插入了一条这个分区的数据,这时hive不会再新建那个ext-1000文件夹了,数据是和spark插的数据同级的,所以这种情况用spark能查到hive的插的数据。

使用sparksql执行 show partitions test_vehicle,分区显示的也是正确的分区数量

所以需要解决的就是hive插数据会创建文件夹的问题,经过测试,在hive中只有使用insert into table values(xxx)时才会创建多余文件夹,如果使用insert select 方式不会创建多级目录。

至于insert values这种方式怎么办,留一个尾巴以后解决。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐