java api 操作 hbase (主要批量存放数据)
操作步骤如下:
#创建配置对象,创建连接对象
// Build the client configuration and point it at the ZooKeeper quorum.
Configuration config = HBaseConfiguration.create();
// host:port of the quorum; the value must not carry stray whitespace after the port.
config.set("hbase.zookeeper.quorum","single01:2181");
// Open the connection from which Admin / Table / BufferedMutator objects are obtained.
Connection hbaseCon=ConnectionFactory.createConnection(config);
//操作对象
Admin admin=hbaseCon.getAdmin();
admin.xxx(tableName)
//创建表名对象
 final String HTable="kb16nb:student";
TableName tableName = TableName.valueOf(HTable);

//操作数据
Table table=hbaseCon.getTable(tableName);
//单行添加数据 
Put row = new Put(byte[] rowkey);
row.addColumn(byte[] columnfamily,byte[] column,byte[] value)
...
table.put(row);

// Multiple rows (small batches): collect the Put objects in a list.
// The diamond operator keeps the list typed as List<Put> instead of a raw ArrayList.
List<Put> rows =new ArrayList<>();
rows.add(row);
...
table.put(rows);

//批处理
  //lambda 创建hbase 批量插入数据异常侦听对象
  //java中函数式接口,可以用lambda表达式写
BufferedMutator.ExceptionListener listener=(e,mutator)->{
    //异常信息(原因)
    String msg = e.getMessage();
    //出异常的行数
    int numExceptions = e.getNumExceptions();
    //记录出异常的行的行键,以便事后检查并再处理
    //用Log4j记录
    logger.error("HBASE MUTATE EXCEPTION : "+msg+","+numExceptions);
    if(numExceptions>0){
        StringBuilder builder=new StringBuilder();
        builder.append(Bytes.toString(e.getRow(0).getRow()));
        final String SEP=",";
        for (int i = 0; i <numExceptions ; i++) {
            builder.append(SEP);
            builder.append(Bytes.toString(e.getRow(i).getRow()));
        }
        logger.error(builder.toString());
    }
};

// Size passed to writeBufferSize(); the same number is reused below as the
// initial capacity of the batch list.
final int BUFFER_SIZE=8*1024;
BufferedMutatorParams bmp=new BufferedMutatorParams(tableName)
        .listener(listener).writeBufferSize(BUFFER_SIZE);
BufferedMutator mutator=hbaseCon.getBufferedMutator(bmp);

// Create the list that collects the Put objects of one batch
List<Put> list=new ArrayList<>(BUFFER_SIZE);
// ... (fill the list with Put objects)
// Hand the batch to the mutator
mutator.mutate(list);
// mutate() may leave data in the client-side buffer; close() flushes whatever is
// still pending and releases the mutator. Call it once all batches are submitted.
mutator.close();

log4j日志
#1导入jar包
#2做配置信息 resources/log4j.properties 
# Root logger: level INFO, output sent to the two appenders named "stdout" and "logfile"
log4j.rootLogger=INFO, stdout, logfile
# "stdout" appender: writes to the console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern: date, level, [logger name], message, line separator
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# "logfile" appender: writes to the file log/hd.log using the same pattern
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
#3创建对象
private static final Logger logger=Logger.getLogger(App.class);

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Demonstrates bulk-writing rows to HBase through the Java client API.
 *
 * <p>{@link #main(String[])} connects through the ZooKeeper quorum {@code single01:2181},
 * checks that the table {@code kb16nb:student} exists and, if it does, writes 100000
 * generated student rows through a {@link BufferedMutator}. Rows that fail to be
 * written are reported to Log4j by {@link #logFailedRows}.
 */
public class App
{
    private static final Logger logger=Logger.getLogger(App.class);

    /** Target table in {@code namespace:table} form. */
    private static final String TABLE_NAME = "kb16nb:student";
    /** Value passed to {@code BufferedMutatorParams.writeBufferSize} (a byte count). */
    private static final int WRITE_BUFFER_BYTES = 8 * 1024;
    /** Number of Puts collected locally before they are handed to the mutator. */
    private static final int BATCH_ROWS = 8 * 1024;
    /** Number of rows generated by one run. */
    private static final int TOTAL_ROWS = 100000;

    /**
     * Closes every non-null resource, in the order given. A failure to close one
     * resource is logged and does not prevent the remaining ones from being closed.
     *
     * @param closes resources to close; null entries are skipped
     */
    static void close(AutoCloseable...closes){
        if (closes == null) {
            return;
        }
        for (AutoCloseable close : closes) {
            if (close == null) {
                continue;
            }
            try {
                close.close();
            } catch (Exception e) {
                // Best effort: keep closing the remaining resources.
                logger.error("Failed to close resource", e);
            }
        }
    }

    /**
     * Entry point: connects to HBase and bulk-loads the generated rows when the
     * target table exists; otherwise prints a notice and exits.
     *
     * @param args ignored
     */
    public static void main( String[] args ) {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum","single01:2181");
        TableName tableName = TableName.valueOf(TABLE_NAME);

        // Connection to HBase, and the admin handle used for the existence check.
        Connection hbaseCon = null;
        Admin admin = null;
        try {
            hbaseCon = ConnectionFactory.createConnection(config);
            admin = hbaseCon.getAdmin();

            if (!admin.tableExists(tableName)) {
                System.out.println(TABLE_NAME+" NOT EXISTE");
                return;
            }
            loadStudents(hbaseCon, tableName);
        } catch (IOException e) {
            logger.error("HBase bulk load failed", e);
        } finally {
            close(admin, hbaseCon);
        }
    }

    /**
     * Generates {@link #TOTAL_ROWS} student rows and writes them in batches of
     * {@link #BATCH_ROWS} through a BufferedMutator.
     *
     * @param hbaseCon  open connection used to create the mutator
     * @param tableName table that receives the rows
     * @throws IOException if submitting a batch or closing the mutator fails
     */
    private static void loadStudents(Connection hbaseCon, TableName tableName) throws IOException {
        BufferedMutatorParams params = new BufferedMutatorParams(tableName)
                .listener(App::logFailedRows)
                .writeBufferSize(WRITE_BUFFER_BYTES);

        // try-with-resources: closing the mutator flushes whatever is still buffered,
        // so the tail of the data is not lost when the program ends.
        try (BufferedMutator mutator = hbaseCon.getBufferedMutator(params)) {
            List<Put> batch = new ArrayList<>(BATCH_ROWS);
            Random rand = new Random();
            int sum = 0;
            for (int i = 0; i < TOTAL_ROWS; i++) {
                batch.add(buildStudentPut(i, rand));
                if (batch.size() >= BATCH_ROWS) {
                    sum = submit(mutator, batch, sum);
                }
            }
            // Last, partially filled batch.
            if (!batch.isEmpty()) {
                submit(mutator, batch, sum);
            }
        }
    }

    /**
     * Hands one batch to the mutator, prints the progress line and empties the batch.
     *
     * @param mutator target mutator
     * @param batch   rows to submit; cleared before returning
     * @param sum     number of rows submitted before this call
     * @return number of rows submitted including this batch
     * @throws IOException if the mutator rejects the batch
     */
    private static int submit(BufferedMutator mutator, List<Put> batch, int sum) throws IOException {
        mutator.mutate(batch);
        int total = sum + batch.size();
        System.out.println("CURRENT"+batch.size()+",SUM"+total);
        batch.clear();
        return total;
    }

    /**
     * Builds the Put for one generated student.
     *
     * <p>The row key is the decimal string of {@code i + 1}. Column family {@code base}
     * gets {@code name}, {@code age} and {@code product}; the family named by
     * {@code product} ({@code bigdata} or {@code cloud}) gets two subject scores,
     * each in the range 55..99.
     *
     * @param i    zero-based index of the student
     * @param rand source of the random product and scores
     * @return the populated Put
     */
    private static Put buildStudentPut(int i, Random rand) {
        byte[] base = Bytes.toBytes("base");

        Put put = new Put(Bytes.toBytes(String.valueOf(1 + i)));
        put.addColumn(base, Bytes.toBytes("name"), Bytes.toBytes("henry" + i));
        // NOTE(review): nextInt(1) always returns 0, so age is always 18 — widen the
        // bound if varied ages are intended.
        put.addColumn(base, Bytes.toBytes("age"), Bytes.toBytes(18 + rand.nextInt(1)));

        boolean bigdata = rand.nextInt(2) == 0;
        String columnFamily = bigdata ? "bigdata" : "cloud";
        String subject01 = bigdata ? "hive" : "net";
        String subject02 = bigdata ? "hbase" : "shell";
        int score01 = 55 + rand.nextInt(45);
        int score02 = 55 + rand.nextInt(45);

        byte[] family = Bytes.toBytes(columnFamily);
        put.addColumn(base, Bytes.toBytes("product"), Bytes.toBytes(columnFamily));
        put.addColumn(family, Bytes.toBytes(subject01), Bytes.toBytes(score01));
        put.addColumn(family, Bytes.toBytes(subject02), Bytes.toBytes(score02));
        return put;
    }

    /**
     * Exception listener for the BufferedMutator: logs the failure message with the
     * number of failed rows, then the comma-separated row keys of those rows so they
     * can be inspected and re-processed later.
     *
     * @param e       details of the failed mutations
     * @param mutator mutator that reported the failure (unused)
     */
    private static void logFailedRows(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
        int numExceptions = e.getNumExceptions();
        logger.error("HBASE MUTATE EXCEPTION : "+e.getMessage()+","+numExceptions);
        if (numExceptions <= 0) {
            return;
        }
        StringBuilder rowKeys = new StringBuilder();
        for (int i = 0; i < numExceptions; i++) {
            if (i > 0) {
                rowKeys.append(',');
            }
            rowKeys.append(Bytes.toString(e.getRow(i).getRow()));
        }
        logger.error(rowKeys.toString());
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐