springboot整合flink(一)

欢迎加入扣扣组织,783092701

一、场景

需要使用mongodb,由于数据量很小且单机版,考虑使用flink CDC不太合适;
然后看到使用springboot框架集成mongodb操作起来很方便,所以就有了此篇文章。

二、遇到的问题

现象1:springboot依赖注入与flink冲突
现象2:flink该放在springboot哪一层?
现象3:springboot结合flink如何处理启动类?
现象4:flink作用域,与springboot作用域
现象5:如何在springboot启动类里使用bean

三、springboot结合flink如何处理启动类?

1、springboot启动过程

在这里插入图片描述

2、springboot启动立即执行flink

1) 有10种方法立即执行,我们从中筛选
方法1:静态方法/@PostConstruct(场景不符)
方法2:实现InitializingBean接口(执行在容器前)
方法3:提供initMethod(执行在方法2前)
方法4:实现ApplicationRunner接口(执行在容器前)
方法5:实现CommandLineRunner接口(CommandLineRunner和ApplicationRunner的作用是相同,参数不同)
方法6:在启动类的main里边进行
方法7:@EnableScheduling(场景不符)
方法8:实现ApplicationListener接口(场景不符)
方法9:实现ServletContextAware接口(web项目用)
方法10:实现ServletContextListener接口(web项目用)

2) 我们测试ApplicationRunner执行顺序
是一个接口,常用于项目启动后,(也就是ApringApplication.run()执行结束),立马执行某些逻辑。
可用于项目的准备工作,比如加载配置文件,加载执行流,定时任务等等。
测试与run()的执行先后顺序:
在这里插入图片描述

从图中看出,ApplicationRunner在容器实例化之前执行,flink消费kafka写在此处不合适,因为我们要获取yml里属性配置

所以,我们就把flink执行流就写在main函数的run()后:
在这里插入图片描述

因此上网搜了一下,找到了解决的办法,就是把get,set方法的static 修饰符去掉,然后在set方法上面加@Resource即可,但是不适合解决我们的问题,我们用下面方法解决

3) 通过上下文接口获取属性文件配置

import javax.annotation.Resource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import com.yhzq.config.SystemConfig;
import com.yhzq.util.BeanUtil;
/**
 * <p>Title: Application.java</p>
 * <p>Description: </p>
 * @author wangmoumo
 * @version 1.0
 * @date 2022年10月18日
 * @url www.681vip.com
 */
@SpringBootApplication
public class Application{
	
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        /* run方法的返回值ConfigurableApplicationContext继承了ApplicationContext上下文接口 */
		ConfigurableApplicationContext applicationContext = SpringApplication.run( Application.class, args );
		/* 将run方法的返回值赋值给工具类中的静态变量 */
		BeanUtil.applicationContext = applicationContext;
		getSystemConfig();
    }
    /* 调用  注意Application是我们SpringBoot的启动类 */
	public static void getSystemConfig(){
		/* 测试获取已经实例化的接口bean,执行bean中方法 */
		SystemConfig systemConfig = BeanUtil.getBean(SystemConfig.class );
		System.out.println(systemConfig);
	}
}

3、sysconfig设置为flink全局参数

env.getConfig().setGlobalJobParameters(systemConfig);

在这里插入图片描述
更多内容:vip资料分享网

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐