Java客户端连接Hbase
介绍使用java客户端连接Hbase进行各种CRUD操作。
·
使用
下面介绍了Java客户端连接Hbase的使用,1.2.0版本和2.3.4版本亲测下面使用的API都是一致的。(也猜测1.x版本
和2.x版本的应该基本一致)。Java连接Hbase也很简单,Maven中导入对应的 `hbase-client` 依赖即可。
1.pom配置
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>apache release</id>
<url>https://repository.apache.org/content/repositories/releases/</url>
</repository>
</repositories>
语法
0)初始化
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public static void main(String[] args) {
Configuration config = null;
Connection conn = null;
Table table = null;
// 创建配置
config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "192.168.100.98");
config.set("hbase.zookeeper.property.clientPort", "2181");
try {
// 创建连接
conn = ConnectionFactory.createConnection(config);
// 获取表
table = conn.getTable(TableName.valueOf("FEI:WEN"));
// 查询指定表的全部数据
// queryAllTableData(table);
// 查询指定rowkey的数据
queryRowKey(table);
// 略。。。
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (conn != null) {
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 输出
* @param result
* @throws IOException
*/
private static void output(Result result) throws IOException {
CellScanner cellScanner = result.cellScanner();
while (cellScanner.advance()) {
Cell cell = cellScanner.current();
byte[] rowArray = cell.getRowArray(); //本kv所属的行键的字节数组
byte[] familyArray = cell.getFamilyArray(); //列族名的字节数组
byte[] qualifierArray = cell.getQualifierArray(); //列名的字节数据
byte[] valueArray = cell.getValueArray(); // value的字节数组
System.out.printf("|%10s|%10s|%10s|%10s|\n",
new String(rowArray, cell.getRowOffset(), cell.getRowLength()),
new String(familyArray, cell.getFamilyOffset(), cell.getFamilyLength()),
new String(qualifierArray, cell.getQualifierOffset(), cell.getQualifierLength()),
new String(valueArray, cell.getValueOffset(), cell.getValueLength()));
}
}
1)创建命名空间
/**
* 创建命名空间
* @param conn
* @throws IOException
*/
private static void createNamespace(Connection conn) throws IOException {
Admin admin = conn.getAdmin();
NamespaceDescriptor.Builder builder = NamespaceDescriptor.create("FEI");
NamespaceDescriptor build = builder.build();
admin.createNamespace(build);
admin.close();
}
2)查看命名空间
查看指定命名空间
/**
* 查看指定命名空间
* @param conn
*/
private static void queryOneNamespace(Connection conn) throws IOException {
Admin admin = conn.getAdmin();
NamespaceDescriptor test = admin.getNamespaceDescriptor("FEI");
System.out.println(test);
Map<String, String> configuration = test.getConfiguration();
System.out.println(configuration);
String name = test.getName();
System.out.println(name);
admin.close();
}
输出:
{NAME => 'FEI'}
{}
FEI
查看全部命名空间
/**
* 查看全部命名空间
* @param conn
*/
private static void queryAllNamespace(Connection conn) throws IOException {
Admin admin = conn.getAdmin();
NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
System.out.println(namespaceDescriptor);
}
admin.close();
}
输出:
{NAME => 'FEI'}
{NAME => 'SYSTEM'}
{NAME => 'TEST'}
{NAME => 'default'}
{NAME => 'hbase'}
3)查看指定命名空间下表
/**
* 查看指定命名空间下的表
* @param conn
*/
private static void queryTableByNamespace(Connection conn) throws IOException {
Admin admin = conn.getAdmin();
HTableDescriptor[] feis = admin.listTableDescriptorsByNamespace("FEI");
for (HTableDescriptor fei : feis) {
System.out.println(fei.getNameAsString());
}
admin.close();
}
输出:
FEI:EMP
FEI:IMAGES
FEI:STUDENTS
FEI:TWO
FEI:WEN
4)查看所有表
/**
* 查看所有表
* @param conn
*/
private static void queryAllTable(Connection conn) throws IOException {
Admin admin = conn.getAdmin();
TableName[] tableNames = admin.listTableNames();
for (TableName tableName : tableNames) {
System.out.println(tableName);
}
admin.close();
}
输出:
FEI:EMP
FEI:IMAGES
FEI:STUDENTS
FEI:TWO
FEI:WEN
SYSTEM:CATALOG
SYSTEM:FUNCTION
SYSTEM:LOG
SYSTEM:MUTEX
SYSTEM:SEQUENCE
SYSTEM:STATS
US_POPULATION
user
5)查看指定表
/**
* 查看指定表元数据
* @param conn
*/
private static void queryOneTableMetadata(Connection conn) throws IOException {
Admin admin = conn.getAdmin();
HTableDescriptor fei_wen = admin.getTableDescriptor(TableName.valueOf("FEI:WEN"));
System.out.println(fei_wen);
String name = fei_wen.getNameAsString();
System.out.println("\n命名空间+表名: " + name);
HColumnDescriptor[] columnFamilies = fei_wen.getColumnFamilies();
for (HColumnDescriptor columnFamily : columnFamilies) {
System.out.println("\n列族: " + columnFamily);
}
Map<String, String> configuration = fei_wen.getConfiguration();
System.out.println("\n表配置: " + configuration);
TableName tableName = fei_wen.getTableName();
System.out.println("\n命名空间+表名: " + tableName.getNameAsString());
System.out.println("\n命名空间: " + tableName.getNamespaceAsString());
System.out.println("\n表名: " + tableName.getQualifierAsString());
admin.close();
}
输出:
'FEI:WEN', {NAME => 'co', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
命名空间+表名: FEI:WEN
列族: {NAME => 'co', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
表配置: {}
命名空间+表名: FEI:WEN
命名空间: FEI
表名: WEN
6)创建表
/**
* 创建表
*
* 可以用多个HColumnDescriptor来定义多个列族,然后通过hTableDescriptor.addFamily添加,但是目前只建议一个表创建一个列族,
* 防止对性能有影响
* @param conn
*/
private static void createTable(Connection conn) {
Admin admin = null;
try {
// 获取表管理器对象
admin = conn.getAdmin();
// 创建一个表描述对象
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("FEI:CLASS"));
// 创建一个列族描述对象
HColumnDescriptor base_info = new HColumnDescriptor("base_info");
// 通过列族描述定义对象,可以设置列族的很多重要属性信息
base_info.setMaxVersions(3); // 设置该列族中存储数据的最大版本数,默认是1
hTableDescriptor.addFamily(base_info);
admin.createTable(hTableDescriptor);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
7)修改表
新增列族
public static void updateTable(Connection conn) {
Admin admin = null;
try {
admin = conn.getAdmin();
HTableDescriptor fei_class = admin.getTableDescriptor(TableName.valueOf("FEI:CLASS"));
// 增加列族
HColumnDescriptor newFamily = new HColumnDescriptor("test_info".getBytes());
newFamily.setBloomFilterType(BloomType.ROWCOL); // 设置布隆过滤器的类型
fei_class.addFamily(newFamily);
admin.modifyTable(TableName.valueOf("FEI:CLASS"),fei_class);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
删除列族
public static void updateTable(Connection conn) {
Admin admin = null;
try {
admin = conn.getAdmin();
HTableDescriptor fei_class = admin.getTableDescriptor(TableName.valueOf("FEI:CLASS"));
// 删除列族
fei_class.removeFamily("test_info".getBytes());
admin.modifyTable(TableName.valueOf("FEI:CLASS"),fei_class);
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
8)删除表
/**
* 删除表
*
* 想删除表,必须先关闭表
* @param conn
*/
public static void deleteTable(Connection conn) {
Admin admin = null;
try {
admin = conn.getAdmin();
// 关闭表
admin.disableTable(TableName.valueOf("FEI:CLASS"));
// 删除表
admin.deleteTable(TableName.valueOf("FEI:CLASS"));
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (admin != null) {
admin.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
9)查询表全部数据
/**
* 查询指定表的全部数据
*/
private static void queryAllTableData(Table table) {
try {
ResultScanner scanner = table.getScanner(new Scan());
System.out.printf("|%10s|%10s|%10s|%10s|\n", "row key", "family", "qualifier", "value");
for (Result result : scanner) {
output(result);
}
} catch (IOException e) {
e.printStackTrace();
}
}
输出:
| row key| family| qualifier| value|
| 1| co| age| 28|
| 1| co| id| 1|
| 1| co| name| jack|
| 2| co| age| 33|
| 2| co| id| 2|
| 2| co| name| roes|
9)查询指定rowkey的数据
/**
* 查询指定rowkey的数据
*/
public static void queryRowKey(Table table) {
try {
// get对象指定行键
Get get = new Get("1".getBytes(StandardCharsets.UTF_8));
Result result = table.get(get);
System.out.printf("|%10s|%10s|%10s|%10s|\n", "row key", "family", "qualifier", "value");
output(result);
} catch (IOException e) {
e.printStackTrace();
}
}
输出:
| row key| family| qualifier| value|
| 1| co| age| 28|
| 1| co| id| 1|
| 1| co| name| jack|
10)根据rowkey查询指定列
/**
* 根据rowkey查询指定列
*/
public static void queryValueByRowKey(Table table) {
try {
// get对象指定行键
Get get = new Get("1".getBytes(StandardCharsets.UTF_8));
Result result = table.get(get);
byte[] value = result.getValue("co".getBytes(), "name".getBytes());
System.out.println(new String(value));
} catch (IOException e) {
e.printStackTrace();
}
}
输出:
jack
11)插入数据
插入单条数据
/**
* 插入单条数据
* @param table
*/
private static void insertOneData(Table table) {
Put put = new Put("3".getBytes());
put.addColumn("co".getBytes(), "class".getBytes(), "一班".getBytes());
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
批量插入数据
/**
* 批量插入数据
* @param table
*/
private static void insertBatchData(Table table) {
List<Put> puts = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Put put = new Put((i+"").getBytes());
put.addColumn("co".getBytes(), "id".getBytes(), ((i + 3) + "").getBytes());
put.addColumn("co".getBytes(), "name".getBytes(), ("张"+ i).getBytes());
put.addColumn("co".getBytes(), "age".getBytes(), (i + "").getBytes());
puts.add(put);
}
try {
table.put(puts);
} catch (IOException e) {
e.printStackTrace();
}
}
12)修改数据
通过Put覆盖,格式和插入一样
/**
* 修改数据
* @param table
*/
private static void updateData(Table table) throws IOException {
Put put1 = new Put("9".getBytes());
put1.addColumn("co".getBytes(), "name".getBytes(), "刘胡兰".getBytes());
Put put2 = new Put("8".getBytes());
put2.addColumn("co".getBytes(), "name".getBytes(), "王伟".getBytes());
Put put3 = new Put("7".getBytes());
put3.addColumn("co".getBytes(), "name".getBytes(), "金素荣".getBytes());
Put put4 = new Put("6".getBytes());
put4.addColumn("co".getBytes(), "name".getBytes(), "小日本".getBytes());
List<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
puts.add(put3);
puts.add(put4);
table.put(puts);
}
13)删除数据
/**
* 删除数据
* @param table
*/
private static void deleteData(Table table) throws IOException {
Delete d1 = new Delete("1".getBytes());
Delete d2 = new Delete("2".getBytes());
Delete d3 = new Delete("3".getBytes());
Delete d4 = new Delete("4".getBytes());
List<Delete> deletes = new ArrayList<>();
deletes.add(d1);
deletes.add(d2);
deletes.add(d3);
deletes.add(d4);
table.delete(deletes);
}
14)图片转为Base64,存入Hbase
/**
* 图片转为Base64,存入Hbase
*/
private static void imageToBase64(Table table) throws IOException {
String imageFile = "C:\\Users\\fei\\Desktop\\superset处理500页面.png";
InputStream in = null;
byte[] data = null;
String encode = null; // 返回Base64编码过的字节数组字符串
BASE64Encoder encoder = new BASE64Encoder();
try {
in = new FileInputStream(imageFile);
data = new byte[in.available()];
in.read(data);
encode = encoder.encode(data);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 将字符串存入Hbase
if (encode != null) {
Put put = new Put("1".getBytes());
put.addColumn("image".getBytes(), "test".getBytes(), encode.getBytes());
table.put(put);
}
}
15)从Hbase获取base64,转换为图表
/**
* 从Hbase获取base64,转换为图表
*/
private static void base64ToImage(Table table) throws IOException {
// 从Hbase获取base64
Get get = new Get("1".getBytes(StandardCharsets.UTF_8));
Result result = table.get(get);
byte[] value = result.getValue("image".getBytes(), "test".getBytes());
// 将字节数组字符串转换为图片
String encode = new String(value);
OutputStream out = null;
String outFile = "D:\\test.png";
BASE64Decoder decoder = new BASE64Decoder();
try {
out = new FileOutputStream(outFile);
byte[] b = decoder.decodeBuffer(encode);
for (int i = 0; i < b.length; i++) {
if (b[i] < 0) {
b[i] += 256;
}
}
out.write(b);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (out != null) {
out.flush();
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)