2016-03-20 114 views
1

下面的代码片断尝试应用一个简单的函数来一个PySpark RDD对象应用自定义功能时:使用外部模块PySpark

import pyspark 
conf = pyspark.SparkConf() 
conf.set('spark.dynamicAllocation.minExecutors', 5) 
sc = SparkContext(appName="tmp", conf=conf) 
sc.setLogLevel('WARN') 

fn = 'my_csv_file' 
rdd = sc.textFile(fn) 
rdd = rdd.map(lambda line: line.split(",")) 
header = rdd.first() 
rdd = rdd.filter(lambda line:line != header) 
def parse_line(line): 
    ret = pyspark.Row(**{h:line[i] for (i, h) in enumerate(header)}) 
    return ret 
rows = rdd.map(lambda line: parse_line(line)) 
sdf = rows.toDF() 

如果我开始python my_snippet.py程序,它失败通过抱怨:

File "<ipython-input-27-8e46d56b2984>", line 6, in <lambda> 
File "<ipython-input-27-8e46d56b2984>", line 3, in parse_line 
NameError: global name 'pyspark' is not defined 

我更换了parse_line功能如下:

def parse_line(line): 
    ret = h:line[i] for (i, h) in enumerate(header) 
    ret['dir'] = dir() 
    return ret 

现在,创建数据框并且dir列显示 中的名称空间该函数仅包含两个对象:lineret。我如何将其他模块和对象作为函数的一部分? 不仅pyspark,还有其他人。

编辑请注意,pyspark在程序中可用。只有在函数被map(并且我假设为filter,reduce等)调用时,它才会看到任何导入的模块。

+0

以下是否回答你的问题? http://stackoverflow.com/questions/23256536/importing-pyspark-in-python-shell – Yaron

回答

0

1)答案原来的问题: 这似乎是问题的根源运行蟒蛇my_snippet.py 你应该使用执行代码的火花提交my_snippet.py

2)回答IPython的笔记本问题: 在我IPython的笔记本配置以下行不存在:

import pyspark 
conf = pyspark.SparkConf() 
conf.set('spark.dynamicAllocation.minExecutors', 5) 
sc = SparkContext(appName="tmp", conf=conf) 

“SC”的范围之外定义我的计划

3)答案就每一个问题就需要被安装numpy的(或其他模块) 为了使用numpy的,你需要安装numpy的(使用apt-get或者PIP或从源代码安装)节点集群

+0

你是对的,运行'spark-submit'确实解决了独立程序中的问题。另一方面,如果我想运行IPython笔记本,则这不起作用。我可以用'IPYTHON_OPTS =“笔记本”pyspark“启动IPython,但是我不能在运行时更改'SparkContext'选项(我可以吗?)。此外,如果函数使用'numpy'或其他模块,而不是'pyspark','spark-submit'也不会帮助 –