2015-06-10

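SparkSQL - accessing nested structures Row(field1, field2=Row(..))

I need help accessing nested structures in SparkSQL with SQL syntax. I created a DataFrame with this structure on top of an existing RDD (dataRDD):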

from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType

schema = StructType([
    StructField("m", LongType()),
    StructField("field2", StructType([
        StructField("st", StringType()),
        StructField("end", StringType()),
        StructField("dr", IntegerType()),
    ])),
])

printSchema() returns this:

root
 |-- m: long (nullable = true)
 |-- field2: struct (nullable = true)
 |    |-- st: string (nullable = true)
 |    |-- end: string (nullable = true)
 |    |-- dr: integer (nullable = true)

Creating the DataFrame from dataRDD and applying the schema works fine:

df= sqlContext.createDataFrame(dataRDD, schema) 
df.registerTempTable("logs") 

But retrieving the data does not work:

res = sqlContext.sql("SELECT m, field2.st FROM logs") # <- This fails 

...org.apache.spark.sql.AnalysisException: cannot resolve 'field.st' given input columns msisdn, field2; 

res = sqlContext.sql("SELECT m, field2[0] FROM logs") # <- Also fails 
...org.apache.spark.sql.AnalysisException: unresolved operator 'Project [field2#1[0] AS c0#2]; 

res = sqlContext.sql("SELECT m, st FROM logs") # <- Also not working 
...cannot resolve 'st' given input columns m, field2; 

So how can I access nested structures with SQL syntax? Thanks.

Answer


Something else is going on in your test, because field2.st is the correct syntax:

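// Assumes a Spark 1.x spark-shell, where `sc` and `sqlContext` are predefined
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("m", LongType),
  StructField("field2", StructType(Array(
    StructField("st", StringType),
    StructField("end", StringType),
    StructField("dr", IntegerType)
  )))
))

// The nested struct value is supplied as a Row; `m` uses Long literals to match LongType
val df2 = sqlContext.createDataFrame(
  sc.parallelize(Seq(
    Row(1L, Row("this", "is", 1234)),
    Row(2L, Row("a", "test", 5678))
  )),
  schema
)

// Register the DataFrame so the SQL query below can refer to it as "df2"
df2.registerTempTable("df2")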

/* df2.printSchema
root
 |-- m: long (nullable = true)
 |-- field2: struct (nullable = true)
 |    |-- st: string (nullable = true)
 |    |-- end: string (nullable = true)
 |    |-- dr: integer (nullable = true)
*/

val results = sqlContext.sql("select m,field2.st from df2") 

/* results.show 
m st 
1 this 
2 a 
*/ 

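Look back at your error message: cannot resolve 'field.st' given input columns msisdn, field2. The query refers to field, not field2, and the input columns are listed as msisdn rather than m. Check your code again: the names don't line up.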
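To make the lookup rule concrete, here is a toy model in plain Python of how a dotted reference such as field2.st is resolved. It is not Spark's analyzer, only a sketch of the same idea: the first segment must match a top-level column, and each later segment must match a field of the struct reached so far. The names `SCHEMA` and `resolve` are hypothetical, chosen for illustration.

```python
from typing import Any, Sequence

# Hypothetical schema model: each entry is (field name, nested schema);
# None marks a leaf field. Mirrors the question: m, plus struct field2(st, end, dr).
SCHEMA = [
    ("m", None),
    ("field2", [("st", None), ("end", None), ("dr", None)]),
]


def resolve(path: str, schema: Any, record: Sequence[Any]) -> Any:
    """Follow a dotted path such as "field2.st" through nested tuples."""
    value: Any = record
    fields = schema
    for part in path.split("."):
        if fields is None:
            # Tried to step into a leaf (e.g. "m.x"): leaves have no subfields.
            raise KeyError(f"'{part}' is not a field of a struct in '{path}'")
        names = [name for name, _ in fields]
        if part not in names:
            # Loosely modeled on Spark's "cannot resolve ... given input columns" message.
            raise KeyError(f"cannot resolve '{path}' given input columns {', '.join(names)}")
        index = names.index(part)
        value = value[index]       # positional lookup, like a struct field's ordinal
        fields = fields[index][1]  # descend into the nested schema, or None at a leaf
    return value
```

With the record (1, ("this", "is", 1234)), `resolve("field2.st", SCHEMA, record)` returns "this", while "field.st" and plain "st" both raise an error, because neither "field" nor "st" is a top-level column. That matches the errors in the question.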


Yes, that was a typo. But field2.st still throws this error: 18:19:05 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, BICHDP2.TD): java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.apache.spark.sql.Row at org.apache.spark.sql.catalyst.expressions.StructGetField.eval(complexTypes.scala:93) at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:113) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) at – frengel
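One plausible reading of this trace, offered as an assumption rather than a confirmed diagnosis: the nested field2 values in dataRDD are Python lists, which arrive on the JVM side as java.util.ArrayList, whereas the StructGetField step expects a Row. In the Spark 1.x setup from the question, supplying the struct as a tuple (or a pyspark.sql.Row) may avoid the cast. The helper `to_struct_record` and the input layout [m, [st, end, dr]] are hypothetical.

```python
def to_struct_record(record):
    """Turn [m, [st, end, dr]] into (m, (st, end, dr)).

    Hypothetical helper: assumes each dataRDD element is a 2-item sequence
    whose second item holds the field2 values in schema order (st, end, dr).
    """
    m, nested = record
    # Assumption: a tuple (or Row) maps onto the StructType column, while a
    # nested list appears to reach the JVM as java.util.ArrayList in Spark 1.x.
    return (m, tuple(nested))


# Sketch of use with PySpark (not executed here):
# df = sqlContext.createDataFrame(dataRDD.map(to_struct_record), schema)
```

Converting only the nested value keeps the top-level record shape unchanged, so the existing schema can be reused as-is.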
