
Spark job runs locally but fails on EMR - can't figure out why

For some reason, the function below produces an error when I run the job on EMR as part of my pipeline (using emr-5.0.0 and Spark 2.0.0), even though the same Spark job runs fine on my local machine:

def autv(self, f=atf):
    """Aggregate each user's (topic, weight) pairs into a single row.

    Args:
        f: aggregation function applied to the grouped (topic, weight) pairs.

    Returns:
        self on success, or None if an AttributeError was logged.
    """
    # Relies on Row, DenseVector, and logging being imported at module level.
    if not self._utv:
        raise FileNotFoundError("Data not loaded.")
    ut = self._utv
    try:
        self._utv = (ut
                     .rdd
                     .map(lambda x: (x.id, (x.t, x.w)))
                     .groupByKey()
                     .map(lambda x: Row(id=x[0],
                                        w=len(x[1]),
                                        t=DenseVector(f(x[1]))))
                     .toDF())
        return self
    except AttributeError as e:
        logging.error(e)
    return None

atf is a very simple function:

def atf(iterable):
    """Element-wise mean of the topic vectors in an iterable of (topic, weight) pairs.

    Args:
        iterable: iterable of (topic_vector, weight) tuples.

    Returns:
        A list holding the mean of each topic dimension.
    """
    # Assumes something like `import statistics as stats` at module level.
    return [stats.mean(t) for t in zip(*list(zip(*iterable))[0])]
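
For concreteness, here is a quick standalone check of what atf produces - the element-wise mean of the topic vectors, with the weights ignored. The data is made up, and it assumes stats is the standard-library statistics module:

import statistics as stats

def atf(iterable):
    return [stats.mean(t) for t in zip(*list(zip(*iterable))[0])]

pairs = [([1.0, 2.0, 3.0], 5), ([3.0, 4.0, 5.0], 7)]
print(atf(pairs))  # [2.0, 3.0, 4.0]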

I get a huge string of errors, but here is the last part:

 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
     at py4j.Gateway.invoke(Gateway.java:280) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:211) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/worker.py", line 161, in main 
    func, profiler, deserializer, serializer = read_command(pickleSer, infile) 
    File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/worker.py", line 54, in read_command 
    command = serializer._read_with_length(file) 
    File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/serializers.py", line 419, in loads 
    return pickle.loads(obj, encoding=encoding) 
ImportError: No module named 'regression' 

     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
     at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
     at org.apache.spark.scheduler.Task.run(Task.scala:85) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     ... 1 more 

16/08/27 16:28:43 INFO ShutdownHookManager: Shutdown hook called 
16/08/27 16:28:43 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-429a8665-405e-4a8a-9a0c-7f939020a644 
16/08/27 16:28:43 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-429a8665-405e-4a8a-9a0c-7f939020a644/pyspark-41867521-9dfd-4d8f-8b13-33272063e0c3 

The ImportError: No module named 'regression' message makes no sense to me, because the rest of my script runs functions from that module, and when I remove the aggregate_user_topic_vectors function the script runs without errors. Also, as I said earlier, the script runs without errors on my local machine even with aggregate_user_topic_vectors. I have set PYTHONPATH to include my project. Really not sure where to go from here. Any advice would be appreciated.


You will need to ship your Python dependencies through spark-submit.. see http://stackoverflow.com/questions/35214231/importerror-no-module-named-numpy-on-spark-workers –


It's not an external module. It's a module I wrote, called 'regression'. I only use the Python standard library. –


OK, I see. But is it available on the cluster's worker nodes? From what I understand, it works in local mode but can't be resolved in cluster mode, which means your module isn't available to the workers. Isn't it? –
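
For reference, a locally written module like this can be shipped to the executors either at submit time or from the driver; the file and script names below are only placeholders, not the asker's actual files.

# At submit time (placeholder names):
#   spark-submit --py-files regression.py pipeline.py
# Or from the driver, before any action runs:
spark.sparkContext.addPyFile("regression.py")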

Answer


OK, as I suspected, my problem was solved by moving from groupByKey (which is apparently evil) to reduceByKey (so it had nothing to do with how I was importing the module). Here is the revised code. Hope this helps someone!

def autv(self):
    if not self._utv:
        raise FileNotFoundError("No data loaded.")
    ut = self._utv
    try:
        self._utv = (ut
                     .rdd
                     .map(lambda x: (x.id, (x.t, x.w)))
                     .reduceByKey(lambda accum, x: (accum[0] + x[0], accum[1] + x[1]))
                     .map(lambda row: Row(user_id=row[0],
                                          weight=row[1][1],
                                          topics=row[1][0]))
                     .toDF()).cache()
        return self
    except AttributeError as e:
        logging.error(e)
    return None
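
One reason this swap tends to help: reduceByKey combines values for each key on the map side before shuffling, while groupByKey ships every value across the network and materializes the whole group on one executor. A minimal, self-contained sketch of the reduceByKey pattern on hypothetical data (here the topic vectors are summed element-wise purely for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("reduce-by-key-sketch").getOrCreate()
sc = spark.sparkContext

# Hypothetical (user_id, (topic_vector, weight)) pairs.
pairs = sc.parallelize([
    ("u1", ((1.0, 2.0), 3)),
    ("u1", ((3.0, 4.0), 5)),
    ("u2", ((0.5, 0.5), 1)),
])

# Sum the vectors element-wise and add up the weights, per user.
combined = pairs.reduceByKey(
    lambda a, b: (tuple(x + y for x, y in zip(a[0], b[0])), a[1] + b[1]))

print(combined.collect())
# e.g. [('u1', ((4.0, 6.0), 8)), ('u2', ((0.5, 0.5), 1))]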