2015-12-02 71 views
1

我的问题与此类似:Apache Spark SQL issue : java.lang.RuntimeException: [1.517] failure: identifier expected但我无法弄清楚问题出在哪里。我使用SQLite作为数据库后端。连接和简单的选择语句正常工作。Apache Spark SQL标识符预期异常

的问题的行:

val df = tableData.selectExpr(tablesMap(t).toSeq:_*).map(r => myMapFunc(r)) 

tablesMap包含表名作为关键字和字符串作为表达式的阵列。打印后,阵列如下所示:

WrappedArray([My Col A], [ColB] || [Col C] AS ColB) 

表名也包含在方括号中,因为它包含空格。我得到的例外:

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: identifier expected 

我已经确保不要使用任何Spark Sql关键字。在我看来,这个代码失败的原因有两种:1)我以某种方式处理列名中的空格错误。 2)我处理串接错误。

我正在使用类似CSV的资源文件,其中包含我想在我的表格上进行评估的表达式。除了这个文件,我希望允许用户在运行时指定附加表和它们各自的列表达式。该文件看起来像这样:

TableName,`Col A`,`ColB`,CONCAT(`ColB`, ' ', `Col C`) 

Appartently这是行不通的。不过,我想重新使用这个文件,当然修改。我的想法是将列与来自字符串数组的表达式(比如现在)映射到一系列的火花列。 (这对我来说是唯一可以想到的解决方案,因为我希望避免为所有这些特性拉入所有的Hive依赖关系。)我将为我的表达式引入一个小的语法,以将原始列名称标记为$和一些关键字功能如concatas。但我怎么能这样做?我尝试过这样的事情,但它甚至远离编译。

def columnsMapFunc(expr: String) : Column = { 
    if(expr(0) == '$') 
     return expr.drop(1) 
    else 
     return concat(extractedColumnNames).as(newName) 
} 

回答

3

使用包含空格的名称一般来说是要求的问题,但与反引号替换方括号中应该解决的问题:

val df = sc.parallelize(Seq((1,"A"), (2, "B"))).toDF("f o o", "b a r") 
df.registerTempTable("foo bar") 

df.selectExpr("`f o o`").show 

// +-----+ 
// |f o o| 
// +-----+ 
// | 1| 
// | 2| 
// +-----+ 

sqlContext.sql("SELECT `b a r` FROM `foo bar`").show 

// +-----+ 
// |b a r| 
// +-----+ 
// | A| 
// | B| 
// +-----+ 

对于你必须使用concat功能串联:

df.selectExpr("""concat(`f o o`, " ", `b a r`)""").show 

// +----------------------+ 
// |'concat(f o o, ,b a r)| 
// +----------------------+ 
// |     1 A| 
// |     2 B| 
// +----------------------+ 

,但它需要Spark 1.4.0中的HiveContext

在实践中的

df.toDF("foo", "bar") 
// org.apache.spark.sql.DataFrame = [foo: int, bar: string] 

和使用功能的,而不是表达的字符串加载数据之后,我只想重新命名列(concat功能仅在星火> = 1.5.0可用,1.4和更早版本,您需要一个UDF):

import org.apache.spark.sql.functions.concat 

df.select($"f o o", concat($"f o o", lit(" "), $"b a r")).show 

// +----------------------+ 
// |'concat(f o o, ,b a r)| 
// +----------------------+ 
// |     1 A| 
// |     2 B| 
// +----------------------+ 

还有concat_ws函数,它接受分离器的第一个参数:

df.selectExpr("""concat_ws(" ", `f o o`, `b a r`)""") 
df.select($"f o o", concat_ws(" ", $"f o o", $"b a r")) 
+0

concat_ws完美地工作。谢谢 – flowit