我运行以下文件(称为test_func.py
)与py.test
:Pyspark - 使用在地图自定义函数变换
import findspark
findspark.init()
from pyspark.context import SparkContext
def filtering(data):
return data.map(lambda p: modif(p)).count()
def modif(row):
row.split(",")
class Test(object):
sc = SparkContext('local[1]')
def test_filtering(self):
data = self.sc.parallelize(['1','2', ''])
assert filtering(data) == 2
而且,由于modif
功能的map
内部使用变换时,出现以下错误:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named clustering.test_func
pyspark无法找到modif
函数。请注意,文件test_func.py
位于目录clustering
中,我从clustering
目录内运行py.test
。
令我吃惊的是,如果我在map
之外使用modif
函数,它工作正常。例如,如果我这样做:modif(data.first())
任何想法,为什么我得到这样的importErrors,我该如何解决它?
编辑
- 我已经测试了什么已经提出Avihoo Mamka的回答,即以
test_func.py
添加到复制到所有劳动者的文件。但是,它没有效果。对我来说这并不奇怪,因为我认为从创建Spark应用程序的主文件始终发送给所有工作人员。 - 我认为它可能来自一个事实,即
pyspark
正在寻找clustering.test_func
,而不是test_func
。
只是一个想法,但地图上执行每一个工人的代码块。这可能与工作人员不识别'modif'功能有关吗? –
是的,那是我的赌注。但我不知道如何让工人认可它(这里只有一个,因为主人是“本地[1]”)...... – Pop