hadoop中MapReduce简单的代码实现
用MapReduce简单实现——单词计数:这里我使用的是在我的hadoop集群中存取文件,在本地的hadoop中进行mapreduce操作,就没有放在虚拟机的集群上运行了(自己可以打成jar包,放到虚拟机上去运行)首先准备好读取的文档:(差不多就是这种数据)实现代码:import java.io.IOException;import java.util.StringTokenizer;import
·
用MapReduce简单实现——单词计数:
注意:这里我使用的是在我的hadoop集群中存取文件,在本地的hadoop中进行mapreduce操作,就没有放在虚拟机的集群上运行了(自己可以打成jar包,放到虚拟机上去运行)
首先准备好读取的文档:(差不多就是这种数据)
实现代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* 单词计数 a.txt
*@author Ryan_Tang
*/
public class App
{
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if(args==null || args.length!=2){
System.err.println("<Usage>:com.yc.hadoop.project.wcyc74.App <input> <output>");
System.exit(1);
}
Configuration conf = new Configuration();
//注意是处于active状态的 HDFS
conf.set("fs.defaultFS", "hdfs://node1:8020");
//注意是处理active状态的resourcemanager节点名
conf.set("yarn.resourcemanager.hostname", "node3");
System.err.println(conf);
System.err.println(conf.get("fs.defaultFS"));
//手动输入在hdfs上的读取地址 和存储地址
Path input=new Path( args[0] );
Path output=new Path(args[1]);
FileSystem fs=FileSystem.get(conf);
if(fs.exists(output)){
fs.delete(output,true);
}
Job job = Job.getInstance(conf, "WordCount");
job.setJarByClass(App.class);
//通过job设置输入输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//设置输入、输出的路径
FileInputFormat.addInputPath(job, input );
FileOutputFormat.setOutputPath(job, output);
//设置处理Map/Reduce的类
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
//设置最终输出的键和值的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//提交作业
job.waitForCompletion(true);
}
}
// Object, Text表示 Mapper的输入类型
//Text, IntWritable 输出类型
class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
// Integer new Integer("1");
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// 回调函数 context是上下文环境
// Object key, Text value 输入数据类型
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//System.out.println("mapper输入的键:" + key);
StringTokenizer itr = new StringTokenizer(value.toString()); // tokenizer : 空格,符号
while (itr.hasMoreTokens()) {
word.set(itr.nextToken()); // 读取一个单词
context.write(word, one); // For 1 For 1 -> For [1,1]
}
}
}
//规约类 Text, IntWritable输入类型 Text, IntWritable:输出结果类型
class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
// Text key, Iterable<IntWritable> values, 输入类型
//注意: 为什么这里是 iterable? shuffle
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//System.out.println( "reduce端: key"+ key.toString() );
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum); /// 存这个key 对应的和
context.write(key, result);
}
}
运行该程序 配置参数:
运行成功可以查看hdfs上的文件是有两个的:
来看一下结果:
注意,如果是windows用户会报错:
解决方法为:https://blog.csdn.net/qq_45016628/article/details/107090922
更多推荐
已为社区贡献1条内容
所有评论(0)