MapReduce——wordcount

//先在开始前介绍环境:

//Hadoop2.7.5 zookeeper3.4.9 虚拟机JDK jdk1.8.0_141 本地JDK jdk-8u241-windows-x64

//环境配置详见

[https://blog.csdn.net/weixin_47878012/article/details/121579060]:

//IDEA必不可少

//IDEA必不可少

//IDEA必不可少

//在开始前将hdfs-site.xml当中的权限关闭

<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>

//项目git地址添加链接描述

1.配置windows的Hadoop运行环境

MapReduce本地运行时需要在windows系统需要配置hadoop运行环境,否则直接运行代码会出现以下问题:

缺少winutils.exe

Could not locate executable null \bin\winutils.exe in the hadoop binaries 

缺少hadoop.dll

Unable to load native-hadoop library for your platform… using builtin-Java 
classes where applicable 

步骤:

Hadoop2.7.5配置好的文件已经上传到我的博客

[https://download.csdn.net/download/weixin_47878012/85075777?spm=1001.2014.3001.5503]:

第一步:将hadoop2.7.5文件夹拷贝到一个没有中文没有空格的路径下面

第二步:在windows上面配置hadoop的环境变量: HADOOP_HOME,并 将%HADOOP_HOME%\bin添加到path中

第三步:把hadoop2.7.5文件夹中bin目录下的hadoop.dll文件放到系统盘: C:\Windows\System32 目录 第四步:关闭windows重启

在这里插入图片描述

2.MapReduce 编程规范

Map 阶段 2 个步骤

  1. 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
  2. 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果

Shuffle阶段4个步骤

3.对输出的 Key-Value 对进行分区

4.对不同分区的数据按照相同的 Key 排序

5.(可选) 对分组过的数据初步规约, 降低数据的网络拷贝

6.对数据进行分组, 相同 Key 的 Value 放入一个集合中

Reduce 阶段 2 个步骤

7.对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出

8.设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据

3.WordCount

//需求: 在一堆给定的文本文件中统计输出每一个单词出现的总次数

Step 1. 数据格式准备

1.创建一个新的文件

cd /export/servers
vim wordcount.txt

2.向其中放入以下内容并保存

hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop

3.上传到 HDFS

hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/

4.创建Maven工程导入Maven坐标

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy-->

    <modelVersion>4.0.0</modelVersion>
    <groupId>XAUFE.yzy</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <!--XAUFE@yzy-->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
            <!--XAUFE@yzy-->
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>

Step 2. Mapper

package cn.itcast.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
    /*为了解决序列化问题Mapper自己定义了数据类型XAUFE@yzy
    四个泛型解释:
    KEYIN:K1的类型
    VALUEIN:K2的类型

    KEYOUT:K2的类型
    VALUEOUT:V2的类型
    * */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    //map方法就是将k1v1转为k2v2
    /*参数
    key k1  行偏移量
    value v1    每一行的文本数据
    context 表示上下文对象
    如何将k1v1转化为k2v2:
    XAUFE@yzy
    k1      v1
    0       hello world hadoop
    15      hdfs hive hello
    k2      v2
    hello   1
    world   1
    hdfs    1
    hello   1
    * */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        //1:将一行的文本数据进行拆分  alt
        String[] split = value.toString().split(",");
        //2:遍历数组,组装k2v2    iter
        for (String word : split) {
            //3:将k2v2写入上下文 靠context
            text.set(word);
            longWritable.set(1);
            context.write(text,longWritable);
        }
    }
}

Step 3. Reducer

package cn.itcast.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
    //reduc方法作用:将k2v2转化为k3v3,写入上下文方法中
    /*
    * 参数:
    * key :新k2
    * value : 集合新 v2
    * context 表示上下文对象*/
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
    //如何将k2v2转化为k3v3
        /*、
        * XAUFE@yzy
        新 k2    v2
          hello  《1,1,1》
          world  《1,1》
          hadoop 《1》

          k3    v3
          hello 3
          world 2
          hadoop 1*
         */
        long count = 0;
        //1遍历集合 将集合中数字相加 得到v3
        for (LongWritable value : values) {
            count += value.get();
        }
        //2将k3和v3写入上下文中
        context.write(key,new LongWritable(count));
    }
}

Step 4. 定义主类, 描述 Job 并提交 Job

package cn.itcast.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class JobMain extends Configured implements Tool {
    //指定一个job任务
    @Override
    public int run(String[] strings) throws Exception {
        //1创建一个job任务对象
        Job job = Job.getInstance(super.getConf(), "wordcount");

        /*
        * 如果打包运行出错,需要加配置
        * job.setJarByClass(JobMain.class);
        * XAUFE@yzy
        * */

        //2配置job任务对象(八个步骤)
        //第一步指定文件读取方式和路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
        /**
         *
         * 本地运行时输入路径
         */
        //TextInputFormat.addInputPath(job,new Path("file:///G:\\mapreduce\\wordcount_input"));
        //目标文件夹不需要存在
        job.setMapperClass(WordCountMapper.class);
        //设置map阶段k2的类型
        job.setMapOutputKeyClass(Text.class);
        //设置map阶段v2的类型
        job.setMapOutputValueClass(LongWritable.class);
        //跳过Shuffle阶段

        //第三四五六步用默认方式

        //第七步指定reduce阶段处理方式和数据类型
        job.setReducerClass(WordCountReducer.class);
        //设置k3v3的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //第八步设置输出类型
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置输出路径   不管本地运行还是集群运行,输出目录不能存在XAUFE@yzy
        Path path = new Path("hdfs://node01:8020/wordcount_out");
        TextOutputFormat.setOutputPath(job,path);
        //TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));
        /*
        * 如果输出目录已经存在
        * 获取FileSystem
        * 判读目录是否存在
        * XAUFE@yzy
        * */
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
        boolean bl2 = fileSystem.exists(path);
        if (bl2){
            //删除目标目录
            fileSystem.delete(path,true);
        }
        /**
         *
         * 本地运行时输出路径
         */
        //TextOutputFormat.setOutputPath(job,new Path("file:///G:\\mapreduce\\wordcount_output"));
        //等待任务结束
        boolean bl = job.waitForCompletion(true);
        return bl ? 0:1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //启动job任务
        int run = ToolRunner.run(configuration,new JobMain(),args);
        System.exit(run);
    }
}

4. MapReduce 运行模式

本地运行模式

1.MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行

2.处理的数据及输出结果可以在本地文件系统, 也可以在hdfs上

3.提前在window系统中创建输入文件

4.更改输入输出路径

TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));

更改为

TextInputFormat.addInputPath(job,new Path("file:///G:\\mapreduce\\wordcount_input"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));

5.执行结束就可以看到输出文件wordcount_out找到part-r-00000文件为统计结果
在这里插入图片描述

集群运行模式

1.将 MapReduce 程序提交给 Yarn 集群, 分发到很多的节点上并发执行

2.处理的数据和输出结果应该位于 HDFS 文件系统

3.提交集群的实现步骤:

1)将程序打成JAR包,在IDEA右侧Maven中点击package
在这里插入图片描述

2)将打好的JAR包传到Hadoop上

cd /export/serves
mkdir jar_test
cd jar_text
rz -E

3)用Hadoop命令运行JAR包: JAR包名后跟主函数的路径

hadoop jar original-WordCount-1.0-SNAPSHOT.jar XAUFE/yzy/mapreduce/JobMain

在这里插入图片描述

4.执行结束后找到wordcount_out中part-r-00000文件为统计结果
在这里插入图片描述

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐