2016-09-28 63 views
0

假设我有以下RDD:星火:一个RDD的格式转换成数据帧

val rdd = sc.parallelize(Seq(('a'.toString,1.1,Array(1.1,2.2),0), 
          ('b'.toString,1.5,Array(1.4,4.2),3), 
          ('d'.toString,2.1,Array(3.3,7.4),4))) 

>>>rdd: org.apache.spark.rdd.RDD[(String,Double,Array[Double],Int)] 

而且我想用.write.format("com.databricks.spark.csv")这需要一个数据框写入输出csv格式。

因此,首先我需要将当前模式转换为 - >rdd[(String, String, String, String, String)],并将其转换为df后。我试过如下:

rdd.map { case((a,b,c,d)) => (a,b,c.mkString(","),d)} 

但这输出:

rdd[(string,double,string,int)] 

任何想法,该怎么办呢?

+1

有一个用于此目的[spark-csv]的库(https://github.com/databricks/spark-csv/),它适用于2.0之前的版本。 –

回答

0

UPDATE

要与元组的工作,你必须知道你要多少元素摆在他们自己定义的使用情况。因此,要处理可变数量的元素,您可能需要使用一些集合。 为您的使用情况下,像这样可以工作:

rdd.map { case((a,b,c,d)) => a +: (b +: c) :+ d}.map(_.mkString(",")) 

这将导致RDD[String]对应CSV文件的每一行。 您正在预先考虑并将其他元素附加到Array“c”以产生单个Array

+0

这个答案很明显。我正在寻找更优雅的东西。 btw'c'有300多个值。 –

+0

应该在问题中提到。更新了答案。 – septra