2017-07-03 29 views
0

我不能让Spark读取json(或csv)作为Dataset案例类Option[_]字段其中并非所有字段都在源中定义。Spark默认空列DataSet

这是一个有点神秘,但让我们说我有一个名为的情况下,类CustomData

考虑下面的JSON文件(customA.json):

{"id":123, "colA": "x", "colB": "z"} 
{"id":456, "colA": "y"} 
{"id":789,    "colB": "a"} 

以下代码:

import org.apache.spark.sql.SparkSession 

val spark = SparkSession.builder() 
    .master("local[2]") 
    .appName("test") 
    .getOrCreate() 

import spark.implicits._ 

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String]) 
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) 

val ds = spark 
    .read 
    .option("mode", "PERMISSIVE") 
    .json("src/main/resources/customA.json") 
    .as[CustomData] 
    .show() 

输出与预期的一样 - :

+----+----+---+ 
|colA|colB| id| 
+----+----+---+ 
| x| z|123| 
| y|null|456| 
|null| a|789| 
+----+----+---+ 

尽管并非所有的列都是定义的。 但是,如果我想使用相同的代码读取其中的一列出现无处一个文件,我不能做到这一点:

对于其他JSON文件(customB.json):

{"id":321, "colA": "x"} 
{"id":654, "colA": "y"} 
{"id":987} 

和附加代码:

val ds2 = spark 
    .read 
    .option("mode", "PERMISSIVE") 
    .json("src/main/resources/customB.json") 
    .as[CustomData] 
    .show() 

的输出是一个错误:

org.apache.spark.sql.AnalysisExcept ion:无法解析'colB'给定的输入列:[colA,id];

这是有道理的,但我很想重复使用这两个文件相同的案例类。特别是如果我不知道在摄取它之前甚至会在json文件中出现colB

当然,我可以检查,但有没有办法将不存在的列转换为null(与customA.json一样)。将readmode设置为Permissive似乎没有改变任何内容。

我错过了什么吗?

回答

1

我会把答案在这里。向你展示什么(某种)的作品,但看起来 hacky恕我直言。

通过使用一种方法扩展DataFrame来强制案例类的StructType在已有的StructType的顶端,它实际上可行,但也许(我真的希望)有更好的/更干净的解决方案。

这里所说:

import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.catalyst.ScalaReflection 
import scala.reflect.runtime.universe._ 

case class DataFrameExtended(dataFrame: DataFrame) { 

    def forceMergeSchema[T: TypeTag]: DataFrame = { 
    ScalaReflection 
     .schemaFor[T] 
     .dataType 
     .asInstanceOf[StructType] 
     .filterNot(
     field => dataFrame.columns.contains(field.name) 
    ) 
     .foldLeft(dataFrame){ 
     case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType)) 
     } 
    } 
} 

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = { 
    DataFrameExtended(df) 
} 

val ds2 = spark 
    .read 
    .option("mode", "PERMISSIVE") 
    .json("src/main/resources/customB.json") 
    .forceMergeSchema[CustomData] 
    .as[CustomData] 
    .show() 

现在显示的结果,我希望:

+----+---+----+ 
|colA| id|colB| 
+----+---+----+ 
| x|321|null| 
| y|654|null| 
|null|987|null| 
+----+---+----+ 

我只用标量类型,如(智力,字符串等),我想更多尝试这样做复杂的结构将会失败。所以我仍然在寻找更好的答案。