一、功能描述

本功能组件主要通过Java的API实现HBase的操作。利用log4j进行数据迁移过程的记录,采取批处理的方式实现数据迁移的过程。

技术实现

  1. 利用Java的API连接HBase数据库
  2. 利用log4j将执行信息进行输出,并捕获异常

二、依赖导入

首先,在Maven工程中的pom.xml的<dependencies></dependencies>中添加以下的依赖

<!--导入HBase客户端的依赖-->
<dependency>
	<groupId>org.apache.hbase</groupId>
	<artifactId>hbase-client</artifactId>
	<version>2.3.5</version>
</dependency>
<!--导入hadoop的公共组件的依赖-->
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>3.1.3</version>
</dependency>
<!--HBase运行时必须的依赖资源-->
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-auth</artifactId>
	<version>3.1.3</version>
</dependency>
<!--导入Zookeeper的依赖-->
<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.6.3</version>
</dependency>

三、配置信息

3.1log4j的配置

log4j的配置文件主要实现程序运行的日志信息的输出,以及程序异常信息的输出,方便开发者以及运维人员进行程序状态的动态感知。

#指定log4j的输出信息
log4j.rootLogger=INFO, stdout, logfile
#指定log4j的标准输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#指定log4j的标准输出的样式
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#指定标准输出的转换的格式
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
#指定日志文件的输出
log4j.appender.logfile=org.apache.log4j.FileAppender
#指定log4j的输出路径文件名
log4j.appender.logfile.File=log/hd.log
#指定日志日志输出样式
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
#指定日志文件的转换格式
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3.2连接配置

#java连接HBase的配置
#定义连接端口号
hbase.zookeeper.property.clientPort=2181
#定义Zookeeper选举队列
hbase.zookeeper.quorum=10.68.128.215,10.68.128.211,10.68.128.212
#定义HBase的主节点
hbase.master=hdfs://10.68.128.215:60000
#定义hdfs的路径
hbase.root.dir=hdfs://10.68.128.215:9000/hbase


#java连接单节点配置
#访问HBase需要通过Zookeeper进行访问
hbase.zk=single01:2181

四、Configuration

Configuration主要实现配置信息的读取,并将读取的方法进行相应的封装。

public class Configuration {
    //初始化log4j的句柄
    private static Logger logger = Logger.getLogger(Configuration.class);
    //定义字符串和整型查询为空的返回值
    private static final String STR_EMPTY = "NULL";
    private static final String INT_EMPTY = "0";
    //初始化Properties读取的方法
    private static Properties config;
	//静态代码块:主要实现将配置文件读入本文件中
    static {
        // 初始化方法
        config = new Properties();
        //获取资源文件路径
        URL datasource = Thread.currentThread().getContextClassLoader().getResource("datasource.properties");
        //尝试进行数据的加载
        try {
            config.load(new FileReader(datasource.getPath()));
        } catch (IOException e) {
            //当出现异常时,输出异常并将程序退出
            logger.error(e);
            System.exit(-1);
        }
    }
	//获取配置文件的配置项字符串数据
    protected static String getString(String key){
        return config.getProperty(key, STR_EMPTY);
    }
	//获取配置文件的配置项整型数据
    protected static int getInt(String key){
        return Integer.parseInt(config.getProperty(key,INT_EMPTY));
    }
}

五、Common

Common接口主要实现自动关闭的方法,实现了时间计算的方法

public interface Common {
    //定义将微秒转化为秒的基数
    float MILLI_SEC = 1000.0f;
    //由于接口只进行数据的封装,如果需要在接口里实现方法,就需要在方法前增加default关键字,这是jdk1.8的新特性
    //下面方法主要实现接口的自动关闭的方法
    default void close(AutoCloseable...closes){
        //增强for进行遍历
        for (AutoCloseable close : closes) {
            //如果对象不为空
            if (null!=close) {
                //尝试进行关闭
                try {
                    close.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
	//定义花费的时间
    default String timeCost(long begin){
        return "time consumption : "+(System.currentTimeMillis()-begin)/MILLI_SEC;
    }
}

六、Java操作HBase集群

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class Query extends Configuration implements Common{
	//初始化log4j的检测对象
    private static Logger logger = Logger.getLogger(Query.class);
	public static Configuration configuration;
	public static Connection connection;
	public static Admin admin;
	/**
	 * 初始化链接
	 */
	public static void init() {
		configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.property.clientPort",getString( "hbase.zookeeper.property.clientPort"));
		configuration.set("hbase.zookeeper.quorum", getString( "hbase.zookeeper.quorum"));
		configuration.set("hbase.master", getString("hbase.master"));
		configuration.set("hbase.root.dir", getString("hbase.root.dir"));

		try {
			connection = ConnectionFactory.createConnection(configuration);
			admin = connection.getAdmin();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 关闭连接
	 */
	public static void close() {
		try {
			if (null != admin) {
				admin.close();
			}
			if (null != connection) {
				connection.close();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 创建表
	 * @param tableName 表名
	 * @param family 列族列表
	 * @throws IOException
	 */
	public static void createTable(String tableName, String[] cols) throws IOException {
		init();
		TableName tName = TableName.valueOf(tableName);
		if (admin.tableExists(tName)) {
			println(tableName + " exists.");
		} else {
			HTableDescriptor hTableDesc = new HTableDescriptor(tName);
			for (String col : cols) {
				HColumnDescriptor hColumnDesc = new HColumnDescriptor(col);
				hTableDesc.addFamily(hColumnDesc);
			}
			admin.createTable(hTableDesc);
		}

		close(admin,connection);
	}

	/**
	 * 删除表
	 * @param tableName 表名称
	 * @throws IOException
	 */
	public static void deleteTable(String tableName) throws IOException {
		init();
		TableName tName = TableName.valueOf(tableName);
		if (admin.tableExists(tName)) {
			admin.disableTable(tName);
			admin.deleteTable(tName);
		} else {
			println(tableName + " not exists.");
		}
		close(admin,connection);
	}

	/**
	 * 查看已有表
	 * @throws IOException
	 */
	public static void listTables() {
		init();
		HTableDescriptor hTableDescriptors[] = null;
		try {
			hTableDescriptors = admin.listTables();
		} catch (IOException e) {
			e.printStackTrace();
		}
		for (HTableDescriptor hTableDescriptor : hTableDescriptors) {
			println(hTableDescriptor.getNameAsString());
		}
		close(admin,connection);
	}

	/**
	 * 插入单行
	 * 
	 * @param tableName 表名称
	 * @param rowKey RowKey
	 * @param colFamily 列族
	 * @param col 列
	 * @param value 值
	 * @throws IOException
	 */
	public static void insert(String tableName, String rowKey, String colFamily, String col, String value) throws IOException {
		init();
		Table table = connection.getTable(TableName.valueOf(tableName));
		Put put = new Put(Bytes.toBytes(rowKey));
		put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(value));
		table.put(put);

		/*
		 * 批量插入 List<Put> putList = new ArrayList<Put>(); puts.add(put); table.put(putList);
		 */

		table.close();
		close(admin,connection);
	}

	public static void delete(String tableName, String rowKey, String colFamily, String col) throws IOException {
		init();

		if (!admin.tableExists(TableName.valueOf(tableName))) {
			println(tableName + " not exists.");
		} else {
			Table table = connection.getTable(TableName.valueOf(tableName));
			Delete del = new Delete(Bytes.toBytes(rowKey));
			if (colFamily != null) {
				del.addFamily(Bytes.toBytes(colFamily));
			}
			if (colFamily != null && col != null) {
				del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
			}
			/*
			 * 批量删除 List<Delete> deleteList = new ArrayList<Delete>(); deleteList.add(delete); table.delete(deleteList);
			 */
			table.delete(del);
			table.close();
		}
		close(admin,connection);
	}

	/**
	 * 根据RowKey获取数据
	 * @param tableName 表名称
	 * @param rowKey RowKey名称
	 * @param colFamily 列族名称
	 * @param col 列名称
	 * @throws IOException
	 */
	public static void getData(String tableName, String rowKey, String colFamily, String col) throws IOException {
		init();
		Table table = connection.getTable(TableName.valueOf(tableName));
		Get get = new Get(Bytes.toBytes(rowKey));
		if (colFamily != null) {
			get.addFamily(Bytes.toBytes(colFamily));
		}
		if (colFamily != null && col != null) {
			get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
		}
		Result result = table.get(get);
		showCell(result);
		table.close();
		close(admin,connection);
	}

	/**
	 * 根据RowKey获取信息
	 * @param tableName
	 * @param rowKey
	 * @throws IOException
	 */
	public static void getData(String tableName, String rowKey) throws IOException {
		getData(tableName, rowKey, null, null);
	}

	/**
	 * 格式化输出
	 * @param result
	 */
	public static void showCell(Result result) {
		Cell[] cells = result.rawCells();
		for (Cell cell : cells) {
			println("RowName: " + new String(CellUtil.cloneRow(cell)) + " ");
			println("Timetamp: " + cell.getTimestamp() + " ");
			println("column Family: " + new String(CellUtil.cloneFamily(cell)) + " ");
			println("row Name: " + new String(CellUtil.cloneQualifier(cell)) + " ");
			println("value: " + new String(CellUtil.cloneValue(cell)) + " ");
		}
	}

	/**
	 * 打印
	 * @param obj 打印对象
	 */
	private static void println(Object obj) {
		System.out.println(obj);
	}
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐