前提条件

IDEA2021

hadoop2.x

hbase1.x

本博文使用的版本为hadoop2.7.3,hbase1.7.1

需求

MapReduce读取HBase的数据,使用MapReduce对读取到的数据进行词频统计处理,把计算结果输出到HBase中。

数据准备

启动hdfs和hbase

start-dfs.sh
start-hbase.sh

进入hbase shell命令行

hbase shell

创建输入表word

创建输入表word
create 'word','content'
插入数据
put 'word','1001','content:info','when all else is lost the future still remains'
put 'word','1002','content:info','sow nothing reap nothing'
put 'word','1003','content:info','keep on going never give up'
put 'word','1004','content:info','the wealth of the mind is the only wealth'

创建输出表result

create 'result','content'

编码

新建maven工程

添加依赖

pom.xml添加如下依赖:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.7.1</version>
        </dependency>

编写Mapper

package org.example.mr;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
//                                           Mapper输出k,v类型
public class WordCountMapper extends TableMapper<Text, IntWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // getValue参数列族,列
        String data = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info")));
        String[] words = data.split(" ");
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

编写Reducer

package org.example.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

//                                          数据类型 KEYIN, VALUEIN, KEYOUT
//                                                    k3,  v3,      rowkey
public class WordCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : v3) {
            sum = sum + i.get();
        }

        //输出:表中的一条记录 Put对象
        //使用单词作为行键
        Put put = new Put(Bytes.toBytes(k3.toString()));
        // 列族, 列限定符, 值
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));

        //写入HBase
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}

编写Main

package org.example.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 获取配置,并设置hbase的zk地址
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum","192.168.193.140");
        // 创建job,并设置主类
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountMain.class);

        // scan查询实例
        Scan scan = new Scan();
        // 添加查询的列,参数为列族、列
        scan.addColumn(Bytes.toBytes("content"),Bytes.toBytes("info"));// 参数:列族,列
        // 设置job的读取输入的表,Mapper类,输出k,v类型
        TableMapReduceUtil.initTableMapperJob("word", scan, WordCountMapper.class,
                Text.class, IntWritable.class, job);
        // 设置job的Reducer类,及Reduce输出到的表
        TableMapReduceUtil.initTableReducerJob("result", WordCountReducer.class, job);
        // 执行job
        job.waitForCompletion(true);
    }
}

测试

IDEA运行main方法,报错如下:

解决方法为

pom.xml添加如下jackson依赖

        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>

IDEA再次运行main方法,成功

查看结果

完成!enjoy it! 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐