Processing HBase Data with MapReduce
Prerequisites
IDEA 2021
Hadoop 2.x
HBase 1.x
This post uses Hadoop 2.7.3 and HBase 1.7.1.
Requirement
Read data from HBase with MapReduce, run a word count over the data, and write the results back into HBase.
Data preparation
Start HDFS and HBase:
start-dfs.sh
start-hbase.sh
Enter the HBase shell:
hbase shell
Create the input table word:
create 'word','content'
Insert some sample data:
put 'word','1001','content:info','when all else is lost the future still remains'
put 'word','1002','content:info','sow nothing reap nothing'
put 'word','1003','content:info','keep on going never give up'
put 'word','1004','content:info','the wealth of the mind is the only wealth'
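Optionally, verify that the rows were written (output omitted here):
scan 'word'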
Create the output table result:
create 'result','content'
Coding
Create a new Maven project
Add dependencies
Add the following dependencies to pom.xml:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.7.1</version>
</dependency>
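Note: hbase-server is needed in addition to hbase-client because, in HBase 1.x, the MapReduce integration classes used below (TableMapper, TableReducer, TableMapReduceUtil) live in the hbase-server artifact.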
Write the Mapper
package org.example.mr;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

// The type parameters are the Mapper's output key and value types
public class WordCountMapper extends TableMapper<Text, IntWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // getValue takes the column family and the column qualifier
        String data = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));
        String[] words = data.split(" ");
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}
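map() is invoked once for every row the Scan (configured in the main class below) returns; key holds the row key and value holds that row's cells. Note that getValue returns null for a row with no content:info cell, which would make the split throw a NullPointerException; every sample row has that cell, so no guard is added here.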
Write the Reducer
package org.example.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

// Type parameters: KEYIN, VALUEIN, KEYOUT
// i.e. k3, v3, and the row key written to HBase
public class WordCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : v3) {
            sum = sum + i.get();
        }
        // The output value is one HBase record, represented as a Put;
        // the word itself is used as the row key
        Put put = new Put(Bytes.toBytes(k3.toString()));
        // column family, column qualifier, value
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
        // Write to HBase
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}
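One design note: the count is written as a string (Bytes.toBytes(String.valueOf(sum))) so that it shows up as readable text when you scan the table in the HBase shell. Storing the raw int bytes also works, but prints as escaped hex in the shell:
put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(sum));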
Write the main class
package org.example.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Get a configuration and point it at HBase's ZooKeeper quorum
        // (192.168.193.140 is this environment's address; replace it with your own)
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "192.168.193.140");
        // Create the job and set the main class
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);
        // A Scan instance defining the query
        Scan scan = new Scan();
        // Restrict the scan to one column; arguments: column family, column qualifier
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"));
        // Configure the input table, the Mapper class, and the Mapper's output key/value types
        TableMapReduceUtil.initTableMapperJob("word", scan, WordCountMapper.class,
                Text.class, IntWritable.class, job);
        // Configure the Reducer class and the table the reduce output goes to
        TableMapReduceUtil.initTableReducerJob("result", WordCountReducer.class, job);
        // Run the job
        job.waitForCompletion(true);
    }
}
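To submit the job to a cluster instead of running it from IDEA, a minimal sketch, assuming the project has been packaged as wordcount.jar (hbase mapredcp prints the HBase jars that MapReduce needs on its classpath):
export HADOOP_CLASSPATH=$(hbase mapredcp)
hadoop jar wordcount.jar org.example.mr.WordCountMain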
Testing
Run the main method in IDEA. It fails with an error caused by missing Jackson classes.
The fix is to add the following Jackson dependencies to pom.xml:
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-core-asl</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
Run the main method in IDEA again; this time it succeeds.
View the results
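Scan the output table in the HBase shell; every word becomes a row key with its count in content:count (for the sample data above, the row 'the' should hold the value 4):
scan 'result'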
Done! Enjoy it!