云计算实验报告 |
MapReduce |
----------------------个人作业,如果有后辈的作业习题一致,可以参考学习,一起交流,请勿直接copy,搭建环境有问题的同学可以评论或者私信一起讨论
------------------同类型的实验参考了实验楼的实验,希望大家学习的同时一定要自己亲自手打代码,才有学习的价值,遇到各种错误可以一起学习寻找解决方法
一、实验题目
某公司各个部门的相关数据处理
二、实验环境
Hadoop搭建环境:
虚拟机操作系统: ubuntu-14.04.4-desktop-i386
JDK:jdk-8u101-linux-i586
Hadoop:hadoop-2.6.0
三、实验原理
·MapReduce简介
MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce的数据流图如下图所示:
·MapReduce流程分析
··Map过程
- 每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件;
- 在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combia操作,这样做的目的是让尽可能少的数据写入到磁盘;
- 当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combia操作,目的有两个:
- 尽量减少每次写入磁盘的数据量
- 尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了
- 将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就可以了。
··Reduce过程
n Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中;
n 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作;
n 合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
·MapReduce工作机制剖析
n 在集群中的任意一个节点提交MapReduce程序;
n JobClient收到作业后,JobClient向JobTracker请求获取一个Job ID;
n 将运行作业所需要的资源文件复制到HDFS上(包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息),这些文件都存放在JobTracker专门为该作业创建的文件夹中,文件夹名为该作业的Job ID;
n 获得作业ID后,提交作业;
n JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行;
n 对于map和reduce任务,TaskTracker根据主机核的数量和内存的大小有固定数量的map槽和reduce槽。这里需要强调的是:map任务不是随随便便地分配给某个TaskTracker的,这里有个概念叫:数据本地化(Data-Local)。意思是:将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行,这叫“运算移动,数据不移动”;
n TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户;
n 运行的TaskTracker从HDFS中获取运行所需要的资源,这些资源包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分等信息;
n TaskTracker获取资源后启动新的JVM虚拟机;
n 运行每一个任务。
四、实验内容
测试数据
Dept:
部门的相关信息,“部门编号,部门名称,部门所在城市”
10,ACCOUNTING,TOKYO
20,RESEARCH,GUANGZHOU
30,SALES,PEKING
40,OPERATIONS,BOSTON
50,DESIGN,SHANGHAI
Emp:
职员相关信息,“员工号,姓氏,名字,编号,入职时间(A-B月-C:20C年B月A日),工资,奖金,所属部门编号”
0001,SMITH,CLERK,0001,17-12月-02,800,,20
0002,ALLEN,SALESMAN,0002,20-2月-01,1600,300,30
0003,WARD,SALESMAN,0003,22-2月-03,1250,500,30
0004,JONES,MANAGER,0004,02-4月-01,2975,,20
0005,MARTIN,SALESMAN,0005,28-9月-03,1250,1400,30
0006,BLAKE,MANAGER,0006,01-5月-02,2850,,30
0007,CLARK,MANAGER,0007,09-6月-00,2450,,10
0008,KING,PRESIDENT,0008,17-11月-06,6500,,10
0009,TURNER,SALESMAN,0009,08-9月-12,1500,0,30
0010,JAMES,CLERK,0010,03-12月-04,950,,30
0011,FORD,ANALYST,0011,03-12月-05,3000,,20
0012,MILLER,CLERK,0012,23-1月-06,1300,,50
上传文件:bin/hdfs dfs -mkdir -p /project/input/
bin/hdfs dfs -put ~/emp /project/input/
bin/hdfs dfs -put ~/dept /project/input/
·求各个部门的总工资(SumDeptSalary.java)
本次实验采用了Map join中的Map side join。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
- 用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
- 用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。
具体详细代码在打包的实验代码文件SumDeptSalary.java中:
编译代码并打成jar包:
- Javac -cp../share/hadoop/common/hadoop-common- 2.6.0.jar: ../share/hadoop/mapreduce/hadoop-mapreduce-client-core- 2.6.0.jar:../share/hadoop/common/lib/commons-cli-1.2.jar SumDeptSalary.java -d ./
- jar -cvf SumDeptSalary.jar org/apache/hadoop/examples/ SumDeptSalary *.class
运行:
bin/hadoop jar SumDeptSalary.jar SumDeptSalary /project/input/* outputSum
输出运行结果:
bin/hdfs dfs -cat outputSum/*
ACCOUNTING 8950
RESEARCH 6775
SALES 9400
DESIGN 1300 ----------运行结果
·求各个部门的人数和平均工资(DeptNumberAveSalary.java)
关于第二个问题:求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和第一个问题类似,在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。
具体详细代码在打包的实验代码文件DeptNumberAveSalary.java中:
编译代码并打成jar包:
- Javac -cp../share/hadoop/common/hadoop-common- 2.6.0.jar: ../share/hadoop/mapreduce/hadoop-mapreduce-client-core- 2.6.0.jar:../share/hadoop/common/lib/commons-cli-1.2.jar DeptNumberAveSalary.java -d ./
- jar -cvf DeptNumberAveSalary.jar org/apache/hadoop/examples/ DeptNumberAveSalary *.class
运行:
bin/hadoop jar DeptNumberAveSalary.jar DeptNumberAveSalary/project/input/* outputAve
输出运行结果:
bin/hdfs dfs -cat outputAve/*
ACCOUNTING Dept Number:2,Ave Salary:4475
RESEARCH Dept Number:3,Ave Salary:2258
SALES Dept Number:6,Ave Salary:1566
DESIGN Dept Number:1,Ave Salary:1300 ----------运行结果
·将全体员工按照总收入从高到低排序(SalarySort.java)
第三个问题:求全体员工总收入降序排列,只需获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在这次的测试中Map的key只有0值,所以可以实现对所有数据进行排序。
具体详细代码在打包的实验代码文件SalarySort.java中:
编译代码并打成jar包:
- Javac -cp../share/hadoop/common/hadoop-common- 2.6.0.jar: ../share/hadoop/mapreduce/hadoop-mapreduce-client-core- 2.6.0.jar:../share/hadoop/common/lib/commons-cli-1.2.jar SalarySort.java -d ./
- jar -cvf SalarySort.jar org/apache/hadoop/examples/ SalarySort *.class
运行:
bin/hadoop jar SalarySort.jar SalarySort /project/input/* outputSort
输出运行结果:
bin/hdfs dfs -cat outputSort/*
6500 KING
3000 FORD
2975 JONES
2850 BLAKE
2650 MARTIN
2450 CLARK
1900 ALLEN
1750 WARD
1500 TURNER
1300 MILLER
950 JAMES
800 SMITH
----------运行结果
五、实验总结
关于本次的云计算实验project,经过了之前的partA、partB和partC三个部分的练习,对hadoop的集群和MapReduce和数据的分布式处理都有了一些基础的了解,对搭建集群和相关的linux命令也都得到了一定程度的锻炼。
在实验的partA部分,是通过品高提供的虚拟机平台来实现的,但是在之后的partB部分,却在input文件的部分屡次失败,向助教的师兄师姐和班里的同学求助,但是尝试了重新格式化、关闭防火墙、重启服务等等诸多办法,都找不到问题所在,销毁重做也解决不了问题。最终决定利用VMware Workstation 12 来搭建两台Ubuntu虚拟机来实现hadoop集群的搭建。
使用虚拟机之后,问题得到了顺利的解决,成功input了需要上传的文件,完成了余下的partB和partC两个部分。
完成后两个部分之后,对MapReduce有了更近一步的了解,相关的操作也熟能生巧了,通过在网上搜索和学习各种相关算法和代码,最终选择了实验中的三个部分来实现。在实现的过程中,参考了部分的代码,然后自己参照所给的算法重新编写了实验中的三个Java代码。在测试的过程中,使用了多组不同的数据,输出output文件的结果都是正确的,故而判断代码的实现是成功的。
Hadoop软件库作为一个框架,允许在集群服务器上使用简单的编程模型对大数据集进行分布式处理。而且高可用性并不依赖硬件,其代码库自身就能在应用层侦测并处理硬件故障,能基于服务器集群提供高可用性的服务。在很多方面都有广泛的应用,通过本次的实验和project,对集群和分布式,有了更加深刻的了解,通过之后不断的学习,相信能够给自己带来更大的收获。
------------------------------------------------------------
因为直接复制自实验报告,排版有一些问题,但是不影响阅读和理解,有问题可以询问我
所有评论(0)