Using MapReduce with HBase
I. Setup
1. Create the HBase table
hbase(main):002:0> create 'emp','info'
0 row(s) in 1.7460 seconds
=> Hbase::Table - emp
2. Create the data file
[root@hadoop dool]# vim emp.txt
1201,gopal,manager,50000,TP
1202,manisha,preader,50000,TP
1203,kalil,phpdev,30000,AC
1204,prasanth,phpdev,30000,AC
1205,kranthi,admin,20000,TP
1206,satishp,grpdes,20000,GR
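The import job in section II reads its input from HDFS, so the file has to be uploaded first. The destination path /emps.txt below is an assumption chosen to match the input path used in the driver; adjust it if your layout differs.
[root@hadoop dool]# hdfs dfs -put emp.txt /emps.txt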
3. Add the Maven dependencies (hbase-server provides the TableMapReduceUtil and TableMapper classes used below)
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.4.13</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.4.13</version>
    </dependency>
</dependencies>
II. Importing data from HDFS into HBase
1. Write the Mapper and the Job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class toHbase extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new toHbase(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        //Build the configuration
        Configuration configuration = new Configuration();
        //Point at HDFS
        configuration.set("fs.defaultFS", "hdfs://192.168.17.151:9000");
        //Point at the HBase root directory
        configuration.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
        //Point at ZooKeeper
        configuration.set("hbase.zookeeper.quorum", "192.168.17.151");
        //Create the job
        Job job = Job.getInstance(configuration);
        //Set the driver class and the mapper class
        job.setJarByClass(toHbase.class);
        job.setMapperClass(toMapper.class);
        //Set the mapper output key and value types
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        //Set the input path on HDFS
        FileInputFormat.addInputPath(job, new Path("/emps.txt"));
        //Set the output table; with a null reducer the Puts are written straight to HBase
        TableMapReduceUtil.initTableReducerJob("emp", null, job);
        //Number of reduce tasks, at least one
        job.setNumReduceTasks(1);
        //Submit the job and wait for completion
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        return 0;
    }
}

class toMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
    //Column family and qualifiers as byte arrays
    private byte[] info = Bytes.toBytes("info");
    private byte[] name = Bytes.toBytes("name");
    private byte[] job = Bytes.toBytes("job");
    private byte[] salary = Bytes.toBytes("salary");
    private byte[] deptName = Bytes.toBytes("deptName");

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {
        //Split the CSV line
        String[] split = value.toString().split(",");
        //Only handle well-formed lines
        if (split.length == 5) {
            //The first field is the row key
            rowKey.set(Bytes.toBytes(split[0]));
            //Build the Put for this row
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(info, name, Bytes.toBytes(split[1]));
            put.addColumn(info, job, Bytes.toBytes(split[2]));
            put.addColumn(info, salary, Bytes.toBytes(split[3]));
            put.addColumn(info, deptName, Bytes.toBytes(split[4]));
            context.write(rowKey, put);
        }
    }
}
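One way to submit the job from the command line instead of an IDE: package the project with Maven and put the HBase jars on the Hadoop classpath via `hbase mapredcp`. The jar name below is a placeholder for whatever your build actually produces.
[root@hadoop ~]# export HADOOP_CLASSPATH=$(hbase mapredcp)
[root@hadoop ~]# hadoop jar mr-hbase.jar toHbase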
2. Job output
3. Query on the virtual machine
hbase(main):007:0> scan 'emp'
ROW COLUMN+CELL
1201 column=info:deptName, timestamp=1650564108850, value=TP
1201 column=info:job, timestamp=1650564108850, value=manager
1201 column=info:name, timestamp=1650564108850, value=gopal
1201 column=info:salary, timestamp=1650564108850, value=50000
1202 column=info:deptName, timestamp=1650564108850, value=TP
1202 column=info:job, timestamp=1650564108850, value=preader
1202 column=info:name, timestamp=1650564108850, value=manisha
1202 column=info:salary, timestamp=1650564108850, value=50000
1203 column=info:deptName, timestamp=1650564108850, value=AC
1203 column=info:job, timestamp=1650564108850, value=phpdev
1203 column=info:name, timestamp=1650564108850, value=kalil
1203 column=info:salary, timestamp=1650564108850, value=30000
1204 column=info:deptName, timestamp=1650564108850, value=AC
1204 column=info:job, timestamp=1650564108850, value=phpdev
1204 column=info:name, timestamp=1650564108850, value=prasanth
1204 column=info:salary, timestamp=1650564108850, value=30000
1205 column=info:deptName, timestamp=1650564108850, value=TP
1205 column=info:job, timestamp=1650564108850, value=admin
1205 column=info:name, timestamp=1650564108850, value=kranthi
1205 column=info:salary, timestamp=1650564108850, value=20000
1206 column=info:deptName, timestamp=1650564108850, value=GR
1206 column=info:job, timestamp=1650564108850, value=grpdes
1206 column=info:name, timestamp=1650564108850, value=satishp
1206 column=info:salary, timestamp=1650564108850, value=20000
6 row(s) in 0.2600 seconds
III. Exporting data from HBase to HDFS
1. The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce over HBase: write data from HBase out to HDFS.
 */
public class FromHBaseToHDFS extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new FromHBaseToHDFS(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration configuration = new Configuration();
        //Point at HDFS
        configuration.set("fs.defaultFS", "hdfs://192.168.17.151:9000");
        //Point at the HBase root directory
        configuration.set("hbase.rootdir", "hdfs://hadoop:9000/hbase");
        //Point at ZooKeeper
        configuration.set("hbase.zookeeper.quorum", "192.168.17.151");
        //Create the job
        Job job = Job.getInstance(configuration);
        job.setJarByClass(FromHBaseToHDFS.class);
        //Filter: keep rows whose info:salary <= 30000 (a byte-wise comparison of the string values,
        //which works here because every salary is a 5-digit string)
        BinaryComparator binaryComparator = new BinaryComparator("30000".getBytes());
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("info".getBytes(), "salary".getBytes(), CompareFilter.CompareOp.LESS_OR_EQUAL, binaryComparator);
        //Scan the table created in section I with the filter applied
        Scan scan = new Scan();
        scan.setFilter(singleColumnValueFilter);
        TableMapReduceUtil.initTableMapperJob("emp", scan, HDFSMapper.class, NullWritable.class, Text.class, job);
        //Set the reducer and the job output types
        job.setReducerClass(HDFSReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        Path path = new Path("/0427");
        FileSystem fileSystem = FileSystem.get(configuration);
        //Recursively delete the output directory if it already exists
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        return 0;
    }
}

class HDFSMapper extends TableMapper<NullWritable, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        Text val = new Text();
        String row = null;
        String name = null;
        String job = null;
        String salary = null;
        String deptName = null;
        for (Cell c : value.listCells()) {
            //Row key of the current cell
            row = Bytes.toString(CellUtil.cloneRow(c));
            //Pick out each qualifier's value
            if (Bytes.toString(CellUtil.cloneQualifier(c)).equals("name")) {
                name = Bytes.toString(CellUtil.cloneValue(c));
            }
            if (Bytes.toString(CellUtil.cloneQualifier(c)).equals("job")) {
                job = Bytes.toString(CellUtil.cloneValue(c));
            }
            if (Bytes.toString(CellUtil.cloneQualifier(c)).equals("salary")) {
                salary = Bytes.toString(CellUtil.cloneValue(c));
            }
            if (Bytes.toString(CellUtil.cloneQualifier(c)).equals("deptName")) {
                deptName = Bytes.toString(CellUtil.cloneValue(c));
            }
        }
        val.set("row:" + row + ",name:" + name + ",job:" + job + ",salary:" + salary + ",deptName:" + deptName);
        context.write(NullWritable.get(), val);
    }
}

class HDFSReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
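An optional tweak to the Scan setup in the driver above, following common HBase guidance for MapReduce scans. This is only a sketch against the same 1.4 client API and is not required for the example to work:
        //Only read the 'info' family and tune the scan for a full-table MapReduce read
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("info")); //skip any other column families
        scan.setCaching(500);                  //fetch more rows per RPC round trip
        scan.setCacheBlocks(false);            //do not fill the region server block cache
        scan.setFilter(singleColumnValueFilter);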
2. Job output
3. Query on the virtual machine
[root@hadoop ~]# hdfs dfs -cat /0427/part-r-00000
row:1206,name:satishp,job:grpdes,salary:20000,deptName:GR
row:1205,name:kranthi,job:admin,salary:20000,deptName:TP
row:1204,name:prasanth,job:phpdev,salary:30000,deptName:AC
row:1203,name:kalil,job:phpdev,salary:30000,deptName:AC