我们计划使用Flink处理来自kafka主题的数据流(Logs in Json格式)。Kafka-Flink-Stream处理:有没有办法将输入文件重新加载到流处理中使用的变量中?
但是,对于那个处理,我们需要使用每天都在变化的输入文件,而内部的信息可以完全改变(不是格式,而是内容)。
每当其中一个输入文件发生更改时,我们将不得不将这些文件重新加载到程序中并保持流处理正在进行。数据
重新加载可以做同样的方式,因为它现在已经完成:
DataSet<String> globalData = env.readTextFile("file:///path/to/file");
但到目前为止,我无法找到的例子或想出一个办法来触发流处理作业重装。
作为额外的信息,我们不会使用HDFS,而是在每个节点上使用本地文件系统,因此必须在每个节点上从本地文件中重新加载。 这是因为我们需要HDFS的唯一原因是这个输入文件总共只有100 MB,而使用HDFS会是一个矫枉过正的问题。
到目前为止,我一直在尝试使用RichMapFunction,试图找到一个可以提供此功能(重新加载文件)的kafka主题,并试图找到没有运气的例子。
编辑:
读了很多之后,我在几个地方发现,这是要走的路:DataArtisans examples。
试图做一个简单的代码,将来自控制流做一个简单的变化在流,我得到了下面的代码:
public class RichCoFlatMapExample extends EventTimeJoinHelper {
private String config_source_path = "NOT_INITIALIZED";
@Override
public void open(Configuration conf) {
config_source_path = "first_file_path";
}
public abstract void processElement1(String one, String two, Collector<String> out) {
config_source_path = one;
}
public abstract void processElement2(String one, String two, Collector<String> out) {
String three = two + config_source_path;
out.collect(three);
}
}
现在有就是,不管我怎么努力林的问题,我得到以下错误:
Class 'RichCoFlatMapExample' must either be declared abstract or implement abstract method 'processElement1(String, String, Collector)' in 'RichCoFlatMapExample'
问题是,所要求的方法来实现,但我不能让一个非抽象类(我从IDE错误)这些“抽象”。 如果我创建类RichCoFlatMapExample,我将无法从Flink方法(dataStream方法)调用它。
林不知道发生了什么,但我认为这必须接近。如果我做这个工作,我会继续尝试和更新。
我想澄清“输入文件”应该如何工作:它们将被用作过滤数据,例如“有效的国家”,然后我们将过滤我们的流与那些“有效的国家”。因此,在我们继续处理流之前,我们需要一个完全加载的“输入文件”。 –