2

我有一个应用程序可以并行执行处理要从Google Storage(我的项目存储桶)下载的数据的Python对象。该群集使用Google Dataproc创建。问题是数据永远不会被下载!我写了一个测试程序来试图理解这个问题。 我写了下面的功能将文件从桶复制,看看是否对工人创建文件不工作:使用Spark(Python)和Dataproc从Google Storage下载文件

from subprocess import call 
from os.path import join 

def copyDataFromBucket(filename,remoteFolder,localFolder): 
    call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder] 

def execTouch(filename,localFolder): 
    call(["touch",join(localFolder,"touched_"+filename)]) 

我已经从一个Python外壳调用它测试了这个功能,它的工作原理。但是,当我使用火花提交运行下面的代码,这些文件未下载(但不会引发错误):

# ... 
filesRDD = sc.parallelize(fileList) 
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output') 
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output') 
# ... 

的execTouch功能工程(我可以看到每个工人的文件),但copyDataFromBucket功能什么也没做。

那么我做错了什么?

+0

一个精度:我使用Anaconda2包来运行我的应用程序,但我必须将CLOUDSDK_PYTHON变量设置为/ usr/bin/python以使gsutil能够工作 – ma3oun

+0

if你用bash或shell运行'gsutil -m cp ...',目前工作吗? – Kristian

+0

是的,它在主人和每个工人身上都能正常工作。 – ma3oun

回答

1

问题显然是Spark上下文。通过调用“Hadoop的FS”更换呼叫“的gsutil”解决它:

from subprocess import call 
from os.path import join 

def copyDataFromBucket(filename,remoteFolder,localFolder): 
    call(["hadoop","fs","-copyToLocal",join(remoteFolder,filename),localFolder] 

我也做了一个测试将数据发送到桶。一个只需要用“-copyFromLocal”代替“-copyToLocal”

相关问题