下面的代码片断尝试应用一个简单的函数来一个PySpark RDD对象应用自定义功能时:使用外部模块PySpark
import pyspark
conf = pyspark.SparkConf()
conf.set('spark.dynamicAllocation.minExecutors', 5)
sc = SparkContext(appName="tmp", conf=conf)
sc.setLogLevel('WARN')
fn = 'my_csv_file'
rdd = sc.textFile(fn)
rdd = rdd.map(lambda line: line.split(","))
header = rdd.first()
rdd = rdd.filter(lambda line:line != header)
def parse_line(line):
ret = pyspark.Row(**{h:line[i] for (i, h) in enumerate(header)})
return ret
rows = rdd.map(lambda line: parse_line(line))
sdf = rows.toDF()
如果我开始python my_snippet.py
程序,它失败通过抱怨:
File "<ipython-input-27-8e46d56b2984>", line 6, in <lambda>
File "<ipython-input-27-8e46d56b2984>", line 3, in parse_line
NameError: global name 'pyspark' is not defined
我更换了parse_line
功能如下:
def parse_line(line):
ret = h:line[i] for (i, h) in enumerate(header)
ret['dir'] = dir()
return ret
现在,创建数据框并且dir
列显示 中的名称空间该函数仅包含两个对象:line
和ret
。我如何将其他模块和对象作为函数的一部分? 不仅pyspark,还有其他人。
编辑请注意,pyspark在程序中可用。只有在函数被map
(并且我假设为filter
,reduce
等)调用时,它才会看到任何导入的模块。
以下是否回答你的问题? http://stackoverflow.com/questions/23256536/importing-pyspark-in-python-shell – Yaron