Spark job runs locally, but fails on EMR - can't figure out why

For some reason, when I run my EMR job, the function below in my pipeline produces an error (using emr-5.0.0 and Spark 2.0.0):
import logging
import statistics as stats

from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row


def aggregate_user_topic_vectors(self, f=atf):
    """Aggregate topic vectors per user id.

    Args:
        f: aggregation function applied to each user's (t, w) pairs.

    Returns:
        self, with self._utv replaced by the aggregated DataFrame,
        or None if the aggregation fails.
    """
    if not self._utv:
        raise FileNotFoundError("Data not loaded.")
    ut = self._utv
    try:
        self._utv = (ut
                     .rdd
                     .map(lambda x: (x.id, (x.t, x.w)))
                     .groupByKey()
                     .map(lambda x: Row(id=x[0],
                                        w=len(x[1]),
                                        t=DenseVector(f(x[1]))))
                     .toDF())
        return self
    except AttributeError as e:
        logging.error(e)
        return None
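For illustration, here is what the groupByKey/map chain above does per user, sketched in plain Python with hypothetical (id, topic_vector, weight) tuples standing in for the DataFrame rows:

```python
from collections import defaultdict
import statistics

# Hypothetical rows: (id, topic_vector, weight), standing in for what
# .rdd.map(lambda x: (x.id, (x.t, x.w))) produces from the DataFrame.
rows = [
    ("u1", [0.2, 0.8], 1.0),
    ("u1", [0.4, 0.6], 2.0),
    ("u2", [1.0, 0.0], 1.0),
]

# groupByKey: collect all (t, w) pairs per id.
grouped = defaultdict(list)
for id_, t, w in rows:
    grouped[id_].append((t, w))

# map: per id, record the pair count and the column-wise mean of the vectors.
aggregated = {
    id_: {
        "w": len(pairs),
        "t": [statistics.mean(col) for col in zip(*[t for t, _ in pairs])],
    }
    for id_, pairs in grouped.items()
}
```

This is only the local logic; on Spark the same work is distributed across executors, which is exactly why the closures must be importable on the workers.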
atf is a very simple function:
import statistics as stats


def atf(iterable):
    """Column-wise mean of the topic vectors in an iterable of (t, w) pairs.

    Args:
        iterable: pairs of (topic_vector, weight).

    Returns:
        A list with the mean of each topic-vector component.
    """
    return [stats.mean(t) for t in zip(*list(zip(*iterable))[0])]
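The nested zips are dense, so here is a quick standalone check of what atf returns, using hypothetical (vector, weight) pairs:

```python
import statistics as stats

def atf(iterable):
    # iterable of (topic_vector, weight) pairs
    return [stats.mean(t) for t in zip(*list(zip(*iterable))[0])]

pairs = [([1.0, 3.0], 0.5), ([3.0, 5.0], 0.5)]
# zip(*pairs) separates vectors from weights; [0] keeps the vectors,
# zip(*vectors) lines up each component, and stats.mean averages them.
print(atf(pairs))  # -> [2.0, 4.0]
```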
I get a huge string of errors, but here is the last part:
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/worker.py", line 161, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/worker.py", line 54, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1472313936084_0003/container_1472313936084_0003_01_000002/pyspark.zip/pyspark/serializers.py", line 419, in loads
return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'regression'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
16/08/27 16:28:43 INFO ShutdownHookManager: Shutdown hook called
16/08/27 16:28:43 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-429a8665-405e-4a8a-9a0c-7f939020a644
16/08/27 16:28:43 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-429a8665-405e-4a8a-9a0c-7f939020a644/pyspark-41867521-9dfd-4d8f-8b13-33272063e0c3
The ImportError: No module named 'regression'
message makes no sense to me, because the rest of my script runs functions from that same module, and when I remove the aggregate_user_topic_vectors
function, the script runs without errors. Also, as I said above, the script runs without errors on my local machine even with aggregate_user_topic_vectors
in place. I have set PYTHONPATH
to point at my project. I really don't know where to go from here. Any advice would be appreciated.
You will need to ship your Python dependencies with spark-submit; see http://stackoverflow.com/questions/35214231/importerror-no-module-named-numpy-on-spark-workers –
It's not an external module. It's a module I wrote, called 'regression'. I only use the Python standard library. –
OK, I see. Is it available on the cluster's worker nodes? I gather it works in local mode and fails in cluster mode; that means your module is not available on the workers, doesn't it? –
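A minimal sketch of shipping a local module to the executors with --py-files, assuming the module is a package directory named regression/ and the driver script is called my_pipeline.py (both names hypothetical; a single regression.py file can be passed directly without zipping):

```shell
# Ship the module alongside the job so every executor can unpickle the
# closures that reference it. --py-files accepts .py, .zip and .egg files.
zip -r regression.zip regression/
spark-submit \
  --master yarn \
  --py-files regression.zip \
  my_pipeline.py
```

Alternatively, calling SparkContext.addPyFile("regression.zip") from the driver distributes the archive at runtime. Setting PYTHONPATH only affects the driver machine, which is why the job works locally but fails once the lambdas are deserialized on the workers.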