2016-06-07 23 views
0

我有一个RDD [行]有每行的以下数据星火:转换RDD [行]到数据帧,其中行中的一列是一个列表

[guid, List(peopleObjects)] 
["123", List(peopleObjects1, peopleObjects2, peopleObjects3)] 

我想将其转换为一个数据帧
我使用下面的代码

val personStructureType = new StructType() 
    .add(StructField("guid", StringType, true)) 
    .add(StructField("personList", StringType, true)) 
val personDF = hiveContext.createDataFrame(personRDD, personStructureType) 

我应该使用不同的数据类型为我的架构,而不是StringType?

如果我的名单只是它工作的字符串,但是当它是一个列表,我得到以下错误

scala.MatchError: List(personObject1, personObject2, personObject3) (of class scala.collection.immutable.$colon$colon) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) 
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) 
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445) 
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:445) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

什么类型'peopleObject'?如果它是'case class',你能否包含它的定义?更好的办法是创建你的'RDD'的一些示例代码。 –

回答

2

这不是完全清楚你正在尝试做的,但更好的方法,你是什么试图做的是创建一个case class,然后将您的RDD行映射到case class,然后调用toDF

喜欢的东西:

case class MyClass(guid: Int, peopleObjects: List[String]) 

val rdd = sc.parallelize(Array((123,List("a","b")),(1232,List("b","d")))) 

val df = rdd.map(r => MyClass(r._1, r._2)).toDF 
df.show 
+----+-------------+ 
|guid|peopleObjects| 
+----+-------------+ 
| 123|  [a, b]| 
|1232|  [b, d]| 
+----+-------------+ 

或者你也可以做到这一点的长手的方式,但不使用的情况下类,像这样:

val df = sqlContext.createDataFrame(
    rdd.map(r => Row(r._1, r._2)), 
    StructType(Array(
    StructField("guid",IntegerType), 
    StructField("peopleObjects", ArrayType(StringType)) 
)) 
) 
+0

谢谢大卫。这有助于我获得一些理解。 peopleObjects类具有名称和位置等属性。我希望能够将peopleObjects列表作为类型传递给数据框。这样当我想创建我的最终输出格式时,我不必拆卸和重建对象。现在我有一个guid与一个peopleObject关联,我在guid上获得一个peopleObject列表的groupbykey。我打算在众多桌子上做同样的行为,然后通过指导加入他们。然后以特定格式创建最终输出。 –

+0

现在我只是创建json对象,将其作为字符串传递并完成所有联接。然后重建对象,修改json并创建我的最终输出。 –

+0

这是一个正确和有用的答案。 @JohnEngelhart你应该接受它。 – Sim