2016-01-19 60 views
1

我一直工作在一些星火使用Python,具体textFileStream流,而且我发现一个稍显怪异行为。我想知道有没有人可以帮我解释一下。Python的星火流只运行一次

目前,我有我的代码设置如下:从调试流(Spark Streaming: How to get the filename of a processed file in Python)正在处理的文件的

def fileName(data): 
    debug = data.toDebugString() 
    pattern = re.compile("file:/.*\.txt") 
    files = pattern.findall(debug) 
    return files 

if __name__ == "__main__": 
    sc = SparkContext(appName="PythonStreamingFileNamePrinter") 
    ssc = StreamingContext(sc, 1) 

    lines = ssc.textFileStream("file:///test/input/") 

    files = lines.foreachRDD(fileName) 
    print(files) 

    ssc.start() 
    ssc.awaitTermination() 

,文件名功能简单争夺的名称。但是,此代码仅运行一次,只能打印一次文件。当我修改功能如下:

def fileName(data): 
    debug = data.toDebugString() 
    pattern = re.compile("file:/.*\.txt") 
    files = pattern.findall(debug) 
    print(files); 

它每秒钟检查目录,如预期。似乎唯一的代码是“循环”在foreachRDD中。

我是在这个假设是正确的,并且所有处理(包括循环,条件等)必须内部地图功能等发生的?

感谢, 中号

回答

1

一个DSTREAM是由许多RDDS正在建立随时间推移的。 行是DStream。

当你上线执行foreachRDD您的流中的每个RDD转化为一个字符串。所以当你打印它时,你会得到一个表示流中所有rdds的字符串列表。意思是,这发生在“流的尽头”。

当您打印的文件名函数的字符串,你这样做是为了每个RDD流中,而它正在继续进行。所以当流正在运行时你正在获取它。

此外,正如我在以前的问题中提到你,foreachRDD这里没有必要。这不是针对这种特定需求的“火花流方式”,也许这就是为什么它会让你感到困惑。

这里的更直接的方式是使用在DSTREAM本身的地图(这将影响所有的RDD中的),然后使用pprint。

请记住,不同于一般的RDD,你不能仅仅收取(或任何类似)RDDS在流和流运行时返回结果。你需要做一些与数据,将其保存到一些外部源(如果需要)或处理它作为整个数据流的状态的一部分。

+0

有道理。谢谢。 – swinefish