2016-06-07 25 views
1

我运行以下文件(称为test_func.py)与py.testPyspark - 使用在地图自定义函数变换

import findspark 
findspark.init() 
from pyspark.context import SparkContext 

def filtering(data): 
    return data.map(lambda p: modif(p)).count() 

def modif(row): 
    row.split(",") 

class Test(object): 
    sc = SparkContext('local[1]') 

    def test_filtering(self): 
     data = self.sc.parallelize(['1','2', '']) 
     assert filtering(data) == 2 

而且,由于modif功能的map内部使用变换时,出现以下错误:

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
ImportError: No module named clustering.test_func 

pyspark无法找到modif函数。请注意,文件test_func.py位于目录clustering中,我从clustering目录内运行py.test

令我吃惊的是,如果我在map之外使用modif函数,它工作正常。例如,如果我这样做:modif(data.first())

任何想法,为什么我得到这样的importErrors,我该如何解决它?


编辑

  1. 我已经测试了什么已经提出Avihoo Mamka的回答,即以test_func.py添加到复制到所有劳动者的文件。但是,它没有效果。对我来说这并不奇怪,因为我认为从创建Spark应用程序的主文件始终发送给所有工作人员。
  2. 我认为它可能来自一个事实,即pyspark正在寻找clustering.test_func,而不是test_func
+0

只是一个想法,但地图上执行每一个工人的代码块。这可能与工作人员不识别'modif'功能有关吗? –

+0

是的,那是我的赌注。但我不知道如何让工人认可它(这里只有一个,因为主人是“本地[1]”)...... – Pop

回答

0

这里的关键是你得到的Traceback

PySpark告诉你工作进程无权访问clustering.test_func.py。当初始化SparkContext可以传递应该被复制到工人的文件列表:

sc = SparkContext("local[1]", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 

的更多信息:https://spark.apache.org/docs/1.5.2/programming-guide.html

+0

我已经试过这个了:它不会改变任何东西。感谢您的输入,无论如何:) – Pop

+0

您是否已将'test_func.py'添加到SparkContext? –

+0

是的,我已经尝试过了:'sc = SparkContext(“local [1]”,pyFiles = ['test_func.py'])'和'os.environ [“PYSPARK_SUBMIT_ARGS”] =“--files test_func.py - py-files test_func.py pyspark-shell“' – Pop