Building a Hadoop Distributed Cluster on Three Virtual Machines: MapReduce Programming Model, Run Modes, Partitioning, and Counters
Hadoop-MapReduce
1. Introduction to MapReduce
The core idea of MapReduce is "divide and conquer", and it suits scenarios with large volumes of complex processing tasks.
- Map handles the "divide" step: a complex task is broken into a number of "simple tasks" that are processed separately. The prerequisite for splitting is that these small tasks can be computed in parallel, with almost no dependencies between them.
- Reduce handles the "combine" step: it performs a global aggregation over the results of the Map phase.
- MapReduce runs on a YARN cluster, which consists of:
  - ResourceManager
  - NodeManager
A complete MapReduce program running in distributed mode involves three kinds of instance processes:
- MRAppMaster: coordinates the scheduling and state of the whole job
- MapTask: handles the entire data processing flow of the Map phase
- ReduceTask: handles the entire data processing flow of the Reduce phase
2. MapReduce Programming Model
Developing a MapReduce program involves eight steps in total: two in the Map phase, four in the Shuffle phase, and two in the Reduce phase.
Map phase (2 steps)
- Set the InputFormat class to split the input data into key-value pairs (K1, V1) and pass them to the second step
- Write custom Map logic that converts each (K1, V1) pair from the first step into a new key-value pair (K2, V2) and outputs it
Shuffle phase (4 steps)
- Partition the output key-value pairs
- Within each partition, sort the data by key
- (Optional) Apply a preliminary combine to the sorted data to reduce the amount copied over the network
- Group the data, collecting all values with the same key into one set
Reduce phase (2 steps)
- Sort and merge the results from multiple Map tasks, then write a Reduce function with your own logic that processes the incoming key-value pairs and outputs new (K3, V3) pairs
- Set the OutputFormat to process and save the key-value data output by Reduce
3. WordCount
Requirement: given a set of text files, count and output the number of occurrences of each word.
3.1 Preparing the Data
- Create a new file
[root@node01 ~]# cd /export/servers/
[root@node01 servers]# vim wordcount.txt
- Enter the following content
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry.world
hadoop
- Upload it to HDFS
[root@node01 servers]# hdfs dfs -mkdir /wordcount
[root@node01 servers]# hdfs dfs -put wordcount.txt /wordcount
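To verify the upload, you can list the directory and print the file back (an optional sanity check):
[root@node01 servers]# hdfs dfs -ls /wordcount
[root@node01 servers]# hdfs dfs -cat /wordcount/wordcount.txt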
3.2 Mapper
package com.wangbin.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
 * The four type parameters are K1, V1, K2, V2.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    /*
     * key     : K1, the byte offset of the line
     * value   : V1, the text of the line
     * context : the context object
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on commas, then emit (word, 1) for each word
        String[] split = value.toString().split(",");
        Text text = new Text();
        LongWritable one = new LongWritable(1);
        for (String word : split) {
            text.set(word);
            context.write(text, one);
        }
    }
}
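As a concrete example, the first input line hello,world,hadoop is split on commas and emitted as the pairs (hello,1), (world,1), (hadoop,1); every other line is handled the same way.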
3.3 Reducer
package com.wangbin.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    /*
     * key    : the new K2 (a word)
     * values : the new V2 (the collection of counts for that word)
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts for this word and emit (word, total)
        long count = 0;
        LongWritable longWritable = new LongWritable();
        for (LongWritable value : values) {
            count += value.get();
        }
        longWritable.set(count);
        context.write(key, longWritable);
    }
}
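Continuing the example, the Shuffle phase groups the Map output by key, so the reducer receives pairs such as (hello, [1, 1]) and (hadoop, [1, 1]) for words that appear twice in wordcount.txt, and writes out hello 2, hadoop 2, and so on.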
3.4 Define the main class, describe the job, and submit it
package com.wangbin.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // Get the Job object
        Job job = Job.getInstance(super.getConf(), "wordcount");
        // Configure the Job (the eight steps)
        // 1. Specify how and where the input is read
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));
        // 2. Specify the Map logic and its output types
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Steps 3-6 (partition, sort, combine, group) use the defaults
        // 7. Specify the Reduce logic and its output types
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 8. Specify how and where the output is written
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/wordcount_out"));
        // Wait for the job to finish
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
4. MapReduce Run Modes
Cluster mode
- The MapReduce program is submitted to a YARN cluster and distributed across many nodes for parallel execution
- The input data and the output results should live on the HDFS distributed file system
- To submit to the cluster: package the program as a JAR, upload it, and launch it on the cluster with the hadoop command
First, add the following to pom.xml:
<packaging>jar</packaging>
In IDEA's Maven panel, run Lifecycle > package and wait for BUILD SUCCESS.
Packaging produces two JARs under target: the larger one bundles all dependencies, the smaller one does not; either works here.
Upload the JAR to the cluster and run it:
[root@node01 jar_test]# hadoop jar original-hdfs_api_demo-1.0-SNAPSHOT.jar com.wangbin.mapreduce.JobMain
If the packaged job fails to run on the cluster, add the following when configuring the job:
job.setJarByClass(JobMain.class);
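A minimal sketch of where the call can go in the driver above (placed right after the Job is created; the exact position inside run() is not critical):
Job job = Job.getInstance(super.getConf(), "wordcount");
// Tell Hadoop which JAR contains the job classes so it can ship that JAR to the cluster
job.setJarByClass(JobMain.class);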
In the code above, the job fails if the output directory already exists. To check whether the output directory exists and, if so, delete it:
// Requires org.apache.hadoop.fs.FileSystem and java.net.URI in the driver's imports
Path path = new Path("hdfs://node01:8020/wordcount_out");
TextOutputFormat.setOutputPath(job, path);
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
boolean exists = fileSystem.exists(path);
if (exists) {
    // Delete the existing output directory recursively
    fileSystem.delete(path, true);
}
Local mode
- The MapReduce program runs locally as a single process
- The input data and the output results live on the local file system
TextInputFormat.addInputPath(job,new Path("file:///D:\\mapreduce\\input"));
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\mapreduce\\output"));
5. MapReduce Partitioning
In MapReduce, by specifying a partitioner we send all data belonging to the same partition to the same Reduce task for processing.
For example, to make aggregation easier, a batch of similar records can be routed to one Reduce task, and that task then aggregates records of that one type.
"Similar" records are records that share some common property and should be processed together.
By default there is only one Reduce partition.
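For reference, when no custom partitioner is configured, Hadoop falls back to HashPartitioner, whose behaviour is essentially the following (a simplified sketch using this example's key/value types, not the exact Hadoop source):
// Default partitioning: non-negative hash of the key modulo the number of reduce tasks
public int getPartition(Text key, NullWritable value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}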
Step 1: a custom Mapper
package com.wangbin.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
 * K1: the byte offset of the line
 * V1: the text of the line
 * K2: the text of the line
 * V2: NullWritable
 */
public class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass the whole line through as the key; the value carries no information
        context.write(value, NullWritable.get());
    }
}
Step 2: a custom Partitioner
package com.wangbin.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text, NullWritable> {
    /*
     * 1. Define the partitioning rule
     * 2. Return the corresponding partition number
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        // The input is tab-separated; the sixth field is a numeric value used to split the data
        String numstr = text.toString().split("\t")[5];
        if (Integer.parseInt(numstr) > 15) {
            return 1;
        } else {
            return 0;
        }
    }
}
Step 3: a custom Reducer
package com.wangbin.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // No aggregation needed here; just write the key back out
        context.write(key, NullWritable.get());
    }
}
Step 4: define the main class, describe the job, and submit it
package com.wangbin.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "partitionDemo");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input"));
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Use the custom partitioner and run two reduce tasks, one per partition
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(2);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/out"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
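Because the job runs with two reduce tasks, the output directory contains two result files: part-r-00000 holds partition 0 (sixth field <= 15) and part-r-00001 holds partition 1 (sixth field > 15).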
6. MapReduce Counters
Counters are an effective way to collect job statistics, used for quality control or application-level metrics. They can also help diagnose problems in the system.
Hadoop's built-in counter groups:
| Counter group | Class |
|---|---|
| MapReduce task counters | org.apache.hadoop.mapreduce.TaskCounter |
| File system counters | org.apache.hadoop.mapreduce.FileSystemCounter |
| FileInputFormat counters | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
| FileOutputFormat counters | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
| Job counters | org.apache.hadoop.mapreduce.JobCounter |
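Built-in counters can also be read in the driver after the job finishes, for example (a sketch; it assumes the driver above and requires importing org.apache.hadoop.mapreduce.Counters and org.apache.hadoop.mapreduce.TaskCounter):
// After job.waitForCompletion(true) returns:
Counters counters = job.getCounters();
long mapInputRecords = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
System.out.println("Map input records: " + mapInputRecords);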
Method 1: obtain a counter from the context by group name and counter name
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Argument 1: counter group name (user-defined); argument 2: counter name
    Counter counter = context.getCounter("MR_COUNTER", "partition_counter");
    counter.increment(1L);
    context.write(value, NullWritable.get());
}
Method 2: define counters with an enum
public static enum Counter {
    MY_INPUT_RECORDS, MY_INPUT_BYTES
}

@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    // Each enum constant acts as a separate counter; increment one per record processed
    context.getCounter(Counter.MY_INPUT_RECORDS).increment(1L);
    context.write(key, NullWritable.get());
}
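Counters defined either way appear in the job's final counter report, and they can be read in the driver as well, for example (a sketch assuming the enum above is declared inside PartitionReducer):
long records = job.getCounters().findCounter(PartitionReducer.Counter.MY_INPUT_RECORDS).getValue();
System.out.println("MY_INPUT_RECORDS = " + records);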