用MapReduce简单实现——单词计数:
注意:这里我使用的是在我的hadoop集群中存取文件,在本地的hadoop中进行mapreduce操作,就没有放在虚拟机的集群上运行了(自己可以打成jar包,放到虚拟机上去运行)
首先准备好读取的文档:(差不多就是这种数据)

实现代码:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 单词计数  a.txt
 *@author Ryan_Tang
 */
public class App 
{
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		
		if(args==null || args.length!=2){
			System.err.println("<Usage>:com.yc.hadoop.project.wcyc74.App <input> <output>");
			System.exit(1);
		}
		
		Configuration conf = new Configuration();
		 //注意是处于active状态的 HDFS
		conf.set("fs.defaultFS", "hdfs://node1:8020");
		//注意是处理active状态的resourcemanager节点名
		conf.set("yarn.resourcemanager.hostname", "node3");
		
		System.err.println(conf);
		System.err.println(conf.get("fs.defaultFS"));
		
		//手动输入在hdfs上的读取地址 和存储地址
		Path input=new Path( args[0] );
	    Path output=new Path(args[1]);
	    
	    FileSystem fs=FileSystem.get(conf);
	    if(fs.exists(output)){
	    	fs.delete(output,true);
	    }
	    
	    Job job = Job.getInstance(conf, "WordCount");
	    job.setJarByClass(App.class);
	    
	    //通过job设置输入输出格式
	    job.setInputFormatClass(TextInputFormat.class);
	    job.setOutputFormatClass(TextOutputFormat.class);
	    
	    //设置输入、输出的路径 
	    FileInputFormat.addInputPath(job, input );
	    FileOutputFormat.setOutputPath(job, output);
	    
	    //设置处理Map/Reduce的类
	    job.setMapperClass(TokenizerMapper.class);
	    job.setReducerClass(IntSumReducer.class);
	    
	    //设置最终输出的键和值的类型
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(IntWritable.class);
	    
	    //提交作业
	    job.waitForCompletion(true);
	}
	
	
}
 // Object, Text表示 Mapper的输入类型
 //Text, IntWritable 输出类型
class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
	// Integer new Integer("1");
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();
	// 回调函数 context是上下文环境
	// Object key, Text value 输入数据类型
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		//System.out.println("mapper输入的键:" + key);
		StringTokenizer itr = new StringTokenizer(value.toString());  // tokenizer : 空格,符号
		while (itr.hasMoreTokens()) {
			word.set(itr.nextToken()); // 读取一个单词
			context.write(word, one); //      For 1        For 1           ->    For [1,1]
		}
	}
}

//规约类         Text, IntWritable输入类型           Text, IntWritable:输出结果类型
class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	private IntWritable result = new IntWritable();
	// Text key, Iterable<IntWritable> values,  输入类型
	//注意: 为什么这里是   iterable?   shuffle
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		//System.out.println(   "reduce端: key"+ key.toString() );
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		result.set(sum);   /// 存这个key   对应的和
		context.write(key, result);
	}
}

运行该程序 配置参数:

在这里插入图片描述

运行成功可以查看hdfs上的文件是有两个的:
在这里插入图片描述
来看一下结果:
在这里插入图片描述

注意,如果是windows用户会报错:
在这里插入图片描述

解决方法为:https://blog.csdn.net/qq_45016628/article/details/107090922

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐