2017-03-09 61 views
0

我想使用结构化流式传输将文本文件附加到文本文件中。此代码导致SparkException:任务不可序列化。我认为toDF是不允许的。我怎么能得到这个代码工作?使用foreach附加到文本文件的Spark结构化流式传输

df.writeStream 
    .foreach(new ForeachWriter[Row] { 
    override def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 

    override def process(row: Row): Unit = { 
     val df = Seq(row.getString(0)).toDF 

     df.write.format("text").mode("append").save(output) 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
    }  
    }).start 
+0

我想你应该试试这个 df.write.mode(SaveMode.Append).textFile(“Path_To_Store”) –

+0

编译器抱怨文本文件不是成员DataFramerWriter。我正在使用spark 2.1。 –

+0

对不起,它只有'文本(“Path_To_Save”)' 'df.write.mode(SaveMode.Append).text(“Path_To_Save”)' –

回答

1

不能调用df.write.format("text").mode("append").save(output)process方法。它将运行在执行者一方。您可以使用文件接收器代替,如

df.writeStream.format("text").... 
+0

虽然我打算改变foreach代码中的输出路径,这取决于行中的内容。我需要找到一种方法来做到这一点。 –

+0

不确定您是否可以使用'DataStreamWriter.partitionBy' – zsxwing

+0

美丽,是的。非常感谢你。 –