I. Mapping an HBase table with a Hive external table

1. Create the table in HBase

create 'B_TEST_STU', { NAME => 'info', COMPRESSION => 'SNAPPY' }

2. Create a Hive external table mapped to the HBase table

create database yh_test;
use yh_test;
create external table yh_test.stu(
id String,
name String,
gender String
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:gender")
TBLPROPERTIES ("hbase.table.name" = "B_TEST_STU");

The entries in hbase.columns.mapping line up positionally with the Hive columns: :key maps id to the HBase row key, while info:name and info:gender map the remaining columns into the info column family. The storage handler supplies its own SerDe, so no ROW FORMAT SERDE clause is needed, and the mapping string should be written without whitespace or line breaks between entries.

3. Test

Insert a row through the Hive table:

insert into table stu  values ('001','zhangsan','1');

Then check in HBase that the row was written:

scan 'B_TEST_STU'

II. Spark bulkload approach

Following the official documentation, the process has two steps:

  1. Generate the data in HFile format
  2. Move the generated HFiles into the corresponding HBase location (the bulk load itself)

1. Prerequisites

  1. HDFS, Hive, HBase, ZooKeeper and the Spark cluster are all up and working
  2. Copy Hive's hive-site.xml into $SPARK_HOME/conf and into the project's resource directory, so that Spark can reach the external Hive metastore (a quick connectivity check is sketched after this list)
  3. Add the JDBC driver for the database that stores the Hive metastore to the project's resource directory

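Before wiring up the bulkload it is worth confirming that Spark can actually reach the external Hive metastore. Below is a minimal sketch (the object name HiveConnectivityCheck is just illustrative); if hive-site.xml is being picked up, the yh_test database and the stu table from part I should show up:

package com.yh

import org.apache.spark.sql.SparkSession

// Quick check that Spark's Hive support sees the external metastore.
object HiveConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveConnectivityCheck")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("show databases").show()         // yh_test should be listed
    spark.sql("show tables in yh_test").show() // and the stu table inside it
    spark.stop()
  }
}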

2. pom file

<properties>
        <hadoop-version>2.6.0</hadoop-version>
        <hive-version>1.1.0</hive-version>
        <hbase-version>1.2.0</hbase-version>
        <spark.version>2.3.3</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <!-- Spark Dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase-version}</version>
        </dependency>

        <!-- pin the hadoop-client API version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-hbase-handler</artifactId>
            <version>${hive-version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- plugin for compiling Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- plugin for compiling Java -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


            <!-- plugin for building the shaded jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3. Code

package com.yh

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import java.net.URI

/**
 *
Submit command for spark-on-yarn:

 bin/spark-submit \
--class com.yh.BulkloadTest \
--master yarn \
--executor-memory 1G \
--total-executor-cores 2 \
--num-executors 2 \
hdfs://node01:8020/spark_bulkload_hbase-1.0-SNAPSHOT.jar \

 */
object BulkloadTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("BulkloadTest")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    // Read the data from Hive. The data itself lives on HDFS; here the source is the external
    // Hive table from part I, but a managed (internal) table works exactly the same way.
    spark.sql("use yh_test")
    spark.sql("select *, id rowkey from stu").show()
    val data = spark.sql("select *, id rowkey from stu")

    val dataRdd = data.rdd.flatMap(row => { // "info" is the column family; id and name are the column qualifiers
      val rowkey = row.getAs[String]("rowkey")
      Array(
        (rowkey, ("info", "id", row.getAs[String]("id"))),
        (rowkey, ("info", "name", row.getAs[String]("name")))
      )
    })
    // HFiles require the records to be totally ordered by (row key, column family, column qualifier),
    // so filter out null row keys and sort before converting.
    val rdds = dataRdd.filter(x => x._1 != null).sortBy(x => (x._1, x._2._1, x._2._2)).map(x => {
      // Convert the RDD into the format HFileOutputFormat2 expects:
      // the key must be an ImmutableBytesWritable and the value a KeyValue.
      val rowKey = Bytes.toBytes(x._1)
      val family = Bytes.toBytes(x._2._1)
      val column = Bytes.toBytes(x._2._2)
      val value = Bytes.toBytes(x._2._3)
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, column, value))
    })

    // Staging directory on HDFS where the HFiles will be written
    print("HFile staging directory: hdfs://node01:8020/tmp/test2")
    val tmpdir = "hdfs://node01:8020/tmp/test2"

    val hconf = new Configuration()
    hconf.set("fs.defaultFS", "hdfs://node01:8020")

    val fs = FileSystem.get(new URI("hdfs://node01:8020"), hconf, "hadoop") // "hadoop" is the HDFS user name
    if (fs.exists(new Path(tmpdir))) { // the HFile output directory must not exist yet, so delete it if it does
      println("Deleting existing staging directory")
      fs.delete(new Path(tmpdir), true)
    }

    // Create the HBase configuration
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Raise the limit so the load does not fail when a family produces many HFiles
    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 5000)
    // When this action finishes, the HFiles have been written to tmpdir
    rdds.saveAsNewAPIHadoopFile(tmpdir,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    // Now load the HFiles into HBase
    val load = new LoadIncrementalHFiles(conf)
    // Target HBase table name
    val tableName = "B_TEST_STU"

    // Create the HBase connection; the ZooKeeper settings above tell the client where the cluster is
    val conn = ConnectionFactory.createConnection(conf)
    // Get the table by name
    val table = conn.getTable(TableName.valueOf(tableName))

    try {

      // Get the region distribution of the HBase table
      val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
      println("Region locator for the table: " + regionLocator)
      // Create a Hadoop MapReduce job (it only carries the output configuration)
      val job = Job.getInstance(conf)

      // Job name; anything will do
      job.setJobName("bulkload_stu_test")

      // Important: since we are generating HFiles, the output key must be ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

      // ...and the output value must be KeyValue
      job.setMapOutputValueClass(classOf[KeyValue])
      print("Configuring HFileOutputFormat2 ...")
      // Configure HFileOutputFormat2 from the table layout and region boundaries
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
      print("Starting bulk load ...")
      // Run the bulk load
      load.doBulkLoad(new Path(tmpdir), conn.getAdmin, table, regionLocator)
      print("Bulk load finished")
    } finally {
      table.close()
      conn.close()
    }
  }
}
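
Once the job finishes, the loaded rows can be verified with scan 'B_TEST_STU' in the HBase shell, or programmatically. Below is a minimal sketch using the HBase client API (the object name BulkloadVerify is illustrative; the row key '001' is the test row inserted in part I, and the connection settings reuse those from the code above):

package com.yh

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// Read back one row to confirm the bulk load landed in B_TEST_STU.
object BulkloadVerify {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("B_TEST_STU"))
      val result = table.get(new Get(Bytes.toBytes("001")))
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      println(s"info:name for row 001 = $name")
      table.close()
    } finally {
      conn.close()
    }
  }
}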

The above is based on Hive 1.x.
I later implemented the same bulkload import against Hive 2.x.

<properties>
        <hadoop-version>2.6.5</hadoop-version>
        <hive-version>2.3.2</hive-version>
        <hbase-version>1.2.4</hbase-version>
        <spark-version>2.0.0</spark-version>
        <scala-version>2.11.8</scala-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala-version}</version>
        </dependency>

        <!-- Spark Dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark-version}</version>
        </dependency>

        <!-- service for connecting to the Hive metastore -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark-version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase-version}</version>
        </dependency>

        <!-- pin the hadoop-client API version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop-version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-hbase-handler</artifactId>
            <version>${hive-version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- plugin for compiling Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- plugin for compiling Java -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- plugin for building the shaded jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Overall the Hive 2.x version is much the same. I ran into one small error, which was fixed by explicitly setting spark.sql.warehouse.dir:

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("SavingDetail")
  .config("spark.sql.warehouse.dir", "/spark-warehouse/")
  .enableHiveSupport()
  .getOrCreate()
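
In Spark 2.x, spark.sql.warehouse.dir (rather than the older hive.metastore.warehouse.dir) controls where Spark resolves its warehouse directory, which is presumably why pinning it to an explicit path cleared the error here.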

Running the main method then imports the Hive data into HBase via bulkload.
