To generate HFiles with Spark and then load the data into HBase via bulkload, you need to:

        1. First, write HFiles to HDFS with Spark;

        2. Then invoke the bulkload program from Java code to load the HFiles into the HBase table.

However, there are a few points to watch out for along the way:

        1. When the HBase table is pre-split, calling bulkLoader.doBulkLoad(hFilePath, admin, table, regionLocator) will automatically split any HFile whose data does not fall entirely within a single region. So if the Spark job writes many small files and the volume of data to load is large, the bulkload program keeps splitting HFiles over and over and the load becomes extremely slow; I once spent several hours on a 300 GB load without getting it in.

        2. Spark's sortByKey operator sorts the whole RDD globally using its own range partitioner, so you cannot control which partition each key ends up in; repartitionAndSortWithinPartitions, on the other hand, accepts a custom Partitioner and sorts records within each resulting partition, which is exactly the shuffle behavior of Hadoop MapReduce (a short standalone sketch follows this list).

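To make the second point concrete, here is a minimal, self-contained sketch (separate from the project code further down; the class name, the toy partitioner, and the output path are all illustrative, not part of the original program) of repartitionAndSortWithinPartitions with a custom Partitioner: each key is routed to the partition chosen by the partitioner, and records are sorted only within each partition, with no global ordering across partitions.

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class SortWithinPartitionsSketch {
    // Toy partitioner: the first character of the key decides the partition.
    private static class FirstCharPartitioner extends Partitioner {
        @Override
        public int numPartitions() {
            return 3;
        }

        @Override
        public int getPartition(Object key) {
            char c = ((String) key).charAt(0);
            return Math.abs(c) % numPartitions();
        }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "sketch");
        JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("b2", 1), new Tuple2<>("a1", 2), new Tuple2<>("b1", 3)));

        // Keys are grouped by the custom partitioner and sorted within each partition;
        // no global sort across partitions is performed.
        JavaPairRDD<String, Integer> sorted =
                pairs.repartitionAndSortWithinPartitions(new FirstCharPartitioner());

        // One output file per partition; the path is illustrative and must not already exist.
        sorted.saveAsTextFile("/tmp/sorted-within-partitions");
        sc.stop();
    }
}
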
Based on the points above, the final approach is: if the target HBase table has, say, 5000 pre-split regions, the Spark job runs with 5000 partitions, the data inside each partition is sorted, the job writes out exactly 5000 HFiles, and the final bulkLoader.doBulkLoad(hFilePath, admin, table, regionLocator) call finishes almost instantly.

Tips:

        1. repartitionAndSortWithinPartitions requires a custom Partitioner;

        2. Bulkload can be invoked through the Java API, or with the command-line loader that ships with HBase on a Hadoop node; the command-line route is not the focus of this article, so readers can look it up themselves. A standalone sketch of the Java-API call follows these tips.

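As a companion to tip 2, the Java-API invocation, stripped of everything project-specific, looks roughly like the sketch below. The table name, HFile directory, and ZooKeeper quorum are placeholders, the HFiles are assumed to already sit on HDFS, and the target table is assumed to exist; the hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily value simply mirrors the example that appears later in parseHBaseConfig's doc comment.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadSketch {
    public static void main(String[] args) throws Exception {
        String tableName = "my_table";            // placeholder
        String hfileDir = "/tmp/hfiles/my_table"; // placeholder HDFS path with the generated HFiles

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3"); // placeholder
        // Optional: raise the per-region-per-family HFile cap if many files land in one region.
        conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "3200");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(TableName.valueOf(tableName));
             RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName))) {
            // Move the HFiles directly into the table's regions.
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
            bulkLoader.doBulkLoad(new Path(hfileDir), admin, table, regionLocator);
        }
    }
}
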
For the concrete implementation, refer to the code below; the complete program is on GitHub: https://github.com/Issac-Young/data-hub.git

package com.issac.studio.app.sink;

import com.issac.studio.app.entity.domain.Sink;
import com.issac.studio.app.entity.domain.config.sink.HBaseSinkConfig;
import com.issac.studio.app.entity.dto.ExternalParam;
import com.issac.studio.app.exception.NullException;
import com.issac.studio.app.exception.TypeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.util.*;

/**
 * @description: write to HBase
 * @file: HBaseSink
 * @author: issac.young
 * @date: 2021/5/11 9:29 上午
 * @since: v1.0.0
 * @copyright (C), 1992-2021, issac
 */
public class HBaseSink extends com.issac.studio.app.sink.Sink {
    private final static Logger log = LoggerFactory.getLogger(HBaseSink.class);

    @Override
    public void sink(Dataset<Row> ds, Sink sink, ExternalParam eParam) throws Exception {
        HBaseSinkConfig hbaseSinkConfig;
        if (sink != null) {
            if (sink.getSinkConfigEntity() instanceof HBaseSinkConfig) {
                hbaseSinkConfig = (HBaseSinkConfig) sink.getSinkConfigEntity();
            } else {
                String msg = String.format("写数据描述实体类型异常!预期是{%s}, 实际是{%s}", HBaseSinkConfig.class.getName(), sink.getSinkConfigEntity().getClass().getName());
                throw new TypeException(msg);
            }
        } else {
            String msg = "传入的数据源描述实体为null";
            throw new NullException(msg);
        }
        log.info("开始sink sinkId={}的数据", sink.getId());

        String tableName = hbaseSinkConfig.getTargetTable();
        String hFilePath = hbaseSinkConfig.gethFilePath();
        Integer isTruncate = hbaseSinkConfig.getIsTruncate();
        Integer isPreserve = hbaseSinkConfig.getIsPreserve();
        String hbaseConfig = hbaseSinkConfig.gethBaseConfig();

        // This step will differ for everyone; my requirements dictate this mapping. All that matters is ending up with a JavaPairRDD<ImmutableBytesWritable, KeyValue>.
        JavaPairRDD<ImmutableBytesWritable, KeyValue> hFileRdd = ds.javaRDD()
                .flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
                    @Override
                    public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row row) throws Exception {
                        String rowKey = row.getString(0); // by convention, the first field is the rowKey

                        ArrayList<Tuple2<ImmutableBytesWritable, KeyValue>> list = new ArrayList<>();
                        for (int i = 1; i < row.length(); i++) {
                            String fieldName = row.schema().fields()[i].name();
                            String columnFamily = fieldName.split(":")[0];
                            String qualifier = fieldName.split(":")[1];
                            String value = String.valueOf(row.get(i));
                            KeyValue keyValue = new KeyValue(
                                    Bytes.toBytes(rowKey),
                                    Bytes.toBytes(columnFamily),
                                    Bytes.toBytes(qualifier),
                                    Bytes.toBytes(value));
                            list.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), keyValue));
                        }

                        return list.iterator();
                    }
                });

        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        HashMap<String, String> hBaseConfigMap = parseHBaseConfig(hbaseConfig);
        for (Map.Entry<String, String> entry : hBaseConfigMap.entrySet()) {
            log.info("HBaseConfiguration设置了{}={}", entry.getKey(), entry.getValue());
            conf.set(entry.getKey(), entry.getValue());
        }

        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        Job job = Job.getInstance();
        Table table = connection.getTable(TableName.valueOf(tableName));
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

        List<HRegionInfo> hRegionInfos = admin.getTableRegions(TableName.valueOf(tableName));
        if (hRegionInfos == null) {
            String msg = String.format("admin.getTableRegions(\"%s\")的结果为空,请检查该表是否存在。", tableName);
            throw new NullException(msg);
        }
        ArrayList<String> regionSplits = new ArrayList<>();
        for (HRegionInfo item : hRegionInfos) {
            regionSplits.add(new String(item.getEndKey()));
        }
        regionSplits.remove(regionSplits.size() - 1);
        log.info("HBase表{}的分区数组={}", tableName, regionSplits);

        if (regionSplits.size() > 0) {
            // The key point: repartitionAndSortWithinPartitions with a custom Partitioner. Take a close look at RegionPartitioner below; its logic is borrowed from the Hadoop MapReduce source.
            JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitioned =
                    hFileRdd.repartitionAndSortWithinPartitions(new RegionPartitioner(regionSplits.toArray(new String[regionSplits.size()])));
            repartitioned.saveAsNewAPIHadoopFile(hFilePath, ImmutableBytesWritable.class, KeyValue.class,
                    HFileOutputFormat2.class, conf);
        } else {
            hFileRdd.saveAsNewAPIHadoopFile(hFilePath, ImmutableBytesWritable.class, KeyValue.class,
                    HFileOutputFormat2.class, conf);
        }
        log.info("hfile文件已经写完!在{}目录下!", hFilePath);

        if (isTruncate == 1) {
            // If the existing data must be cleared before loading, do the following
            if (admin.isTableDisabled(TableName.valueOf(tableName))) {
                log.info("HBase table {} is already disabled, about to truncate!", tableName);
            } else {
                log.info("HBase table {} is not disabled yet, about to disable it!", tableName);
                admin.disableTable(TableName.valueOf(tableName));
                log.info("HBase table {} disabled successfully, about to truncate!", tableName);
            }
            if (isPreserve == 1) {
                admin.truncateTable(TableName.valueOf(tableName), true);
                log.info("HBASE表{}执行truncate成功, 保留原有分区!", tableName);
            } else {
                admin.truncateTable(TableName.valueOf(tableName), false);
                log.info("HBASE表{}执行truncate成功, 不保留原有分区!", tableName);
            }
        }

        log.info("开始使用bulk装{}的hfile文件!", hFilePath);
        LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
        bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator);

        log.info("sink sinkId={}成功", sink.getId());
    }

    /**
     * Parse the extra parameters for HBaseConfiguration; the format is "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily:3200,xxx.xxx.xx:323"
     * @param hBaseConfig : hBaseConfig
     * @author issac.young
     * @date 2021/5/12 9:14 上午
     * @return java.util.HashMap<java.lang.String, java.lang.String>
     */
    public HashMap<String, String> parseHBaseConfig(String hBaseConfig) {
        HashMap<String, String> map = new HashMap<>();
        if (StringUtils.isNotBlank(hBaseConfig)) {
            String[] keyValues = hBaseConfig.split(",");
            for (String item : keyValues) {
                String[] keyValue = item.split(":", 2); // split on the first ':' only, so values containing ':' are not truncated
                map.put(keyValue[0], keyValue[1]);
            }
        }
        return map;
    }

    /**
     * Custom Spark Partitioner; the design follows the Hadoop MapReduce source.
     */
    private static class RegionPartitioner extends Partitioner {
        private final String[] endKeys;
        private final int numPartitions;

        public RegionPartitioner(String[] endKeys) {
            this.endKeys = endKeys;
            this.numPartitions = endKeys.length + 1;
        }

        @Override
        public int numPartitions() {
            return this.numPartitions;
        }

        @Override
        public int getPartition(Object key) {
            if (this.endKeys.length == 0) {
                // If the HBase table has no split keys, write all data into a single file.
                // Tested: in that case the data inside this partition does not get sorted, so avoid hitting this branch when using RegionPartitioner.
                return 0;
            } else {
                byte[] keyBytes = ((ImmutableBytesWritable) key).copyBytes();
                // Assumes all region end keys share the same length and every rowKey is at least that long.
                String comparedKey = new String(keyBytes).substring(0, endKeys[0].length());
                for (int i = 0; i < this.endKeys.length; i++) {
                    if (comparedKey.compareTo(endKeys[i]) < 0) {
                        return i;
                    }
                }
                return endKeys.length;
            }
        }
    }
}
