2015-08-08 77 views
19

我有一个生成的RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)。这在这种格式输出:如何将生成的RDD写入Spark python的csv文件中

[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....] 

我要的是创造一个CSV与labels一列(在上面的输出元组的第一部分)和一个用于predictions(元组输出的第二部分)文件。但我不知道如何使用Python在Spark中写入CSV文件。

如何使用上述输出创建CSV文件?

回答

30

只需将map RDD的行(labelsAndPredictions)转换为字符串(行的CSV),然后使用rdd.saveAsTextFile()

def toCSVLine(data): 
    return ','.join(str(d) for d in data) 

lines = labelsAndPredictions.map(toCSVLine) 
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv') 
+0

如何映射为字符串?我的意思是在'toLine(data)'里写什么? –

+0

对不起,我以为你知道基本的Python。我已将它添加到答案中。 –

+0

正如有疑问,那么这将保存'csv'文件吗?在代码所在的同一个目录中?我可以将它保存到其他目录(使用'saveAsTextFile('/ home/files/labels-and-predictions.csv')')吗? –

6

这是不好的,只是用逗号连接,因为如果字段包含逗号,他们将不能正确地引用,例如','.join(['a', 'b', '1,2,3', 'c'])给你a,b,1,2,3,c当你想要a,b,"1,2,3",c。相反,你应该使用Python的CSV模块,每个列表转换在RDD的正确格式的CSV字符串:

# python 3 
import csv, io 

def list_to_csv_str(x): 
    """Given a list of strings, returns a properly-csv-formatted string.""" 
    output = io.StringIO("") 
    csv.writer(output).writerow(x) 
    return output.getvalue().strip() # remove extra newline 

# ... do stuff with your rdd ... 
rdd = rdd.map(list_to_csv_str) 
rdd.saveAsTextFile("output_directory") 

由于csv模块只写入文件的对象,我们要创建一个空的“文件”与io.StringIO("")并告诉csv.writer写入csv格式的字符串。然后,我们使用output.getvalue()来获取我们刚刚写入“文件”的字符串。为了使这个代码能够与Python 2一起工作,只需用StringIO模块替换io即可。

如果您使用Spark DataFrames API,还可以查看DataBricks save function,它具有csv格式。

+0

我使用这段代码得到一个TypeError。 TypeError:不能将str写入文本流。 –

+0

@Moe Chughtai您使用的是Spark/Python的哪个版本?哪一行给你的类型错误,以及什么输入? –

10

我知道这是一个旧帖子。但是,为了帮助别人寻找同样的,这里是我怎么写的两列RDD到一个CSV文件中PySpark 1.6.2

的RDD:

>>> rdd.take(5) 
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')] 

现在代码:

# First I convert the RDD to dataframe 
from pyspark import SparkContext 
df = sqlContext.createDataFrame(rdd, ['count', 'word']) 

的DF:

>>> df.show() 
+-----+-----------+ 
|count|  word| 
+-----+-----------+ 
|73342|  cells| 
|62861|  cell| 
|61714| studies| 
|61377|  aim| 
|60168| clinical| 
|59275|   2| 
|59221|   1| 
|58274|  data| 
|58087|development| 
|56579|  cancer| 
|50243| disease| 
|49817| provided| 
|49216| specific| 
|48857|  health| 
|48536|  study| 
|47827| project| 
|45573|description| 
|45455| applicant| 
|44739| program| 
|44522| patients| 
+-----+-----------+ 
only showing top 20 rows 

现在写为CSV

# Write CSV (I have HDFS storage) 
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out') 

P.S:我只是一个初学者从这里的帖子学习Stackoverflow。所以我不知道这是否是最好的方法。但它对我有用,我希望它能帮助别人!

+0

这是为我工作的解决方案。干杯! – Indra

+0

我不得不编码一些列W/JSON,但否则这个工程 –