2017-10-20 89 views
0

我们计划使用Flink处理来自kafka主题的数据流(Logs in Json格式)。Kafka-Flink-Stream处理:有没有办法将输入文件重新加载到流处理中使用的变量中?

但是,对于那个处理,我们需要使用每天都在变化的输入文件,而内部的信息可以完全改变(不是格式,而是内容)。

每当其中一个输入文件发生更改时,我们将不得不将这些文件重新加载到程序中并保持流处理正在进行。数据

重新加载可以做同样的方式,因为它现在已经完成:

DataSet<String> globalData = env.readTextFile("file:///path/to/file"); 

但到目前为止,我无法找到的例子或想出一个办法来触发流处理作业重装。

作为额外的信息,我们不会使用HDFS,而是在每个节点上使用本地文件系统,因此必须在每个节点上从本地文件中重新加载。 这是因为我们需要HDFS的唯一原因是这个输入文件总共只有100 MB,而使用HDFS会是一个矫枉过正的问题。

到目前为止,我一直在尝试使用RichMapFunction,试图找到一个可以提供此功能(重新加载文件)的kafka主题,并试图找到没有运气的例子。


编辑:

读了很多之后,我在几个地方发现,这是要走的路:DataArtisans examples

试图做一个简单的代码,将来自控制流做一个简单的变化在流,我得到了下面的代码:

public class RichCoFlatMapExample extends EventTimeJoinHelper { 

private String config_source_path = "NOT_INITIALIZED"; 

@Override 
public void open(Configuration conf) { 
    config_source_path = "first_file_path"; 

} 

public abstract void processElement1(String one, String two, Collector<String> out) { 

    config_source_path = one; 
} 


public abstract void processElement2(String one, String two, Collector<String> out) { 
    String three = two + config_source_path; 
    out.collect(three); 
} 

} 

现在有就是,不管我怎么努力林的问题,我得到以下错误:

Class 'RichCoFlatMapExample' must either be declared abstract or implement abstract method 'processElement1(String, String, Collector)' in 'RichCoFlatMapExample'

问题是,所要求的方法来实现,但我不能让一个非抽象类(我从IDE错误)这些“抽象”。 如果我创建类RichCoFlatMapExample,我将无法从Flink方法(dataStream方法)调用它。

林不知道发生了什么,但我认为这必须接近。如果我做这个工作,我会继续尝试和更新。

+0

我想澄清“输入文件”应该如何工作:它们将被用作过滤数据,例如“有效的国家”,然后我们将过滤我们的流与那些“有效的国家”。因此,在我们继续处理流之前,我们需要一个完全加载的“输入文件”。 –

回答

2

Flink可以监视一个目录并在文件被移入该目录时摄取文件;也许这就是你要找的。请参阅PROCESS_CONTINUOUSLY option for readfile in the documentation

但是,如果数据在Kafka中,那么使用Flink的Kafka使用者直接将数据流式传输到Flink会更加自然。还有documentation about using the Kafka connector。 Flink培训包括an exercise on using Kafka with Flink

+0

缺少的链接是在重新加载新文件(假设您使用“PROCESS_CONTINUOUSLY”)以及变量如何重新加载并开始影响流处理的值之后发生的情况。我找不到有关这方面的文献。非常感谢你,我会尝试模拟它,并在这里报告使用的代码和结果。我还澄清了我们应该如何处理输入文件,我之前认识到这些文件不是很清楚。 –

+0

谢谢。我发现我无法将数据集与数据流混合,但发现了一些编辑问题中描述的示例。有关更新的任何建议将非常感激。 –

+0

经过深入研究(数据工匠培训示例)[https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/ process/EventTimeJoinHelper.java],这个问题就解决了。因此,我将使用这些示例,直到我能够吸收到我们的代码中。我将这个问题标记为答案,因为没有比RichCoFunction完整的例子更多的信息。 –

相关问题