本文章主要说明windows环境下使用idea远程提交job到linux的hadoop集群。

hadoop集群的部署网上有很多,我自己在虚拟机上搭建了伪集群用于个人学习使用。

下面以wordcount例子为例:

先附上目录结构:


ConfigUtil为配置工具类,FileSystemUtil为文件工具类。

ConfigUtil.java内容如下:

 /**
  * Builds the client-side Hadoop configuration used to submit jobs to the remote cluster.
  *
  * @return a new {@code Configuration} carrying the default file system, the MapReduce
  *         framework name, the ResourceManager address and the cross-platform submission flag
  */
 static Configuration getConfigured() {
        final Configuration remoteConf = new Configuration();

        // Where the default file system lives.
        remoteConf.set("fs.defaultFS", "hdfs://10.60.20.111:9000");

        // Run MapReduce on YARN and tell the client where the ResourceManager is.
        remoteConf.set("mapreduce.framework.name", "yarn");
        remoteConf.set("yarn.resourcemanager.address", "http://10.60.20.111:8032");

        // Needed when the submitting client and the cluster run on different operating systems.
        remoteConf.set("mapreduce.app-submission.cross-platform", "true");

        // Optional alternative to building the job jar at runtime: point at a pre-built jar.
        //remoteConf.set("mapreduce.job.jar", "E:\\workspace\\idea\\hadoop_wordcount_demo\\classes\\wordcount.jar");
        return remoteConf;
    }

FileSystemUtil.java文件内容如下:

/**
 * Recursively deletes {@code dirPath} if it exists and prints the outcome to standard output.
 * Does nothing (and prints nothing) when the path does not exist.
 *
 * @param conf    configuration used to resolve the file system that owns the path
 * @param dirPath path of the directory to remove
 * @throws IOException if the file system cannot be reached or the existence check/delete fails
 */
static void deleteDir(Configuration conf, String dirPath) throws IOException {
        final Path target = new Path(dirPath);
        // The file system is resolved through the path itself rather than FileSystem.get(conf).
        final FileSystem fileSystem = target.getFileSystem(conf);

        if (!fileSystem.exists(target)) {
            return;
        }

        // 'true' requests a recursive delete.
        final String outcome = fileSystem.delete(target, true)
                ? " has been deleted sucessfullly."
                : " deletion failed.";
        System.out.println(target + outcome);
    }

    /**
     * Packages the compiled files found in the directory of {@code clazz}'s package (including
     * sub-directories) into a temporary jar whose manifest names {@code clazz} as Main-Class.
     * The jar's absolute path is printed to standard output and the file is scheduled for
     * deletion when the JVM exits.
     *
     * @param clazz class whose package directory is packaged; must be loaded from a directory
     *              on the local file system (not from inside another jar)
     * @return the temporary jar file
     * @throws IllegalStateException if the class's package directory cannot be located on disk
     * @throws Exception             if the temporary file cannot be created or written
     */
    static File createJar(Class<?> clazz) throws Exception {
        String fqn = clazz.getName();

        // A class in the default package has no '.' in its name; its entries go to the jar root.
        int lastDot = fqn.lastIndexOf('.');
        String base = lastDot < 0 ? "" : fqn.substring(0, lastDot).replace('.', '/');

        // getResource("") resolves to the location of the class's own package.
        URL root = clazz.getResource("");
        if (root == null || !"file".equals(root.getProtocol())) {
            // Fail with a clear message instead of an NPE / "URI is not hierarchical" error.
            throw new IllegalStateException(
                    "Cannot locate a class directory on disk for " + fqn + " (resource: " + root + ")");
        }
        File classesDir = new File(root.toURI());

        final File jar = File.createTempFile("HadoopRunningJar-", ".jar", new File(System.getProperty("java.io.tmpdir")));
        System.out.println(jar.getAbsolutePath());
        // Remove the temporary jar when the JVM shuts down.
        jar.deleteOnExit();

        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        manifest.getMainAttributes().putValue("Created-By", "RemoteHadoopUtil");
        manifest.getMainAttributes().putValue("Main-Class", fqn);

        // Two resources so the file stream is closed even if the JarOutputStream constructor
        // (which writes the manifest) throws.
        try (FileOutputStream fileOut = new FileOutputStream(jar);
             JarOutputStream out = new JarOutputStream(fileOut, manifest)) {
            writeBaseFile(out, classesDir, base);
        }
        return jar;
    }

    /**
     * Recursively copies {@code file} into the jar. A directory contributes every file beneath
     * it (not only .class files); a regular file becomes a single entry named {@code base}.
     *
     * @param out  jar stream that receives the entries
     * @param file file or directory to add
     * @param base entry name for {@code file}; for a directory, the '/'-separated prefix of its
     *             children (an empty string places children at the jar root)
     * @throws IOException if a directory cannot be listed or a file cannot be read or written
     */
    private static void writeBaseFile(JarOutputStream out, File file, String base) throws IOException {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children == null) {
                // listFiles() returns null on an I/O error; report it instead of hitting an NPE.
                throw new IOException("Unable to list directory: " + file.getAbsolutePath());
            }
            String prefix = base.isEmpty() ? "" : base + "/";
            for (File child : children) {
                writeBaseFile(out, child, prefix + child.getName());
            }
            return;
        }

        out.putNextEntry(new JarEntry(base));
        try (FileInputStream in = new FileInputStream(file)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        out.closeEntry();
    }

WordCount.java文件内容如下:

private static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    private static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Submits the word-count job to the cluster described by {@code ConfigUtil.getConfigured()}
     * and exits with status 0 on success, 1 on failure.
     *
     * @param args optional: {@code args[0]} is the input path and {@code args[1]} the output
     *             path; when fewer than two arguments are given, the built-in default paths
     *             under the configured {@code fs.defaultFS} are used
     * @throws Exception if jar creation, output cleanup or job submission fails
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.7.1");
        Configuration conf = ConfigUtil.getConfigured();

        // Honor paths passed as program arguments; fall back to the defaults otherwise.
        String inputPath;
        String outputPath;
        if (args != null && args.length >= 2) {
            inputPath = args[0];
            outputPath = args[1];
        } else {
            String hdfs = conf.get("fs.defaultFS");
            inputPath = hdfs + "/tzwang/input/wordcount/";
            outputPath = hdfs + "/tzwang/output/wordcount";
        }

        // The output directory is removed up front so a rerun starts from a clean location.
        deleteDir(conf, outputPath);

        Job job = Job.getInstance(conf, "word count");
        // Build the job jar from the compiled classes and register it through the public Job API
        // instead of downcasting the job's Configuration to JobConf.
        File jarFile = createJar(WordCount.class);
        job.setJar(jarFile.toString());
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

ConfigUtil.java中的属性可以配置在xml文件中,具体可以参照linux环境下的配置文件,再使用conf.addResource()导入。该类中的所有属性必须设置,其余的可以不加。

因为我使用的是yarn,所以需要设置yarn.resourcemanager.address属性;不使用yarn的请改用mapreduce.jobtracker.address属性。

mapreduce.app-submission.cross-platform这个属性设置为true后,可以允许从windows提交应用到linux。

如果遇到权限禁止访问的问题,可以在hdfs-site.xml中添加如下配置(dfs.permissions.enabled是HDFS的属性,应配置在hdfs-site.xml而不是mapred-site.xml中;关闭权限检查仅建议用于个人学习环境):

<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

System.setProperty("hadoop.home.dir", "...");这个用来设置本地hadoop的路径。

FileSystemUtil.java中的createJar方法用来自动生成jar包提交,若不使用该方法,可以手动指定jar的绝对路径:conf.set("mapreduce.job.jar", "...");

对于输入和输出路径可以在程序中指定,或者在运行配置中的Program arguments中配置。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐