Springboot集成Flink——本地方式运行Flink jar包
前言首先有一个问题,Flink程序可以在idea中正常运行,但是将Flink打成jar包,使用java -jar的方式运行jar包就会报错。这个问题一直没有解决。正好最近遇到这样一个场景,需要将flink程序集成到Springboot程序中(不要问为啥要在本地执行flink程序,不上集群),集成完以后没想到效果出奇的好,Springboot程序可以正常运行,flink程序也可以通过jar的方式启动
·
前言
首先有一个问题,Flink程序可以在idea中正常运行,但是将Flink打成jar包,使用java -jar的方式运行jar包就会报错。这个问题一直没有解决。
正好最近遇到这样一个场景,需要将flink程序集成到Springboot程序中(不要问为啥要在本地执行flink程序,不上集群),集成完以后没想到效果出奇的好,Springboot程序可以正常运行,flink程序也可以通过jar的方式启动了。
下面是集成过程。
Springboot集成Flink
第一步,创建Springboot项目。
第二步,引入flink依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
第三步,pom文件导入打包插件
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
</plugin>
</plugins>
</build>
第四步,编写flink程序
/**
* flink主处理函数
*/
public class FlinkMainProcess {
/**
* 得到运行环境
*
* @return
*/
public static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
public static void startFlinkJob() {
StreamExecutionEnvironment env = getEnv();
SingleOutputStreamOperator<JSONObject> sourceStream = env.addSource(new RandomSource())
.filter(e -> e.getInteger("id") % 2 == 0)
.map(e -> {
if (e.getString("name").contains("2")) {
e.replace("2", "two");
}
return e;
});
sourceStream.print("print-console");
sourceStream.addSink(new LocalFileSink()).name("file sink");
try {
env.execute("random process");
} catch (Exception e) {
e.printStackTrace();
}
}
}
第五步,在Springboot主类中,启动flink程序线程
@SpringBootApplication
public class HelloApplication {
public static void main(String[] args) {
SpringApplication.run(HelloApplication.class);
//在Springboot主类中,启动flink程序线程
new Thread(FlinkMainProcess::startFlinkJob).start();
}
}
第六步,编译打包Springboot程序,并执行
java -jar flink-demo-1.0-SNAPSHOT.jar
更多推荐
已为社区贡献2条内容
所有评论(0)