Spring Boot 整合 HBase
摘要:本文介绍如何在 Spring Boot 项目中整合 HBase,依次说明 POM 依赖、yml 配置、配置读取类、HBase 配置类、HBaseClient 操作 API 以及单元测试。
第一步:POM文件
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Declared once only: a second identical declaration makes Maven emit a
         "duplicate declaration" warning for this artifact. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-shaded-client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-shaded-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- Annotation processor for the @ConfigurationProperties class; not needed at runtime. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Supplies the @Slf4j annotation used by HBaseClient. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
第二步:yml配置
server:
  port: 8080
# HBase settings. Everything below hbase.config is bound to HBaseProperties.config
# and copied into the HBase client Configuration, i.e. the entries below become
# hbase.zookeeper.property.clientPort and hbase.zookeeper.quorum.
hbase:
  config:
    hbase:
      zookeeper:
        property:
          clientPort: 2181
        quorum: 172.18.99.***,172.18.99.***,172.18.99.***
第三步:新建读取配置类
/**
 * Holder for the settings declared under the {@code hbase} prefix.
 *
 * <p>The {@code config} map is read by {@code HBaseConfig}, which copies every
 * entry into the HBase client configuration.
 */
@ConfigurationProperties(prefix = "hbase")
public class HBaseProperties {

    /** Entries bound from {@code hbase.config}; stays {@code null} until {@link #setConfig} is called. */
    private Map<String, String> config;

    /**
     * @return the bound entries, or {@code null} if none were set
     */
    public Map<String, String> getConfig() {
        return this.config;
    }

    /**
     * @param config the entries to expose through {@link #getConfig()}
     */
    public void setConfig(Map<String, String> config) {
        this.config = config;
    }
}
第四步:新建hbase配置类
/**
 * Builds the HBase client {@code Configuration} from the entries held by
 * {@link HBaseProperties}.
 */
@Configuration
@EnableConfigurationProperties(HBaseProperties.class)
public class HBaseConfig {

    private final HBaseProperties properties;

    /**
     * @param properties the bound {@code hbase.*} settings
     */
    public HBaseConfig(HBaseProperties properties) {
        this.properties = properties;
    }

    /**
     * Creates a fresh configuration via {@code HBaseConfiguration.create()} and
     * overlays every entry of {@link HBaseProperties#getConfig()} on top of it.
     *
     * <p>When no entries are bound (the map is {@code null}) the configuration is
     * returned exactly as {@code HBaseConfiguration.create()} produced it instead
     * of failing with a {@code NullPointerException}.
     *
     * @return a new configuration instance on every call
     */
    public org.apache.hadoop.conf.Configuration configuration() {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        Map<String, String> config = properties.getConfig();
        if (config == null) {
            return configuration;
        }
        // Iterate entries directly rather than looking each key up a second time.
        for (Map.Entry<String, String> entry : config.entrySet()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
        return configuration;
    }
}
第五步:构建 HBaseClient 操作 API
@Component
@Slf4j
public class HBaseClient {
@Autowired
private HBaseConfig config;
private static Connection connection = null;
private static Admin admin = null;
@PostConstruct
private void init() {
if (connection != null) {
return;
}
try {
connection = ConnectionFactory.createConnection(config.configuration());
admin = connection.getAdmin();
} catch (IOException e) {
log.error("HBase create connection failed: {}", e);
}
}
/**
* create 'tableName','[Column Family 1]','[Column Family 2]'
* @param tableName
* @param columnFamilies 列族名
* @throws IOException
*/
public void createTable(String tableName, String... columnFamilies) throws IOException {
TableName name = TableName.valueOf(tableName);
boolean isExists = this.tableExists(tableName);
if (isExists) {
throw new TableExistsException(tableName + "is exists!");
}
TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(name);
List<ColumnFamilyDescriptor> columnFamilyList = new ArrayList<>();
for (String columnFamily : columnFamilies) {
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
.newBuilder(columnFamily.getBytes()).build();
columnFamilyList.add(columnFamilyDescriptor);
}
descriptorBuilder.setColumnFamilies(columnFamilyList);
TableDescriptor tableDescriptor = descriptorBuilder.build();
admin.createTable(tableDescriptor);
}
/**
* put <tableName>,<rowKey>,<family:column>,<value>,<timestamp>
* @param tableName
* @param rowKey
* @param columnFamily
* @param column
* @param value
* @throws IOException
*/
public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String column, String value)
throws IOException {
this.insertOrUpdate(tableName, rowKey, columnFamily, new String[]{column}, new String[]{value});
}
/**
* put <tableName>,<rowKey>,<family:column>,<value>,<timestamp>
* @param tableName
* @param rowKey
* @param columnFamily
* @param columns
* @param values
* @throws IOException
*/
public void insertOrUpdate(String tableName, String rowKey, String columnFamily, String[] columns, String[] values)
throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
for (int i = 0; i < columns.length; i++) {
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
table.put(put);
}
}
/**
* @param tableName
* @param rowKey
* @throws IOException
*/
public void deleteRow(String tableName, String rowKey) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(rowKey.getBytes());
table.delete(delete);
}
/**
* @param tableName
* @param rowKey
* @param columnFamily
* @throws IOException
*/
public void deleteColumnFamily(String tableName, String rowKey, String columnFamily) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(rowKey.getBytes());
delete.addFamily(Bytes.toBytes(columnFamily));
table.delete(delete);
}
/**
* delete 'tableName','rowKey','columnFamily:column'
* @param tableName
* @param rowKey
* @param columnFamily
* @param column
* @throws IOException
*/
public void deleteColumn(String tableName, String rowKey, String columnFamily, String column) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(rowKey.getBytes());
delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
table.delete(delete);
}
/**
* disable 'tableName' 之后 drop 'tableName'
* @param tableName
* @throws IOException
*/
public void deleteTable(String tableName) throws IOException {
boolean isExists = this.tableExists(tableName);
if (!isExists) {
return;
}
TableName name = TableName.valueOf(tableName);
admin.disableTable(name);
admin.deleteTable(name);
}
/**
* get 'tableName','rowkey','family:column'
* @param tableName
* @param rowkey
* @param family
* @param column
* @return
*/
public String getValue(String tableName, String rowkey, String family, String column) {
Table table = null;
String value = "";
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(family) || StringUtils.isBlank(rowkey) || StringUtils
.isBlank(column)) {
return null;
}
try {
table = connection.getTable(TableName.valueOf(tableName));
Get g = new Get(rowkey.getBytes());
g.addColumn(family.getBytes(), column.getBytes());
Result result = table.get(g);
List<Cell> ceList = result.listCells();
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
table.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return value;
}
/**
* get 'tableName','rowKey'
* @param tableName
* @param rowKey
* @return
* @throws IOException
*/
public String selectOneRow(String tableName, String rowKey) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(rowKey.getBytes());
Result result = table.get(get);
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap();
for (Cell cell : result.rawCells()) {
String row = Bytes.toString(cell.getRowArray());
String columnFamily = Bytes.toString(cell.getFamilyArray());
String column = Bytes.toString(cell.getQualifierArray());
String value = Bytes.toString(cell.getValueArray());
// 可以通过反射封装成对象(列名和Java属性保持一致)
System.out.println(row);
System.out.println(columnFamily);
System.out.println(column);
System.out.println(value);
}
return null;
}
/**
* scan 't1',{FILTER=>"PrefixFilter('2015')"}
* @param tableName
* @param rowKeyFilter
* @return
* @throws IOException
*/
public String scanTable(String tableName, String rowKeyFilter) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
if (!StringUtils.isEmpty(rowKeyFilter)) {
RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyFilter));
scan.setFilter(rowFilter);
}
ResultScanner scanner = table.getScanner(scan);
try {
for (Result result : scanner) {
System.out.println(Bytes.toString(result.getRow()));
for (Cell cell : result.rawCells()) {
System.out.println(cell);
}
}
} finally {
if (scanner != null) {
scanner.close();
}
}
return null;
}
/**
* 判断表是否已经存在,这里使用间接的方式来实现
*
* admin.tableExists() 会报NoSuchColumnFamilyException, 有人说是hbase-client版本问题
* @param tableName
* @return
* @throws IOException
*/
public boolean tableExists(String tableName) throws IOException {
TableName[] tableNames = admin.listTableNames();
if (tableNames != null && tableNames.length > 0) {
for (int i = 0; i < tableNames.length; i++) {
if (tableName.equals(tableNames[i].getNameAsString())) {
return true;
}
}
}
return false;
}
}
第六步:单元测试
/**
 * Exercises each {@link HBaseClient} operation against the cluster configured
 * for {@code HbaseDemoApplication}. The methods print results rather than
 * asserting on them.
 */
@SpringBootTest(classes = HbaseDemoApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class HBaseClientTest {

    private final static String TABLE = "quick-hbase-table";
    private final static String TABLE_FAM_1 = "quick";
    private final static String TABLE_FAM_2 = "hbase";

    @Autowired
    private HBaseClient hBaseClient;

    @Test
    public void createTable() throws IOException {
        hBaseClient.createTable(TABLE, TABLE_FAM_1, TABLE_FAM_2);
    }

    /**
     * Inserts one record into TABLE, log-entry style: two keys (speed and feel)
     * under column family "quick", and three keys (action, time and user) under
     * column family "hbase". A second row with a single column is added last.
     */
    @Test
    public void insertOrUpdate() throws IOException {
        hBaseClient.insertOrUpdate(TABLE, "1", TABLE_FAM_1, "speed", "1km/h");
        hBaseClient.insertOrUpdate(TABLE, "1", TABLE_FAM_1, "feel", "better");
        hBaseClient.insertOrUpdate(TABLE, "1", TABLE_FAM_2, "action", "create table");
        hBaseClient.insertOrUpdate(TABLE, "1", TABLE_FAM_2, "time", "2019年07月20日17:52:53");
        hBaseClient.insertOrUpdate(TABLE, "1", TABLE_FAM_2, "user", "admin");
        /*
         * Shell output after the inserts above:
         * hbase(main):007:0> scan 'quick-hbase-table'
         * ROW COLUMN+CELL
         * 1 column=hbase:action, timestamp=1563616496366, value=create table
         * 1 column=hbase:time, timestamp=1563616496379, value=2019\xE5\xB9\xB407\xE6\x9C\x8820\xE6\x97\xA517:52:53
         * 1 column=hbase:user, timestamp=1563616496384, value=admin
         * 1 column=quick:feel, timestamp=1563616496362, value=better
         * 1 column=quick:speed, timestamp=1563616496353, value=1km/h
         * 1 row(s)
         */
        hBaseClient.insertOrUpdate(TABLE, "2", TABLE_FAM_2, "user", "admin");
    }

    @Test
    public void deleteRow() throws IOException {
        hBaseClient.deleteRow(TABLE, "2");
    }

    @Test
    public void deleteColumnFamily() throws IOException {
        hBaseClient.deleteColumnFamily(TABLE, "1", TABLE_FAM_2);
    }

    @Test
    public void deleteColumn() throws IOException {
        hBaseClient.deleteColumn(TABLE, "1", TABLE_FAM_2, "action");
    }

    @Test
    public void deleteTable() throws IOException {
        hBaseClient.deleteTable(TABLE);
    }

    @Test
    public void getValue() {
        String result = hBaseClient.getValue(TABLE, "1", TABLE_FAM_2, "time");
        System.out.println(result);
    }

    @Test
    public void selectOneRow() throws IOException {
        hBaseClient.selectOneRow(TABLE, "1");
    }

    @Test
    public void scanTable() throws IOException {
        // scanTable matches rows whose key CONTAINS the given text, so pass a row-key
        // fragment (row "1" is written by insertOrUpdate), not an HBase shell expression.
        hBaseClient.scanTable(TABLE, "1");
    }

    @Test
    public void tableExists() throws IOException {
        System.out.println(hBaseClient.tableExists(TABLE));
    }
}
觉得有用,就点个赞吧!
更多推荐
已为社区贡献1条内容
所有评论(0)