利用JAVA向Hbase表中批量插入数据

插入数据的文件格式要求:

1.文件名格式为:命名空间名_表名_时间戳.时间戳

如:dsj_test_1624591726565.1624591726565

2.文件内格式:
第一行用于描述表结构
:行键 ,列簇名1:列名1,列簇名1:列名2,列簇名2:列名1

	如::key,cf1:name,cf2:age,cf2:pos,cf2:salary,cf3:rst

其他行为具体插入的数据

:key,cf1:name,cf2:age,cf2:pos,cf2:salary,cf3:rst
zbstu0,henry0,20,market clerk,1,how are you
zbstu1,henry1,24,market clerk,3,how are you
zbstu2,henry2,34,market clerk,2,how are you
zbstu3,henry3,33,market clerk,3,how are you

配置文件内容

log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

JAVA实现代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;


public class Java2HBase {

    // Shared HBase client configuration; must be populated via init() before use.
    static Configuration config = null;

    /**
     * Creates the HBase client configuration and overlays the given overrides.
     *
     * @param items configuration entries in {@code key=value} form,
     *              e.g. {@code "hbase.zookeeper.quorum=host1,host2"}
     */
    public static void init(String... items) {
        config = HBaseConfiguration.create();
        for (String item : items) {
            // limit=2: the value itself may legally contain '=' characters
            String[] ps = item.split("=", 2);
            config.set(ps[0], ps[1]);
        }
    }

    /**
     * Quietly closes each non-null resource. Failures are logged, not propagated,
     * so one broken close does not prevent the remaining resources from being released.
     *
     * @param closes resources to close, in the order given; nulls are skipped
     */
    private static void close(AutoCloseable... closes) {
        for (AutoCloseable close : closes) {
            if (close != null) {
                try {
                    close.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Opens a new HBase connection from the shared {@link #config}. */
    private static Connection con() throws IOException {
        return ConnectionFactory.createConnection(config);
    }

    /** Returns an {@link Admin} handle for the connection; the caller must close it. */
    private static Admin admin(Connection con) throws IOException {
        return con.getAdmin();
    }

    /**
     * Checks whether the given table exists, releasing the Admin handle afterwards.
     * (The original article called this method without defining it; this is the
     * obvious implementation built on the {@link #admin(Connection)} helper.)
     *
     * @param con open HBase connection
     * @param tn  fully qualified table name (namespace:table)
     * @return true if the table exists
     * @throws IOException on any HBase RPC failure
     */
    private static boolean tableExists(Connection con, TableName tn) throws IOException {
        Admin admin = null;
        try {
            admin = admin(con);
            return admin.tableExists(tn);
        } finally {
            close(admin);
        }
    }

    /**
     * Batch-loads the data file at {@code file} into the HBase table encoded in the
     * file name. The file name must follow {@code namespace_table_timestamp.timestamp};
     * the first line describes the layout ({@code :key,cf:col,...}) and every other
     * line is one row whose fields are separated by {@code regexSep}.
     *
     * @param file     path of the data file to import
     * @param regexSep regular expression used to split both header and data lines
     */
    public static void putBatch(String file, String regexSep) {
        File data = new File(file);
        Connection con = null;
        BufferedReader br = null;
        BufferedMutator mutator = null;
        try {
            // Input file validation.
            if (!data.exists() || !data.isFile()) {
                throw new IOException(file + " not exist or not file error");
            }
            // namespace_table_timestamp.timestamp -> "namespace:table"
            String[] ns = data.getName().split("_|\\.");
            String tablename = ns[0] + ":" + ns[1];
            con = con();
            TableName tn = TableName.valueOf(tablename);
            // Target table must already exist; this importer never creates it.
            if (!tableExists(con, tn)) {
                throw new IOException("hbase table [" + tablename + "] not exist error");
            }
            // Read the file as UTF-8 explicitly — plain FileReader would use the
            // platform default charset and mangle non-ASCII data.
            br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(data), StandardCharsets.UTF_8));
            // First line describes the table structure: :key,cf:col,...
            String line = br.readLine();
            if (line == null) {
                throw new IOException("file [ " + file + " ] empty error");
            }
            String[] ps = line.split(regexSep);
            // Listener retries every failed Put once and reports the outcome.
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            BufferedMutator.ExceptionListener listener = (e, _mutator) -> {
                System.err.println("put data into table [" + tablename + "] error "
                        + e.getNumExceptions() + " rows,retry put at " + dtf.format(LocalDateTime.now()));
                int retried = 0;
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    Row row = e.getRow(i);
                    try {
                        _mutator.mutate((Put) row);
                        retried++;
                    } catch (IOException ex) {
                        ex.printStackTrace();
                        System.err.println("retry put " + row + " error,please check it");
                    }
                }
                // Summary printed once after all retries (the original printed it per row).
                System.err.println(" retry put data into table [" + tablename + "] from error total "
                        + e.getNumExceptions() + " rows, finish " + retried + " rows, at "
                        + dtf.format(LocalDateTime.now()));
            };

            BufferedMutatorParams bmp = new BufferedMutatorParams(tn)
                    .writeBufferSize(8 * 1024 * 1024)
                    .listener(listener);
            mutator = con.getBufferedMutator(bmp);
            final int CAPACITY = 1000;
            int count = 0;
            List<Put> list = new ArrayList<>(CAPACITY);
            while (null != (line = br.readLine())) {
                String[] arr = line.split(regexSep);
                // Guard: a short row would otherwise throw ArrayIndexOutOfBoundsException
                // mid-batch; skip it with a warning instead.
                if (arr.length < ps.length) {
                    System.err.println("skip malformed line [ " + line + " ] in [ " + file + " ]");
                    continue;
                }
                // Column 0 is the row key; remaining columns map via the header's cf:col pairs.
                Put put = new Put(Bytes.toBytes(arr[0]));
                for (int i = 1; i < ps.length; i++) {
                    String[] ts = ps[i].split(":");
                    put.addColumn(Bytes.toBytes(ts[0]), Bytes.toBytes(ts[1]), Bytes.toBytes(arr[i]));
                }
                list.add(put);
                if (list.size() == CAPACITY) {
                    mutator.mutate(list);
                    count += list.size();
                    list.clear();
                }
            }
            // Flush the final partial batch, if any.
            if (!list.isEmpty()) {
                mutator.mutate(list);
                count += list.size();
                list.clear();
            }
            // Push buffered mutations to the server before declaring success.
            mutator.flush();
            System.out.println("batch put into [ " + tablename + "," + count
                    + " rows ] from [ " + file + " ] in success");
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("batch put from [ " + file + " ] in failure");
        } finally {
            // Close reader and mutator before the connection; close() tolerates nulls.
            close(br, mutator, con);
        }
    }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐