java api 操作 hbase,插入数据(单行,多行,批量插入)
java api 操作 hbase (主要批量存放数据) 操作步骤如下: #创建配置对象,创建连接对象 Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum","single01:2181 "); Connection hbaseCon=ConnectionFactory.createConnection(config); ...
java api 操作 hbase (主要批量存放数据)
操作步骤如下:
#创建配置对象,创建连接对象
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum","single01:2181 ");
Connection hbaseCon=ConnectionFactory.createConnection(config);
//操作对象
Admin admin=hbaseCon.getAdmin();
admin.xxx(tableName)
//创建表名对象
final String HTable="kb16nb:student";
TableName tableName = TableName.valueOf(HTable);
//操作数据
Table table=hbaseCon.getTable(tableName);
//单行添加数据
Put row = new Put(byte[] rowkey);
row.addColumn(byte[] columnfamily,byte[] column,byte[] value)
...
table.put(row);
//多行(少量)
List<Put> rows =new ArrayList();
rows.add(row);
...
table.put(rows);
//批处理
//lambda 创建hbase 批量插入数据异常侦听对象
//java中函数式接口,可以用lambda表达式写
BufferedMutator.ExceptionListener listener=(e,mutator)->{
    //异常信息(原因)
    String msg = e.getMessage();
    //出异常的行数
    int numExceptions = e.getNumExceptions();
    //记录出异常的行的行键,以便事后检查并再处理
    //用Log4j记录
    logger.error("HBASE MUTATE EXCEPTION : "+msg+","+numExceptions);
    if(numExceptions>0){
        StringBuilder builder=new StringBuilder();
        builder.append(Bytes.toString(e.getRow(0).getRow()));
        final String SEP=",";
        //第0行已经追加,这里从第1行开始,避免重复
        for (int i = 1; i <numExceptions ; i++) {
            builder.append(SEP);
            builder.append(Bytes.toString(e.getRow(i).getRow()));
        }
        logger.error(builder.toString());
    }
};
final int BUFFER_SIZE=8*1024;
BufferedMutatorParams bmp=new BufferedMutatorParams(tableName)
    .listener(listener).writeBufferSize(BUFFER_SIZE);
BufferedMutator mutator=hbaseCon.getBufferedMutator(bmp);
//创建List
List<Put> list=new ArrayList<>(BUFFER_SIZE);
...
//放入数据
mutator.mutate(list);
log4j日志
#1导入jar包
#2做配置信息 resources/log4j.properties
log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
#3创建对象
private static final Logger logger=Logger.getLogger(App.class);
代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * Demonstrates inserting rows into HBase through the Java client API, using a
 * {@link BufferedMutator} for batched writes.
 *
 * <p>{@link #main(String[])} connects through the ZooKeeper quorum {@code single01:2181},
 * checks that the table {@code kb16nb:student} exists and, if so, writes
 * {@value #TOTAL_ROWS} generated rows to it in batches.
 */
public class App
{
    private static final Logger logger = Logger.getLogger(App.class);

    /** ZooKeeper quorum used to locate the HBase cluster (no surrounding whitespace). */
    private static final String ZK_QUORUM = "single01:2181";

    /** Fully qualified ({@code namespace:table}) name of the target table. */
    private static final String TABLE_NAME = "kb16nb:student";

    /** Client-side write buffer size handed to the mutator, in bytes. */
    private static final int WRITE_BUFFER_BYTES = 8 * 1024;

    /** Number of {@link Put}s collected locally before they are handed to the mutator. */
    private static final int BATCH_ROWS = 8 * 1024;

    /** Total number of rows generated by {@link #main(String[])}. */
    private static final int TOTAL_ROWS = 100000;

    /** Column family that holds the common columns (name, age, product). */
    private static final byte[] CF_BASE = Bytes.toBytes("base");

    /**
     * Closes every non-null resource in order, logging (not propagating) any failure so that
     * one failing resource does not prevent the remaining ones from being closed.
     *
     * @param closes resources to close; the array and its elements may be {@code null}
     */
    static void close(AutoCloseable... closes) {
        if (closes == null) {
            return;
        }
        for (AutoCloseable close : closes) {
            if (close != null) {
                try {
                    close.close();
                } catch (Exception e) {
                    logger.error("Failed to close resource", e);
                }
            }
        }
    }

    /**
     * Entry point: connects to HBase and bulk-inserts generated student rows.
     *
     * <p>All client resources are opened in try-with-resources blocks. In particular the
     * {@link BufferedMutator} is closed before the connection, so mutations still sitting in
     * its write buffer are sent before the program exits.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZK_QUORUM);
        TableName tableName = TableName.valueOf(TABLE_NAME);

        try (Connection hbaseCon = ConnectionFactory.createConnection(config);
             Admin admin = hbaseCon.getAdmin()) {
            if (!admin.tableExists(tableName)) {
                System.out.println(TABLE_NAME + " NOT EXISTE");
                return;
            }
            BufferedMutatorParams bmp = new BufferedMutatorParams(tableName)
                    .listener(App::logFailedRows)
                    .writeBufferSize(WRITE_BUFFER_BYTES);
            try (BufferedMutator mutator = hbaseCon.getBufferedMutator(bmp)) {
                writeRows(mutator);
            }
        } catch (IOException e) {
            logger.error("HBase batch insert failed", e);
        }
    }

    /**
     * Generates {@link #TOTAL_ROWS} rows and submits them to the mutator in batches of
     * {@link #BATCH_ROWS}, then flushes whatever is still buffered.
     *
     * @param mutator the buffered mutator bound to the target table
     * @throws IOException if submitting or flushing mutations fails
     */
    private static void writeRows(BufferedMutator mutator) throws IOException {
        List<Put> batch = new ArrayList<>(BATCH_ROWS);
        Random rand = new Random();
        int sum = 0;
        for (int i = 0; i < TOTAL_ROWS; i++) {
            batch.add(buildPut(i, rand));
            if (batch.size() >= BATCH_ROWS) {
                sum = submit(mutator, batch, sum);
            }
        }
        if (!batch.isEmpty()) {
            submit(mutator, batch, sum);
        }
        // mutate() only buffers; push the remainder out explicitly before the mutator is closed.
        mutator.flush();
    }

    /**
     * Hands one batch to the mutator, prints progress, and empties the batch for reuse.
     *
     * @param mutator the buffered mutator bound to the target table
     * @param batch   rows to submit; cleared before returning
     * @param sum     number of rows submitted before this call
     * @return number of rows submitted including this batch
     * @throws IOException if the mutator rejects the batch
     */
    private static int submit(BufferedMutator mutator, List<Put> batch, int sum)
            throws IOException {
        mutator.mutate(batch);
        int total = sum + batch.size();
        System.out.println("CURRENT" + batch.size() + ",SUM" + total);
        batch.clear();
        return total;
    }

    /**
     * Builds the row for index {@code i}: row key {@code i + 1} as a string, the common
     * columns in family {@code base}, and two random scores (55..99) in either the
     * {@code bigdata} or the {@code cloud} family.
     *
     * @param i    zero-based row index
     * @param rand random source for product and scores
     * @return the populated {@link Put}
     */
    private static Put buildPut(int i, Random rand) {
        Put put = new Put(Bytes.toBytes(String.valueOf(i + 1)));
        put.addColumn(CF_BASE, Bytes.toBytes("name"), Bytes.toBytes("henry" + i));
        // NOTE(review): nextInt(1) always returns 0, so age is always 18 — widen the bound
        // if varied ages were intended.
        put.addColumn(CF_BASE, Bytes.toBytes("age"), Bytes.toBytes(18 + rand.nextInt(1)));

        boolean bigdata = rand.nextInt(2) == 0;
        String columnFamily = bigdata ? "bigdata" : "cloud";
        String subject01 = bigdata ? "hive" : "net";
        String subject02 = bigdata ? "hbase" : "shell";
        int score01 = 55 + rand.nextInt(45);
        int score02 = 55 + rand.nextInt(45);

        byte[] family = Bytes.toBytes(columnFamily);
        put.addColumn(CF_BASE, Bytes.toBytes("product"), family);
        put.addColumn(family, Bytes.toBytes(subject01), Bytes.toBytes(score01));
        put.addColumn(family, Bytes.toBytes(subject02), Bytes.toBytes(score02));
        return put;
    }

    /**
     * Exception listener for the buffered mutator: logs the failure message, the number of
     * failed mutations, and the comma-separated row keys of the failed rows so they can be
     * inspected and re-processed later.
     *
     * @param e       details of the mutations that failed after retries
     * @param mutator the mutator that reported the failure (unused)
     */
    private static void logFailedRows(RetriesExhaustedWithDetailsException e,
                                      BufferedMutator mutator) {
        int numExceptions = e.getNumExceptions();
        logger.error("HBASE MUTATE EXCEPTION : " + e.getMessage() + "," + numExceptions);
        if (numExceptions > 0) {
            StringBuilder rowKeys = new StringBuilder();
            for (int i = 0; i < numExceptions; i++) {
                if (i > 0) {
                    rowKeys.append(",");
                }
                rowKeys.append(Bytes.toString(e.getRow(i).getRow()));
            }
            logger.error(rowKeys.toString());
        }
    }
}
更多推荐
所有评论(0)