Importing Hive Data into HBase
Part 1: Mapping an HBase table with a Hive external table
1. Create the table in HBase
create 'B_TEST_STU', { NAME => 'info', COMPRESSION => 'SNAPPY' }
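The table can also be created from code instead of the shell. The following is a minimal sketch using the HBase 1.x Admin API; the ZooKeeper quorum values are assumptions for your cluster, and SNAPPY only works if the Snappy codec is installed on the region servers.
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.compress.Compression
object CreateStuTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03") // assumption: your ZooKeeper quorum
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    try {
      // Same layout as the shell command above: family "info" with SNAPPY compression
      val desc = new HTableDescriptor(TableName.valueOf("B_TEST_STU"))
      val info = new HColumnDescriptor("info")
      info.setCompressionType(Compression.Algorithm.SNAPPY)
      desc.addFamily(info)
      if (!admin.tableExists(desc.getTableName)) {
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}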
2. Create a Hive external table and map it to the HBase table
create database yh_test;
use yh_test;
create external table yh_test.stu(
id String,
name String,
gender String
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:gender")
TBLPROPERTIES ("hbase.table.name" = "B_TEST_STU");
3. Test
Insert a row into the Hive table:
insert into table stu values ('001','zhangsan','1');
Check in the HBase table that the row was written:
scan 'B_TEST_STU'
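If you prefer to verify from code rather than the HBase shell, here is a minimal sketch using the HBase 1.x client API (the ZooKeeper quorum is again an assumption for your cluster):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
object CheckStuRow {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03") // assumption: your ZooKeeper quorum
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("B_TEST_STU"))
    try {
      // The Hive insert used id '001', which the mapping turns into the row key
      val result = table.get(new Get(Bytes.toBytes("001")))
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      println(s"info:name = $name") // expect zhangsan
    } finally {
      table.close()
      conn.close()
    }
  }
}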
Part 2: Spark bulkload
Refer to the official documentation.
It breaks down into two steps:
- Generate the data as HFiles
- Move the generated HFiles into the corresponding location in HBase
1. Prerequisites
- The HDFS, Hive, HBase, ZooKeeper, and Spark clusters are all up and working normally
- Copy Hive's hive-site.xml into $SPARK_HOME/conf and into the project's resources directory, so that Spark can access the external Hive metastore (a quick smoke test is sketched after this list)
- Add the JDBC driver of the database that stores the Hive metadata to the project's resources directory
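Before moving on it is worth confirming that Spark really can see the external Hive metastore. A minimal smoke test, assuming the yh_test.stu table from Part 1 exists and hive-site.xml is on the classpath:
import org.apache.spark.sql.SparkSession
object HiveSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveSmokeTest")
      .master("local[*]")
      .enableHiveSupport() // picks up hive-site.xml from the classpath
      .getOrCreate()
    spark.sql("show databases").show() // yh_test should be listed here
    spark.sql("select * from yh_test.stu").show()
    spark.stop()
  }
}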
2. POM file
<properties>
<hadoop-version>2.6.0</hadoop-version>
<hive-version>1.1.0</hive-version>
<hbase-version>1.2.0</hbase-version>
<spark.version>2.3.3</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<!-- Spark Dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase-version}</version>
</dependency>
<!-- Pin the hadoop-client API version -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<version>${hive-version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- Plugin for compiling Scala -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- Plugin for compiling Java -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Plugin for building the shaded jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
3. Code
package com.yh
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import java.net.URI
/**
 * Submitting with spark-on-yarn:
 * bin/spark-submit \
 * --class com.yh.BulkloadTest \
 * --master yarn \
 * --executor-memory 1G \
 * --executor-cores 2 \
 * --num-executors 2 \
 * hdfs://node01:8020/spark_bulkload_hbase-1.0-SNAPSHOT.jar
 */
object BulkloadTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("BulkloadTest")
.master("local[*]")
.enableHiveSupport()
.getOrCreate()
// Read the data from Hive; the data itself lives on HDFS. The table here is external, but a managed table works just the same.
spark.sql("use yh_test")
spark.sql("select *, id as rowkey from stu").show()
val data = spark.sql("select *, id as rowkey from stu")
val dataRdd = data.rdd.flatMap(row => { // "info" is the column family; id and name are the column qualifiers
val rowkey = row.getAs[String]("rowkey".toLowerCase)
Array(
(rowkey, ("info", "id", row.getAs[String]("id"))),
(rowkey, ("info", "name", row.getAs[String]("name")))
)
})
// Row key, column family, and qualifier must be globally sorted in an HFile, so filter out null row keys and sort before converting
val rdds = dataRdd.filter(x => x._1 != null).sortBy(x => (x._1, x._2._1, x._2._2)).map(x => {
// Convert the RDD into the format HFileOutputFormat2 expects:
// the key must be an ImmutableBytesWritable and the value a KeyValue
val rowKey = Bytes.toBytes(x._1)
val family = Bytes.toBytes(x._2._1)
val qualifier = Bytes.toBytes(x._2._2)
val value = Bytes.toBytes(x._2._3)
(new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, qualifier, value))
})
// Temporary directory on HDFS where the HFiles are written
print("Temporary HFile directory: hdfs://node01:8020/tmp/test2")
val tmpdir = "hdfs://node01:8020/tmp/test2"
val hconf = new Configuration()
hconf.set("fs.defaultFS", "hdfs://node01:8020")
val fs = FileSystem.get(new URI("hdfs://node01:8020"), hconf, "hadoop") // "hadoop" is the OS user on the cluster
if (fs.exists(new Path(tmpdir))) { // The HFile output directory must not exist yet, so delete it if it does
println("Deleting temporary directory")
fs.delete(new Path(tmpdir), true)
}
// Create the HBase configuration
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
conf.set("hbase.zookeeper.property.clientPort", "2181")
// Raise this limit so the load does not fail when many HFiles are generated per region and family
conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)
// Once the following action completes, the HFiles have been written under tmpdir
rdds.saveAsNewAPIHadoopFile(tmpdir,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf)
// Load the HFiles into HBase
val load = new LoadIncrementalHFiles(conf)
// HBase table name
val tableName = "B_TEST_STU"
// Create the HBase connection; the master address is read from the configuration
val conn = ConnectionFactory.createConnection(conf)
// Get the table by name
val table = conn.getTable(TableName.valueOf(tableName))
try {
// Get the region distribution of the HBase table
val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
println("Region distribution of the HBase table: " + regionLocator)
// Create a Hadoop MapReduce job
val job = Job.getInstance(conf)
// The job name can be anything
job.setJobName("bulkload_stu_test")
// This part matters most: since we are generating HFiles, the map output key must be ImmutableBytesWritable
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
// and the map output value must be KeyValue
job.setMapOutputValueClass(classOf[KeyValue])
print("Configuring HFileOutputFormat2 ----------")
// Configure HFileOutputFormat2 for this table
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
print("Starting the bulk load ----------")
// Start the import
load.doBulkLoad(new Path(tmpdir), conn.getAdmin, table, regionLocator)
print("Bulk load finished ----------")
} finally {
table.close()
conn.close()
}
}
}
The above is based on Hive 1.x.
I later implemented the same bulkload import on Hive 2.x; the POM for that setup follows.
<properties>
<hadoop-version>2.6.5</hadoop-version>
<hive-version>2.3.2</hive-version>
<hbase-version>1.2.4</hbase-version>
<spark-version>2.0.0</spark-version>
<scala-version>2.11.8</scala-version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala-version}</version>
</dependency>
<!-- Spark Dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark-version}</version>
</dependency>
<!-- For connecting to the Hive metastore service -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.11</artifactId>
<version>${spark-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase-version}</version>
</dependency>
<!-- Pin the hadoop-client API version -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<version>${hive-version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- Plugin for compiling Scala -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- Plugin for compiling Java -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Plugin for building the shaded jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Overall it is much the same. One small error came up, related to the Spark SQL warehouse directory.
The fix is to specify spark.sql.warehouse.dir explicitly:
val spark = SparkSession
.builder()
.master("local[*]")
.appName("SavingDetail")
.config("spark.sql.warehouse.dir","/spark-warehouse/")
.enableHiveSupport()
.getOrCreate()
Run the main method and the Hive data is imported into HBase via bulkload.