2016-11-11 67 views
3

我遇到了一些关于最近开发flink作业的问题,它介绍了spring和hibernate,并且作业将在flink集群上运行。所以我需要在任务管理器而不是作业管理器上运行flink运算符之前初始化spring资源。但是我找不到任何合适的StreamExecutionEnvironment方法来实现这一点。如何在flink环境中初始化flink作业的spring资源

我已经试过这样如下一些方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); 
// etl business logic as flink operators 
FlinkOperators.run(); 
env.execute(); 

然而,当弗林克工作,其并行不止一个执行,弹簧初始化将在每个任务管理器进程。所以我不能在flink工作中使用spring。

是否有任何方法来初始化Flink作业的spring资源?

谢谢。

最好的问候。

阿尔文

+0

我现在有问题,你找到解决方案吗? – zt1983811

回答

0

每当你需要有某种形式的上下文初始化每个任务管理器中,建立一个静态函数(如果你使用Scala的对象内的函数)存储静态变量中的初始化值。

这应该够了,因为静态值存储在每个任务管理器的内存中。

我使用这个aproach加载每个任务管理器中的属性文件,这些属性文件包含每个作业配置。如果您正在加载文件,请检查每个taskmanager是否有要加载的文件的副本。