Hadoop 2.7.1: Remotely Submitting Jobs from IntelliJ IDEA to a Linux Cluster
This post shows how to submit a job from IntelliJ IDEA on Windows to a remote Hadoop cluster running on Linux.
There are plenty of guides online for deploying a Hadoop cluster; I set up a pseudo-distributed cluster in a virtual machine for my own learning.
The walkthrough uses the classic WordCount example.
First, the project layout: ConfigUtil is the configuration utility class and FileSystemUtil is the file-system utility class.
ConfigUtil.java is as follows:
static Configuration getConfigured() {
    Configuration conf = new Configuration();
    // NameNode address
    conf.set("fs.defaultFS", "hdfs://10.60.20.111:9000");
    // run MapReduce on YARN
    conf.set("mapreduce.framework.name", "yarn");
    // ResourceManager address (host:port)
    conf.set("yarn.resourcemanager.address", "10.60.20.111:8032");
    // allow a job submitted from Windows to run on the Linux cluster
    conf.set("mapreduce.app-submission.cross-platform", "true");
    //conf.set("mapreduce.job.jar", "E:\\workspace\\idea\\hadoop_wordcount_demo\\classes\\wordcount.jar");
    return conf;
}
FileSystemUtil.java is as follows:
static void deleteDir(Configuration conf, String dirPath) throws IOException {
    //FileSystem fs = FileSystem.get(conf);
    Path targetPath = new Path(dirPath);
    FileSystem fs = targetPath.getFileSystem(conf);
    if (fs.exists(targetPath)) {
        // recursive delete, so the job's output directory can be reused
        boolean delResult = fs.delete(targetPath, true);
        if (delResult) {
            System.out.println(targetPath + " has been deleted successfully.");
        } else {
            System.out.println(targetPath + " deletion failed.");
        }
    }
}
static File createJar(Class<?> clazz) throws Exception {
    String fqn = clazz.getName();
    // package part of the fully qualified class name, as a path such as com/example/wordcount
    String base = fqn.substring(0, fqn.lastIndexOf("."));
    base = base.replaceAll("\\.", Matcher.quoteReplacement("/"));
    URL root = clazz.getResource("");
    JarOutputStream out = null;
    // temporary jar file, removed again when the JVM exits
    final File jar = File.createTempFile("HadoopRunningJar-", ".jar", new File(System.getProperty("java.io.tmpdir")));
    System.out.println(jar.getAbsolutePath());
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            jar.delete();
        }
    });
    try {
        File path = new File(root.toURI());
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        manifest.getMainAttributes().putValue("Created-By", "RemoteHadoopUtil");
        manifest.getMainAttributes().putValue("Main-Class", fqn);
        out = new JarOutputStream(new FileOutputStream(jar), manifest);
        writeBaseFile(out, path, base);
    } finally {
        if (out != null) {
            out.flush();
            out.close();
        }
    }
    return jar;
}
/**
 * Recursively add the compiled .class files to the jar.
 */
private static void writeBaseFile(JarOutputStream out, File file, String base) throws IOException {
    if (file.isDirectory()) {
        File[] fl = file.listFiles();
        if (base.length() > 0) {
            base = base + "/";
        }
        for (File aFl : fl) {
            writeBaseFile(out, aFl, base + aFl.getName());
        }
    } else {
        // one jar entry per file, copied in 1 KB chunks
        out.putNextEntry(new JarEntry(base));
        try (FileInputStream in = new FileInputStream(file)) {
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
        }
    }
}
WordCount.java is as follows:
private static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // emit (word, 1) for every token in the line
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

private static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum all counts emitted for the word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
public static void main(String[] args) throws Exception {
    // local Hadoop installation on Windows
    System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.7.1");
    Configuration conf = ConfigUtil.getConfigured();
    String hdfs = conf.get("fs.defaultFS");
    args = new String[] { hdfs + "/tzwang/input/wordcount/", hdfs + "/tzwang/output/wordcount" };
    // delete the output dir, otherwise the job fails if it already exists
    deleteDir(conf, args[args.length - 1]);
    Job job = Job.getInstance(conf, "word count");
    // package the compiled classes and hand the temporary jar to the job
    File jarFile = createJar(WordCount.class);
    ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
The properties set in ConfigUtil.java can also be kept in XML configuration files, modeled on the cluster's own config files under Linux, and loaded with conf.addResource(). All of the properties set in this class are required; everything else is optional.
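For example, a sketch of ConfigUtil loading the cluster's XML files instead of hard-coding the values might look like the following; the file names and the assumption that they sit on the classpath (e.g. in the project's resources directory) are mine:

static Configuration getConfigured() {
    Configuration conf = new Configuration();
    // load the config files copied from the cluster; they must be on the classpath
    conf.addResource("core-site.xml");
    conf.addResource("hdfs-site.xml");
    conf.addResource("mapred-site.xml");
    conf.addResource("yarn-site.xml");
    // still required when submitting from Windows to a Linux cluster
    conf.set("mapreduce.app-submission.cross-platform", "true");
    return conf;
}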
Since I am running on YARN, mapreduce.framework.name is set to yarn; if you are not using YARN, set the mapreduce.jobtracker.address property instead.
The mapreduce.app-submission.cross-platform property allows a job submitted from Windows to run on the Linux cluster.
If you hit a permission-denied error, add the following to hdfs-site.xml:
<property>
  <name>dfs.permissions.enabled</name>
  <value>false</value>
</property>
System.setProperty("hadoop.home.dir", "") sets the path of the local Hadoop installation on Windows.
The createJar method in FileSystemUtil.java packages the compiled classes into a jar automatically before submission; if you prefer not to use it, you can point at a pre-built jar manually with conf.set("mapreduce.job.jar", "...").
The input and output paths can be hard-coded in the program, as above, or supplied through Program arguments in the IDEA run configuration.
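If you go the Program arguments route, a minimal sketch of how main might pick the paths up (assuming two arguments, input path first and output path second; the exact argument layout is your choice):

public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: WordCount <input path> <output path>");
        System.exit(2);
    }
    System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.7.1");
    Configuration conf = ConfigUtil.getConfigured();
    String hdfs = conf.get("fs.defaultFS");
    // e.g. Program arguments: /tzwang/input/wordcount /tzwang/output/wordcount
    String input = hdfs + args[0];
    String output = hdfs + args[1];
    deleteDir(conf, output);
    // ... the rest of the job setup is identical to the version above, using input/output ...
}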