shell--buckload

数据准备
1,zss,23,M
2,lss,33,M
3,fj,35,F
4,ny,42,M
5,xq,44,F
1)在hbase中创建表  
create  'tb_friends' , 'cf'
2) 使用shell命令将数据转换成hfile文件
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \   -- 使用类执行操作
-Dimporttsv.separator=, \  --指定行数据的字段分隔符
-Dimporttsv.columns='HBASE_ROW_KEY,cf:name,cf:age,cf:gender' \  -- hbase中数据封装
-Dimporttsv.bulk.output=/fs/output \  -- hfile文件存储在HDFS上的路径
tb_friends \   --hbase中的表 
/csv/friends.csv  --输入的文件路径 文件放在HDFS上
3) 将hfile文件导入到表中
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /fs/output/  tb_friends 

java--使用MR直接将数据写入HBase表中

public class LoadMovie {

    static class LoadMapper extends Mapper<LongWritable, Text ,Text , MovieBean> {
        Text k = new  Text() ;
        Gson gs = new Gson() ;
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                String line = value.toString();
                // 解析json
                MovieBean mb = gs.fromJson(line, MovieBean.class);
                // 获取电影id
                String movie = mb.getMovie();
                // 定长电影id
                String newMid = StringUtils.leftPad(movie, 6, '0');
                // 时间
                String timeStamp = mb.getTimeStamp();
                String newTime = StringUtils.leftPad(timeStamp, 10, "0");
                String  rowkey = newMid+"_"+newTime;
                k.set(rowkey);
                // 输出数据    rk
                context.write(k , mb);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

    /**
     * 将数据直接写入到Hbase表中
     *   Put 将每条数据封装在Put中
     */
    static class LoadReducer extends TableReducer<Text , MovieBean , ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {
            // 行键
            String rk = key.toString();
            // 获取rk对应的电影Bean数据
            MovieBean mb = values.iterator().next();
            // 获取数据属性
            String movie = mb.getMovie();
            double rate = mb.getRate();
            String timeStamp = mb.getTimeStamp();
            String uid = mb.getUid();
            // 创建put
            Put put = new Put(Bytes.toBytes(rk));
            // 添加单元格
            put.addColumn("cf".getBytes(), "movie".getBytes(), Bytes.toBytes(movie));
            put.addColumn("cf".getBytes(), "rate".getBytes(), Bytes.toBytes(rate));
            put.addColumn("cf".getBytes(), "timeStamp".getBytes(), Bytes.toBytes(timeStamp));
            put.addColumn("cf".getBytes(), "uid".getBytes(), Bytes.toBytes(uid));
            // 写出去
            context.write(null, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:218");
        Job job = Job.getInstance(conf);
        job.setMapperClass(LoadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MovieBean.class);
        FileInputFormat.setInputPaths(job, new Path("E:\\mrdata\\movie\\input"));
        TableMapReduceUtil.initTableReducerJob("tb_movie" , LoadReducer.class,job);
        job.waitForCompletion(true);
    }
}
public class MovieBean implements Writable {
    private  String  movie ;
    private  double  rate ;
    private  String timeStamp ;
    private  String uid ;

    /**
     * 注意保留空参构造器
     * @return
     */
    public String getMovie() {
        return movie;
    }

    public void setMovie(String movie) {
        this.movie = movie;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }

    public String getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    @Override
    public String toString() {
        return "Movie{" +
                "movie='" + movie + '\'' +
                ", rate=" + rate +
                ", timeStamp='" + timeStamp + '\'' +
                ", uid='" + uid + '\'' +
                '}';
    }
    // 序列化
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(movie);
        dataOutput.writeDouble(rate);
        dataOutput.writeUTF(timeStamp);
        dataOutput.writeUTF(uid);

    }
    // 反序列化
    public void readFields(DataInput dataInput) throws IOException {
        // 和写出的数据类型和顺序保证一致
        movie = dataInput.readUTF();
        rate =dataInput.readDouble();
        timeStamp = dataInput.readUTF();
        uid = dataInput.readUTF() ;

    }

}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐