前言

首先有一个问题,Flink程序可以在idea中正常运行,但是将Flink打成jar包,使用java -jar的方式运行jar包就会报错。这个问题一直没有解决。

正好最近遇到这样一个场景,需要将flink程序集成到Springboot程序中(不要问为啥要在本地执行flink程序,不上集群),集成完以后没想到效果出奇的好,Springboot程序可以正常运行,flink程序也可以通过jar的方式启动了。

下面是集成过程。

Springboot集成Flink

第一步,创建Springboot项目。

第二步,引入flink依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

第三步,pom文件导入打包插件

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>2.4.3</version>
        </plugin>
    </plugins>
</build>

第四步,编写flink程序

/**
 * flink主处理函数
 */
public class FlinkMainProcess {

    /**
     * 得到运行环境
     *
     * @return
     */
    public static StreamExecutionEnvironment getEnv() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }

    public static void startFlinkJob() {
        StreamExecutionEnvironment env = getEnv();
        SingleOutputStreamOperator<JSONObject> sourceStream = env.addSource(new RandomSource())
                .filter(e -> e.getInteger("id") % 2 == 0)
                .map(e -> {
                    if (e.getString("name").contains("2")) {
                        e.replace("2", "two");
                    }
                    return e;
                });

        sourceStream.print("print-console");
        sourceStream.addSink(new LocalFileSink()).name("file sink");

        try {
            env.execute("random process");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

第五步,在Springboot主类中,启动flink程序线程

@SpringBootApplication
public class HelloApplication {
    public static void main(String[] args) {
        SpringApplication.run(HelloApplication.class);

        //在Springboot主类中,启动flink程序线程
        new Thread(FlinkMainProcess::startFlinkJob).start();
    }
}

第六步,编译打包Springboot程序,并执行

 java -jar flink-demo-1.0-SNAPSHOT.jar

 

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐