HBase批量导入操作
shell--bulkload 数据准备:1,zss,23,M;2,lss,33,M;3,fj,35,F;4,ny,42,M;5,xq,44,F。1) 在hbase中创建表:create 'tb_friends' , 'cf';2) 使用shell命令将数据转换成hfile文件:hbase org.apache.hadoop.hbase.mapreduce.ImportTsv(使用类执行操作)-Dimporttsv…
·
shell--bulkload
数据准备
1,zss,23,M
2,lss,33,M
3,fj,35,F
4,ny,42,M
5,xq,44,F
1)在hbase中创建表
create 'tb_friends' , 'cf'
2) 使用shell命令将数据转换成hfile文件
# ImportTsv:使用该类执行转换操作
# -Dimporttsv.separator    指定行数据的字段分隔符
# -Dimporttsv.columns      hbase中数据封装(第一列作为行键,其余列写入cf列族)
# -Dimporttsv.bulk.output  hfile文件存储在HDFS上的路径
# tb_friends               hbase中的表
# /csv/friends.csv         输入的文件路径,文件放在HDFS上
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=, \
-Dimporttsv.columns='HBASE_ROW_KEY,cf:name,cf:age,cf:gender' \
-Dimporttsv.bulk.output=/fs/output \
tb_friends \
/csv/friends.csv
3) 将hfile文件导入到表中
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /fs/output/ tb_friends
java--使用MR直接将数据写入HBase表中
/**
 * MapReduce job that parses JSON movie-rating lines and writes them straight into the HBase
 * table {@code tb_movie} through a {@link TableReducer}.
 *
 * <p>The row key is {@code <movie id left-padded to 6 with '0'>_<timestamp left-padded to 10
 * with '0'>}; the four bean properties are stored as cells in column family {@code cf}.
 */
public class LoadMovie {

    /** Column family that receives every cell written by this job. */
    private static final byte[] FAMILY = Bytes.toBytes("cf");

    // Qualifiers are encoded once with Bytes.toBytes so the bytes do not depend on the
    // platform default charset (String.getBytes() without a charset does).
    private static final byte[] QUALIFIER_MOVIE = Bytes.toBytes("movie");
    private static final byte[] QUALIFIER_RATE = Bytes.toBytes("rate");
    private static final byte[] QUALIFIER_TIMESTAMP = Bytes.toBytes("timeStamp");
    private static final byte[] QUALIFIER_UID = Bytes.toBytes("uid");

    /** Counter group under which skipped input records are reported. */
    private static final String COUNTER_GROUP = "LoadMovie";

    /**
     * Turns each input line (a JSON document) into a {@link MovieBean} keyed by its row key.
     */
    static class LoadMapper extends Mapper<LongWritable, Text, Text, MovieBean> {

        // Reused across map() calls to avoid allocating a new key per record.
        private final Text k = new Text();
        private final Gson gs = new Gson();

        /**
         * Parses one line and emits {@code (rowKey, bean)}.
         *
         * <p>Lines that cannot be parsed, or that lack a movie id, timestamp or uid, are
         * skipped and counted instead of failing the task. Failures while emitting the record
         * are no longer swallowed; they propagate to the framework.
         *
         * @param key     input key supplied by the input format (unused except in messages)
         * @param value   one line of input, expected to be a JSON object
         * @param context task context used for output and counters
         * @throws IOException          if emitting the record fails
         * @throws InterruptedException if the task is interrupted while emitting
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            MovieBean mb;
            try {
                // Parse the JSON line.
                mb = gs.fromJson(value.toString(), MovieBean.class);
            } catch (RuntimeException e) {
                // Best effort: one bad line must not abort the whole import.
                System.err.println("Skipping malformed record at input key " + key + ": " + e);
                context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
                return;
            }

            // A blank line parses to null, and a missing string property stays null; such a
            // bean cannot be serialized (DataOutput.writeUTF rejects null), so skip it here.
            if (mb == null || mb.getMovie() == null
                    || mb.getTimeStamp() == null || mb.getUid() == null) {
                context.getCounter(COUNTER_GROUP, "INCOMPLETE_RECORDS").increment(1);
                return;
            }

            // Fixed-width movie id and timestamp keep row keys sortable as plain strings.
            String newMid = StringUtils.leftPad(mb.getMovie(), 6, '0');
            String newTime = StringUtils.leftPad(mb.getTimeStamp(), 10, "0");
            k.set(newMid + "_" + newTime);

            context.write(k, mb);
        }
    }

    /**
     * Writes the data directly into the HBase table, wrapping each row in a {@link Put}.
     */
    static class LoadReducer extends TableReducer<Text, MovieBean, ImmutableBytesWritable> {

        /**
         * Builds one {@link Put} for the row key and emits it.
         *
         * <p>NOTE(review): only the first bean of the group is stored. Records that share a
         * movie id and timestamp share a row key, so any further beans in {@code values} are
         * dropped. Confirm that this is acceptable, or add the uid to the row key.
         *
         * @param key     row key produced by the mapper
         * @param values  beans that share this row key
         * @param context task context used for output
         * @throws IOException          if emitting the put fails
         * @throws InterruptedException if the task is interrupted while emitting
         */
        @Override
        protected void reduce(Text key, Iterable<MovieBean> values, Context context)
                throws IOException, InterruptedException {
            // Bean that belongs to this row key (first of the group, see note above).
            MovieBean mb = values.iterator().next();

            Put put = new Put(Bytes.toBytes(key.toString()));
            // One cell per bean property; rate is stored as the 8-byte encoding of a double.
            put.addColumn(FAMILY, QUALIFIER_MOVIE, Bytes.toBytes(mb.getMovie()));
            put.addColumn(FAMILY, QUALIFIER_RATE, Bytes.toBytes(mb.getRate()));
            put.addColumn(FAMILY, QUALIFIER_TIMESTAMP, Bytes.toBytes(mb.getTimeStamp()));
            put.addColumn(FAMILY, QUALIFIER_UID, Bytes.toBytes(mb.getUid()));

            // The output key is passed as null, exactly as before; the row is carried by the Put.
            context.write(null, put);
        }
    }

    /**
     * Configures and runs the job, reading from the hard-coded input directory and writing to
     * table {@code tb_movie}.
     *
     * @param args unused
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Fixed: the third host previously used the truncated port "218".
        conf.set("hbase.zookeeper.quorum", "linux01:2181,linux02:2181,linux03:2181");

        Job job = Job.getInstance(conf);
        // Lets Hadoop locate the jar containing the mapper/reducer when run on a cluster.
        job.setJarByClass(LoadMovie.class);

        job.setMapperClass(LoadMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MovieBean.class);
        FileInputFormat.setInputPaths(job, new Path("E:\\mrdata\\movie\\input"));

        TableMapReduceUtil.initTableReducerJob("tb_movie", LoadReducer.class, job);

        // Surface a failed job to the caller instead of exiting with status 0.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
public class MovieBean implements Writable {
private String movie ;
private double rate ;
private String timeStamp ;
private String uid ;
/**
* 注意保留空参构造器
* @return
*/
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
@Override
public String toString() {
return "Movie{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp='" + timeStamp + '\'' +
", uid='" + uid + '\'' +
'}';
}
// 序列化
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeUTF(timeStamp);
dataOutput.writeUTF(uid);
}
// 反序列化
public void readFields(DataInput dataInput) throws IOException {
// 和写出的数据类型和顺序保证一致
movie = dataInput.readUTF();
rate =dataInput.readDouble();
timeStamp = dataInput.readUTF();
uid = dataInput.readUTF() ;
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)