由于 Flink 没有提供 hadoop 相关依赖,需要 pom 中添加相关依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

笔者用的hadoop3.1.3 ,读者可自行配置。

代码如下:

        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //读取hdfs文件路径
        DataStreamSource<String> hdfsSource = env.readTextFile("hdfs://hadoop102:8020/input/README.txt");
        //将hdfs文件路径打印输出
        hdfsSource.print();
        //执行
        env.execute("HDFSSourceTest");

说明:hdfs://hadoop102:8020 为 core-site.xml 中 指定的namanode地址,后面为文件目录,文件名。

源代码如下

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐