利用Java的API实现HBase数据库的增删查改
文章目录一、功能描述二、依赖导入三、配置信息3.1log4j的配置3.2连接配置四、Configuration五、Common六、Java操作HBase集群一、功能描述本功能组件主要通过Java的API实现HBase的操作。利用log4j进行数据迁移过程的记录,采取批处理的方式实现数据迁移的过程。技术实现利用Java的API连接HBase数据库利用log4j将执行信息进行输出,并捕获异常二、依赖导
·
一、功能描述
本功能组件主要通过Java的API实现HBase的操作。利用log4j进行数据迁移过程的记录,采取批处理的方式实现数据迁移的过程。
技术实现
- 利用Java的API连接HBase数据库
- 利用log4j将执行信息进行输出,并捕获异常
二、依赖导入
首先,在Maven工程中的pom.xml的<dependencies></dependencies>中添加以下的依赖
<!--导入HBase客户端的依赖-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.3.5</version>
</dependency>
<!--导入hadoop的公共组件的依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<!--HBase运行时必须的依赖资源-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>3.1.3</version>
</dependency>
<!--导入Zookeeper的依赖-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
三、配置信息
3.1log4j的配置
log4j的配置文件主要实现程序运行的日志信息的输出,以及程序异常信息的输出,方便开发者以及运维人员进行程序状态的动态感知。
#指定log4j的输出信息
log4j.rootLogger=INFO, stdout, logfile
#指定log4j的标准输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#指定log4j的标准输出的样式
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#指定标准输出的转换的格式
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
#指定日志文件的输出
log4j.appender.logfile=org.apache.log4j.FileAppender
#指定log4j的输出路径文件名
log4j.appender.logfile.File=log/hd.log
#指定日志日志输出样式
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
#指定日志文件的转换格式
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3.2连接配置
#java连接HBase的配置
#定义连接端口号
hbase.zookeeper.property.clientPort=2181
#定义Zookeeper选举队列
hbase.zookeeper.quorum=10.68.128.215,10.68.128.211,10.68.128.212
#定义HBase的主节点
hbase.master=hdfs://10.68.128.215:60000
#定义hdfs的路径
hbase.root.dir=hdfs://10.68.128.215:9000/hbase
#java连接单节点配置
#访问HBase需要通过Zookeeper进行访问
hbase.zk=single01:2181
四、Configuration
Configuration主要实现配置信息的读取,并将读取的方法进行相应的封装。
public class Configuration {
//初始化log4j的句柄
private static Logger logger = Logger.getLogger(Configuration.class);
//定义字符串和整型查询为空的返回值
private static final String STR_EMPTY = "NULL";
private static final String INT_EMPTY = "0";
//初始化Properties读取的方法
private static Properties config;
//静态代码块:主要实现将配置文件读入本文件中
static {
// 初始化方法
config = new Properties();
//获取资源文件路径
URL datasource = Thread.currentThread().getContextClassLoader().getResource("datasource.properties");
//尝试进行数据的加载
try {
config.load(new FileReader(datasource.getPath()));
} catch (IOException e) {
//当出现异常时,输出异常并将程序退出
logger.error(e);
System.exit(-1);
}
}
//获取配置文件的配置项字符串数据
protected static String getString(String key){
return config.getProperty(key, STR_EMPTY);
}
//获取配置文件的配置项整型数据
protected static int getInt(String key){
return Integer.parseInt(config.getProperty(key,INT_EMPTY));
}
}
五、Common
Common接口主要实现自动关闭的方法,实现了时间计算的方法
public interface Common {
//定义将微秒转化为秒的基数
float MILLI_SEC = 1000.0f;
//由于接口只进行数据的封装,如果需要在接口里实现方法,就需要在方法前增加default关键字,这是jdk1.8的新特性
//下面方法主要实现接口的自动关闭的方法
default void close(AutoCloseable...closes){
//增强for进行遍历
for (AutoCloseable close : closes) {
//如果对象不为空
if (null!=close) {
//尝试进行关闭
try {
close.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
//定义花费的时间
default String timeCost(long begin){
return "time consumption : "+(System.currentTimeMillis()-begin)/MILLI_SEC;
}
}
六、Java操作HBase集群
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class Query extends Configuration implements Common{
//初始化log4j的检测对象
private static Logger logger = Logger.getLogger(Query.class);
public static Configuration configuration;
public static Connection connection;
public static Admin admin;
/**
* 初始化链接
*/
public static void init() {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort",getString( "hbase.zookeeper.property.clientPort"));
configuration.set("hbase.zookeeper.quorum", getString( "hbase.zookeeper.quorum"));
configuration.set("hbase.master", getString("hbase.master"));
configuration.set("hbase.root.dir", getString("hbase.root.dir"));
try {
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 关闭连接
*/
public static void close() {
try {
if (null != admin) {
admin.close();
}
if (null != connection) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 创建表
* @param tableName 表名
* @param family 列族列表
* @throws IOException
*/
public static void createTable(String tableName, String[] cols) throws IOException {
init();
TableName tName = TableName.valueOf(tableName);
if (admin.tableExists(tName)) {
println(tableName + " exists.");
} else {
HTableDescriptor hTableDesc = new HTableDescriptor(tName);
for (String col : cols) {
HColumnDescriptor hColumnDesc = new HColumnDescriptor(col);
hTableDesc.addFamily(hColumnDesc);
}
admin.createTable(hTableDesc);
}
close(admin,connection);
}
/**
* 删除表
* @param tableName 表名称
* @throws IOException
*/
public static void deleteTable(String tableName) throws IOException {
init();
TableName tName = TableName.valueOf(tableName);
if (admin.tableExists(tName)) {
admin.disableTable(tName);
admin.deleteTable(tName);
} else {
println(tableName + " not exists.");
}
close(admin,connection);
}
/**
* 查看已有表
* @throws IOException
*/
public static void listTables() {
init();
HTableDescriptor hTableDescriptors[] = null;
try {
hTableDescriptors = admin.listTables();
} catch (IOException e) {
e.printStackTrace();
}
for (HTableDescriptor hTableDescriptor : hTableDescriptors) {
println(hTableDescriptor.getNameAsString());
}
close(admin,connection);
}
/**
* 插入单行
*
* @param tableName 表名称
* @param rowKey RowKey
* @param colFamily 列族
* @param col 列
* @param value 值
* @throws IOException
*/
public static void insert(String tableName, String rowKey, String colFamily, String col, String value) throws IOException {
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(value));
table.put(put);
/*
* 批量插入 List<Put> putList = new ArrayList<Put>(); puts.add(put); table.put(putList);
*/
table.close();
close(admin,connection);
}
public static void delete(String tableName, String rowKey, String colFamily, String col) throws IOException {
init();
if (!admin.tableExists(TableName.valueOf(tableName))) {
println(tableName + " not exists.");
} else {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete del = new Delete(Bytes.toBytes(rowKey));
if (colFamily != null) {
del.addFamily(Bytes.toBytes(colFamily));
}
if (colFamily != null && col != null) {
del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
}
/*
* 批量删除 List<Delete> deleteList = new ArrayList<Delete>(); deleteList.add(delete); table.delete(deleteList);
*/
table.delete(del);
table.close();
}
close(admin,connection);
}
/**
* 根据RowKey获取数据
* @param tableName 表名称
* @param rowKey RowKey名称
* @param colFamily 列族名称
* @param col 列名称
* @throws IOException
*/
public static void getData(String tableName, String rowKey, String colFamily, String col) throws IOException {
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (colFamily != null) {
get.addFamily(Bytes.toBytes(colFamily));
}
if (colFamily != null && col != null) {
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
}
Result result = table.get(get);
showCell(result);
table.close();
close(admin,connection);
}
/**
* 根据RowKey获取信息
* @param tableName
* @param rowKey
* @throws IOException
*/
public static void getData(String tableName, String rowKey) throws IOException {
getData(tableName, rowKey, null, null);
}
/**
* 格式化输出
* @param result
*/
public static void showCell(Result result) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
println("RowName: " + new String(CellUtil.cloneRow(cell)) + " ");
println("Timetamp: " + cell.getTimestamp() + " ");
println("column Family: " + new String(CellUtil.cloneFamily(cell)) + " ");
println("row Name: " + new String(CellUtil.cloneQualifier(cell)) + " ");
println("value: " + new String(CellUtil.cloneValue(cell)) + " ");
}
}
/**
* 打印
* @param obj 打印对象
*/
private static void println(Object obj) {
System.out.println(obj);
}
}
更多推荐
已为社区贡献5条内容
所有评论(0)