Asked 2015-10-14 · 86 views · 4 votes

Converting an RDD to a DataFrame in Spark/Scala

An RDD in the format Array[Array[String]] was created and has the following values:

Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1)) 

I want to create a DataFrame with the schema:

val schemaString = "callId oCallId callTime duration calltype swId" 

The next steps:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2), p(3), p(4), p(5).trim)) 
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39 
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema) 

give the following error:

console:45: error: overloaded method value createDataFrame with alternatives: 
  (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame 
  (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame 
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame 
 cannot be applied to (org.apache.spark.rdd.RDD[Array[String]], org.apache.spark.sql.types.StructType) 
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

Answers

7 votes

Just paste this into spark-shell:

val a = Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")) 

val rdd = sc.makeRDD(a) 

case class X(callId: String, oCallId: String, 
    callTime: String, duration: String, calltype: String, swId: String) 

Then map() over the RDD to create instances of the case class, and create a DataFrame with toDF():

scala> val df = rdd.map { 
    case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF() 
df: org.apache.spark.sql.DataFrame = 
    [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string] 

This infers the schema from the case class.
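The Array-to-case-class mapping itself can be checked in plain Scala, without Spark (a minimal sketch; X mirrors the case class defined above, and raw is one row from the question's data):

```scala
// Mirrors the case class used for schema inference above.
case class X(callId: String, oCallId: String, callTime: String,
             duration: String, calltype: String, swId: String)

val raw = Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")

// The same pattern match that rdd.map { ... } applies to each element:
val x = raw match {
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5)
}

println(x.callId)   // 4580056797
println(x.duration) // 0
```

A MatchError would be thrown here for any inner array that does not have exactly six elements, which is worth keeping in mind if the input data can be ragged.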

Then you can proceed with:

scala> df.printSchema() 
root 
|-- callId: string (nullable = true) 
|-- oCallId: string (nullable = true) 
|-- callTime: string (nullable = true) 
|-- duration: string (nullable = true) 
|-- calltype: string (nullable = true) 
|-- swId: string (nullable = true) 

scala> df.show() 
+----------+-------+-------------------+--------+--------+----+ 
|    callId|oCallId|           callTime|duration|calltype|swId| 
+----------+-------+-------------------+--------+--------+----+ 
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1| 
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1| 
+----------+-------+-------------------+--------+--------+----+ 

If you want to use toDF() in a regular program (and not in spark-shell), make sure to (quoted from here):

  • import sqlContext.implicits._ right after creating the SQLContext
  • define the case class outside of the method that uses toDF()

Awesome answer, I got everything I needed from here. Thanks a lot – sparkDabbler

1 vote

I assume that your schema is, as in the Spark Guide, the following:

val schema = 
    StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 
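The split in that StructType construction can be verified in plain Scala, Spark not required (a sketch using the question's schemaString):

```scala
val schemaString = "callId oCallId callTime duration calltype swId"

// The same split used to build one StructField per column above:
val fieldNames = schemaString.split(" ")

println(fieldNames.length)        // 6
println(fieldNames.mkString(","))  // callId,oCallId,callTime,duration,calltype,swId
```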

If you look at the signatures of createDataFrame, here is the one that accepts a StructType as the second argument (the one for Scala):

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from an RDD containing Rows using the given schema.

So it takes an RDD[Row] as its first argument. What you have in rowRDD is an RDD[Array[String]], hence the mismatch.

Do you really need an RDD[Array[String]]?

Otherwise you can use the following to create your DataFrame:

val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim)) 

Thanks @ccheneson, this also worked for my problem – sparkDabbler

4 votes

You need to first convert your Arrays into Rows and then define the schema. I have made the assumption that most of your fields are Long:

val rdd: RDD[Array[String]] = ??? 
// Note: the pattern must cover all six columns (including calltype),
// otherwise six-element arrays fail the match at runtime.
val rows: RDD[Row] = rdd map { 
  case Array(callId, oCallId, callTime, duration, calltype, swId) => 
    Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, calltype.toLong, swId.toLong) 
} 

object schema { 
  val callId   = StructField("callId", LongType) 
  val oCallId  = StructField("oCallId", LongType) 
  val callTime = StructField("callTime", StringType) 
  val duration = StructField("duration", LongType) 
  val calltype = StructField("calltype", LongType) 
  val swId     = StructField("swId", LongType) 

  val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId)) 
} 

sqlContext.createDataFrame(rows, schema.struct) 
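The string-to-Long conversions in that map can be sanity-checked in plain Scala, using one of the six-column rows from the question (a sketch; Spark's Row is not needed for this):

```scala
val fields = Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")

// Convert the numeric columns the same way; callTime stays a String:
val (id, callTime, dur) = fields match {
  case Array(callId, _, time, duration, _, _) =>
    (callId.toLong, time, duration.toLong)
}

println(id)       // 4580056797
println(dur)      // 0
println(callTime) // 2015-07-29 10:38:42
```

Note that toLong throws a NumberFormatException on non-numeric input, so this conversion assumes the columns are clean.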

Thanks @Eugene, this solution also worked for me, and it is very elegant – sparkDabbler