云计算技术 实验七 MapReduce编程基础
master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输
参考资料为:
教材代码-林子雨编著《大数据基础编程、实验和案例教程(第2版)》教材所有章节代码_厦大数据库实验室博客
1.实验学时
4学时
2.实验目的
- 熟悉MapReduce编程框架。
- 了解Map部分和Reduce部分的工作原理。
- 实现简单的MapReduce编程。
3.实验内容
(一)实现词频统计的基本的MapReduce编程。
首先创建两个txt文件。
让后向里面输入想要统计的句子。
然后启动ecplise完成程序编写:
首先编写map处理逻辑:(这里选择在windows上先编写,然后在linux上再复现一次)
下面为java代码:此为map处理逻辑
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
在map阶段,文件wordfile1.txt和文件wordfile2.txt中的数据被读入,然后以键值对的形式被提交给map函数处理。键值对交给map函数之后,就可以运行自定义的map处理逻辑。
之后编写reduce处理逻辑。
Map阶段处理得到的中间结果,经过shuffle阶段,会分发给对应的reduce任务处理。
下面为java代码,此为reduce任务处理
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
之后编写main函数
为了使TokenizerMapper类和IntSumReduce类能够正常协同工作,需要在主函数中通过job类设置hadoop程序的运行环境。
下面为java代码,此为main函数
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //设置环境参数
job.setJarByClass(WordCount.class); //设置整个程序的类名
job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
job.setOutputKeyClass(Text.class); //设置输出类型
job.setOutputValueClass(IntWritable.class); //设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
然后是打包程序。
先打开hadoop对应文件夹:
将代码传入文件夹之后,使用之前下载的java jar进行编译
编译完成之后查看文件夹,多出三个.class文件,然后进行文件的打包。
然后启动hadoop
然后输入命令查看结果:
结果:
(二)配置eclipse环境,跑词频统计的程序。
先启动eclipse
创建新的java工程进行编写程序
然后导入jar包
然后开始编写java程序
先创建新的java类开始编写
然后将之前编写号的代码输入到java文件之中,然后进行运行查看结果。
完整程序:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}
查看结果:
结果已经呈现出来,然后把java程序打包生成jar包。放到hadoop平台之上运行。
查看对应文件夹:
然后为了运行,先启动hadoop
然后删除input和output防止出错
然后新建文件
由于先前已经将对应文件传入linux,然后可以考虑现将文件上传到hdfs中的/user/Hadoop/input中
然后使用jar命令查看;
然后查看结果:
(三)编写MapReduce程序,实现计算平均成绩的程序。
首先编写程序。
程序的主要点是输入三个txt文件,然后计算三个txt文件对应的科目之和的平均值。这里的导入方法与前面相似。
然后先创建新的项目,导入jar包
然后将对应的代码输入java文件中。
注意导入的包的个数问题。
然后导入对应的包:
然后导出jar文件。
编写对应的txt文件,设置
然后输入./bin/hdfs dfs -cat output*/
注意,这里的名字不能使用单个字符,不然会报错!!!,所以后面改成了多个字符。
4.思考题
(一)MapReduce的工作原理是什么?
MapRedece分为两部分,一个是Map函数,一个是Reduce函数。Map函数接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。 Reduce函数接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。
下面是一个图介绍MapReduce的工作流程:
MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。
user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或者Reduce作业),worker的数量也是可以由用户指定的。
被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。
缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。
master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的。
reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。
当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。
(二)Hadoop是如何运行MapReduce程序的?
有两个方法,这两个方法的前提是需要启动hadoop才可以运行。
方法一:
将自己的编译软件与hadoop相连(我用的是MyEclipse去链接hadoop),直接运行程序。运行完成之后在输出文件夹就可以查看输出的文件。
方法二:
方法二的话更加复杂,需要将mapreduce程序打包成jar文件。需要在linux上的eclipse编写好程序之后,将程序导出打包,之后执行这个jar文件,在输出文件中查看结果即可。
5.实验结论或体会
1.实验开始编写程序之前,需要将hadoop启动方才可以继续编写程序。
2.程序导出的时候,需要将jar文件导出到相应的hadoop程序的文件夹下,这样方便程序的运行。
3.编写程序的时候,需要将导入的包一一对应,确保所有的包都导入到程序之中。
4.TXT文件需要提前写好,方便运行程序。
更多推荐
所有评论(0)