试图找出如何写一个数据流作业,依赖于一些第三方的模块和查表工作,具体如下:尝试理解火花流工作
# custom.py
# this is the 3rd party or user defined python module,
# there're some module-level variables
# and some functions which rely on the moduel-level variables to work
VAR_A = ...
VAR_B = ...
# load external data files to initialize VAR_A and VAR_B
def init_var(external_file):
with open(external_file, 'r') as f:
for l in f:
VAR_A.append(l)
VAR_B.check(l)
....
# relies on VAR_A and VAR_B to work
def process(x):
if x in VAR_A:
...
if VAR_B.check(x):
...
流驱动器如下,基本上,每个RDD我想通过handle
申请custom
的process
功能,但是在process
功能依赖于某些查找变量工作,即VAR_A
和VAR_B
,所以我必须明确地播放这些查找瓦尔Spark contenxt?
# driver.py
import custom
def handle(x):
...
custom = shared.value
return custom.process(x)
if __name__ == '__main__':
sc = SparkContext(appName='porn_score_on_name')
ssc = StreamingContext(sc, 2)
custom.init('/path/to/external_file')
# since each task node will use custom, so I try to make it a shared one
# HOWEVER, this won't work, since module cannot be pickled
shared = sc.broadcast(custom)
# get stream data
data = ...
processed = data.map(handle)
# further processing
...
ssc.start()
ssc.awaitTermination()
我不知道如何使它工作,如果我不得不使用第三方模块?
UPDATE
假设实时流输入文字,例如线
word1 word2
word3
word5 word7 word1
...
我想找出预先计算好的词汇表中的单词(V)。
所以我有这样的想法: 写一个数据流作业来处理输入数据,这意味着我有多个执行并行运行消耗数据,并为每一个执行者,预先计算vocabular V,则所有可用时间。 现在的问题是如何让它发生?
这是我INTIAL采取在此: 我做包含的词汇和我的自定义代码,例如一个zip包pack.zip,然后我通过提交此pack.zip,使这一pack.zip可用每个执行人的机器上,那么我应该做些什么,以使每个执行人从pack.zip词汇加载到内存中的样子-up表,以至于现在每次执行访问的词汇,使他们能够正确处理实时流数据时,司机开始运行。
但事实证明,上述想法可行,但每个执行者一次又一次地加载词汇表,这是不可接受的。 所以这里是我的第二个看法: 我应该在驱动程序中加载词汇表(所以这发生在本地机器上,而不是执行者上),然后向所有执行者广播词汇表查找表,然后做这项工作。
我不明白的问题,但肯定的,所有的变量和模块必须是可序列化到星火(或者通过Python腌) –
@ cricket_007,基本上,我只是婉打电话'custom' 'handle'中的'process'函数。 – avocado
@ cricket_007,更新后的一些细节。顺便说一句,如果所有需要的模块必须在星火被序列化,那么什么是通过'--py-files'或'--archives'等上传包的地步? – avocado