2016-12-19 142 views
2

我试图使用df.write.csv将数据追加到我的csv文件。这就是我下面的火花文件http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter后所做的:如何在pyspark中使用df.write.csv追加到csv文件?

from pyspark.sql import DataFrameWriter 
..... 
df1 = sqlContext.createDataFrame(query1) 
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append' 

执行上面的代码给我错误:

NameError: name 'append' not defined

没有追加,错误:

The path already exists.

+0

是否有一个由sqlcsvA.csv调用的文件? –

+0

是的输出被复制到'sqlcsvA.csv'文件。 – kaks

+0

你可以删除,并再次从代码创建此文件? –

回答

0

我不知道关于Python ,但在Scala和Java中,可以通过以下方式设置保存模式:

df.write.mode("append").csv("pathToFile") 

我认为它应该在Python中类似。 This可能会有所帮助。

+0

我试过你在python中说过的话。但是,我的输出的每一行都被复制到一个名为'sqlcsvA.csv'的文件夹中的独立csv文件中。它们不会被复制到一个单独的csv文件中。 – kaks

+1

@kaks,看起来你将不得不手动合并这些文件。看看这个[问题](http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv)。例如,人们正在使用[FileUtil.copyMerge](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs。文件系统,%20org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.FileSystem,%20org.apache.hadoop.fs.Path,%20boolean,%20org.apache.hadoop.conf.Configuration,% 20java.lang.String))在Java中。 –

+0

@kaks,请注意,如果您读取结果(在Spark中),则会合并这些文件,并且您有一个DataFrame,其中包含该目录中所有文件的数据。 –

3
df.write.save(path='csv', format='csv', mode='append', sep='\t') 
+0

这又将输出分割成不同的文件。它被分区。 – kaks

+2

在写入之前包含'.coalesce(1)',它会阻止分区,不确定是否会附加结果! 'df.coalesce(1).write.save(path ='csv',format ='csv',mode ='append',sep ='\ t')' – Jarek

+0

谢谢。这一切都是为了一个文件。 – kaks

2

从文档: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter 由于V1.4

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

例如

from pyspark.sql import DataFrameWriter 
..... 
df1 = sqlContext.createDataFrame(query1) 
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append") 

如果你想要写一个文件,你可以在任意这些线路的使用聚结或repartition。不管哪一行,因为数据帧只是一个DAG执行,在写入csv之前不会执行任何操作。 repartition &​​3210有效地使用相同的代码,但合并只能减少分区的数量,其中repartition也可以增加它们。为了简单起见,我只是坚持使用repartition

例如

df1 = sqlContext.createDataFrame(query1).repartition(1) 

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append") 

我想在文档的例子并不是很大,他们没有表现出比使用路径其他参数的例子。

参考你试过两件事情:

(append)

对于工作,就必须命名追加包含值“追加”一个字符串变量。 DataFrameWriter库中没有字符串常量,名为append。即你可以在你的代码中添加这个,然后它就可以工作。 追加=“追加”

('mode=append')

对于工作的CSV方法必须解析出mode=append字符串以获取模式的价值,这将是额外的工作时,你可以有一个参数与需要提取的值“append”或“overwrite”完全相同。没有一个是特殊情况,Python内置,并不特定于pyspark。

另一方面,我建议尽可能使用命名参数。 例如

csv(path="/path/to/file.csv", mode="append") 

,而不是位置参数

csv("/path/to/file.csv", "append") 

它更清晰,并帮助理解。