2017-02-27 68 views
0

试图找出如何写一个数据流作业,依赖于一些第三方的模块和查表工作,具体如下:尝试理解火花流工作

# custom.py 
# this is the 3rd party or user defined python module, 
# there're some module-level variables 
# and some functions which rely on the moduel-level variables to work 
VAR_A = ... 
VAR_B = ... 


# load external data files to initialize VAR_A and VAR_B 
def init_var(external_file): 
    with open(external_file, 'r') as f: 
    for l in f: 
     VAR_A.append(l) 
     VAR_B.check(l) 
     .... 

# relies on VAR_A and VAR_B to work 
def process(x): 
    if x in VAR_A: 
    ... 
    if VAR_B.check(x): 
    ... 

流驱动器如下,基本上,每个RDD我想通过handle申请customprocess功能,但是在process功能依赖于某些查找变量工作,即VAR_AVAR_B,所以我必须明确地播放这些查找瓦尔Spark contenxt?

# driver.py 
import custom 

def handle(x): 
    ... 
    custom = shared.value 
    return custom.process(x) 


if __name__ == '__main__': 
    sc = SparkContext(appName='porn_score_on_name') 
    ssc = StreamingContext(sc, 2) 

    custom.init('/path/to/external_file') 

    # since each task node will use custom, so I try to make it a shared one 
    # HOWEVER, this won't work, since module cannot be pickled 
    shared = sc.broadcast(custom) 

    # get stream data 
    data = ... 
    processed = data.map(handle) 

    # further processing 
    ... 

    ssc.start() 
    ssc.awaitTermination() 

我不知道如何使它工作,如果我不得不使用第三方模块?

UPDATE

假设实时流输入文字,例如线

word1 word2 
word3 
word5 word7 word1 
... 

我想找出预先计算好的词汇表中的单词(V)。

所以我有这样的想法: 写一个数据流作业来处理输入数据,这意味着我有多个执行并行运行消耗数据,并为每一个执行者,预先计算vocabular V,则所有可用时间。 现在的问题是如何让它发生?

这是我INTIAL采取在此: 我做包含的词汇和我的自定义代码,例如一个zip包pack.zip,然后我通过​​提交此pack.zip,使这一pack.zip可用每个执行人的机器上,那么我应该做些什么,以使每个执行人从pack.zip词汇加载到内存中的样子-up表,以至于现在每次执行访问的词汇,使他们能够正确处理实时流数据时,司机开始运行。

但事实证明,上述想法可行,但每个执行者一次又一次地加载词汇表,这是不可接受的。 所以这里是我的第二个看法: 我应该在驱动程序中加载词汇表(所以这发生在本地机器上,而不是执行者上),然后向所有执行者广播词汇表查找表,然后做这项工作。

+0

我不明白的问题,但肯定的,所有的变量和模块必须是可序列化到星火(或者通过Python腌) –

+0

@ cricket_007,基本上,我只是婉打电话'custom' 'handle'中的'process'函数。 – avocado

+0

@ cricket_007,更新后的一些细节。顺便说一句,如果所有需要的模块必须在星火被序列化,那么什么是通过'--py-files'或'--archives'等上传包的地步? – avocado

回答

0

您的例子并不真的似乎是一个流动问题,只是如何加载一个全局变量...

我不会试图播放一个整体模块,只是个别需要的变量。

例如,你应该能够使用广播变量像这样。 (未测试的代码)

# One of the first things you do 
vocab = sc.broadcast(open('vocab.txt').readlines()) # broadcast to all executors 

def vocab_filter(line): 
    words = line.split() 
    return [w for w in words if w in vocab.value] 

ssc = StreamingContext(sc, 1) # Some streaming context 
lines = ssc.socketTextStream("localhost", 9999) # Some stream 
# remove extraneous words from the lines and flatten all words in the stream 
lines_filtered = lines.flatMap(vocab_filter) 
+0

是的,谢谢,我完全理解你的代码示例。然而,我最担心的问题是我需要使用第三方模块的查找功能,例如, 'process',这个函数是以一种不友好的方式编写的,就像这样'def process(line_of_string):for line in line_of_string.split():yield word if if in GLOBAL_VOCAB',就是假设这是一个模块在这个包含词汇的第三方模块中包含变量GLOBAL_VOCAB,所以现在即使我可以像你那样做'broadcast',那么我可以打电话给第三方'process'吗? – avocado

+0

'GLOBAL_VOCAB'本身必须是广播变量。否则,我不明白为什么你刚才提到是行不通的 –

+0

BTW什么,该词汇装载并不像你的代码那么简单,除了阅读它们,一个'trie'需求正在兴建对这些词汇的话,那么如何加载词汇表文件并在每个执行程序上建立'trie'一次,而不是一次又一次重建'trie'(这是我的代码中的一个问题) – avocado