2015-11-05 42 views
1

我正在使用Spark SQL(我提到它在Spark中,以防影响SQL语法 - 我还不够熟悉)和I有一张表,我正在尝试重新构建。我有一个本地工作的方法,但是当我尝试在AWS EC2实例我得到一个错误的报告,我有一个“未解决运营商Spark SQL'explode'命令在AWS EC2上失败,但在本地成功执行

基本上我有数据,看起来像运行相同的命令:

userId someString  varA 
    1  "example1"  [0,2,5] 
    2  "example2"  [1,20,5] 

我在varA的sqlContext中使用'explode'命令。当我在本地运行时,事情会正确返回,但在AWS上它们会失败。

我可以用下面的命令重现此:

val data = List(
    ("1", "example1", Array(0,2,5)), ("2", "example2", Array(1,20,5))) 
val distData = sc.parallelize(data) 
val distTable = distData.toDF("userId", "someString", "varA") 
distTable.registerTempTable("distTable_tmp") 
val temp1 = sqlContext.sql("select userId, someString, varA from distTable_tmp") 
val temp2 = sqlContext.sql(
    "select userId, someString, explode(varA) as varA from distTable_tmp") 

本地,temp1.show()和temp2.​​show()返回我所期望的,即:

scala> temp1.show() 
+------+----------+----------+ 
|userId|someString|  varA| 
+------+----------+----------+ 
|  1| example1| [0, 2, 5]| 
|  2| example2|[1, 20, 5]| 
+------+----------+----------+ 

scala> temp2.show() 
+------+----------+----+ 
|userId|someString|varA| 
+------+----------+----+ 
|  1| example1| 0| 
|  1| example1| 2| 
|  1| example1| 5| 
|  2| example2| 1| 
|  2| example2| 20| 
|  2| example2| 5| 
+------+----------+----+ 

但在AWS上,temp1 sqlContext命令正常工作,但temp2失败并显示以下消息:

scala> val temp2 = sqlContext.sql("select userId, someString, explode(varA) as varA from distTable_tmp") 
15/11/05 22:46:49 INFO parse.ParseDriver: Parsing command: select userId, someString, explode(varA) as varA from distTable_tmp 
15/11/05 22:46:49 INFO parse.ParseDriver: Parse Completed 
org.apache.spark.sql.AnalysisException: unresolved operator 'Project [userId#3,someString#4,HiveGenericUdtf#org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode(varA#5) AS varA#6]; 
... 

非常感谢。

+0

你能告诉你如何创建'sqlContext'吗?你用什么版本的Spark? – zero323

+0

@ zero323 - 我只是通过本地“./bin/spark-shell”和AWS上的'MASTER = yarn-client/home/hadoop/spark/bin/spark-shell'打开Spark Scala shell - 我没有认为除了默认的似乎为我加载的sqlContext以外的任何东西。 Spark版本是1.5.1(本地)和1.3.1(AWS)。 – anthr

回答

1

问题的根源是您在EC2上使用的Spark版本。在Spark 1.4中引入了explode函数,因此无法在1.3.1上运行。它可以使用RDDflatMap这样的:

import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.types.{StructType, StructField, IntegerType} 

val rows: RDD[Row] = distTable.rdd.flatMap(
    row => row.getAs[Seq[Int]](2).map(v => Row.fromSeq(row.toSeq :+ v))) 
val newSchema = StructType(
    distTable.schema.fields :+ StructField("varA_exploded", IntegerType, true)) 

sqlContext.createDataFrame(rows, newSchema).show 

// userId someString varA     varA_exploded 
// 1  example1 ArrayBuffer(0, 2, 5) 0    
// 1  example1 ArrayBuffer(0, 2, 5) 2    
// 1  example1 ArrayBuffer(0, 2, 5) 5    
// 2  example2 ArrayBuffer(1, 20... 1    
// 2  example2 ArrayBuffer(1, 20... 20   
// 2  example2 ArrayBuffer(1, 20... 5  

,但怀疑这是值得大惊小怪。

相关问题