4

获取火花此空错误Dataset.filter星火2数据集Null值异常

输入CSV:

name,age,stat 
abc,22,m 
xyz,,s 

工作代码:

case class Person(name: String, age: Long, stat: String) 

val peopleDS = spark.read.option("inferSchema","true") 
    .option("header", "true").option("delimiter", ",") 
    .csv("./people.csv").as[Person] 
peopleDS.show() 
peopleDS.createOrReplaceTempView("people") 
spark.sql("select * from people where age > 30").show() 

失败代码(添加以下行返回错误):

val filteredDS = peopleDS.filter(_.age > 30) 
filteredDS.show() 

返回NULL错误

java.lang.RuntimeException: Null value appeared in non-nullable field: 
- field (class: "scala.Long", name: "age") 
- root class: "com.gcp.model.Person" 
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int). 

回答

9

异常你应该解释一切,但让我们一步一步:

  • 当使用csv数据源加载数据的所有字段都被标记为nullable

    val path: String = ??? 
    
    val peopleDF = spark.read 
        .option("inferSchema","true") 
        .option("header", "true") 
        .option("delimiter", ",") 
        .csv(path) 
    
    peopleDF.printSchema 
    
    root 
    |-- name: string (nullable = true) 
    |-- age: integer (nullable = true) 
    |-- stat: string (nullable = true) 
    
  • 缺少字段表示为SQL NULL

    peopleDF.where($"age".isNull).show 
    
    +----+----+----+ 
    |name| age|stat| 
    +----+----+----+ 
    | xyz|null| s| 
    +----+----+----+ 
    
  • 接下来转换到Dataset[Row]其中Dataset[Person]使用Long编码age领域。斯卡拉的Long不能是null。因为输入模式为nullable,输出架构保持nullable尽管认为:

    val peopleDS = peopleDF.as[Person] 
    
    peopleDS.printSchema 
    
    root 
    |-- name: string (nullable = true) 
    |-- age: integer (nullable = true) 
    |-- stat: string (nullable = true) 
    

    注意,它as[T]不影响模式可言。

  • 当您使用SQL(在注册表上)或DataFrame API查询Dataset API Spark不会反序列化对象。由于架构仍然是nullable我们可以执行:

    peopleDS.where($"age" > 30).show 
    
    +----+---+----+ 
    |name|age|stat| 
    +----+---+----+ 
    +----+---+----+ 
    

    没有任何问题。这只是一个普通的SQL逻辑,NULL是一个有效的值。

  • 当我们使用静态类型Dataset API:

    peopleDS.filter(_.age > 30) 
    

    星火有反序列化对象。因为Long不能是null(SQL NULL),它会失败,但您看到的异常。

    如果不是因为你会得到NPE。

  • 数据的
  • 正确的静态类型的表示应该用Optional类型:

    case class Person(name: String, age: Option[Long], stat: String) 
    

    与调整过滤功能:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false)) 
    
    +----+---+----+ 
    |name|age|stat| 
    +----+---+----+ 
    +----+---+----+ 
    

    如果你喜欢,你可以使用模式匹配:

    peopleDS.filter { 
        case Some(age) => age > 30 
        case _   => false  // or case None => false 
    } 
    

    请注意,您不必(但仍会推荐)使用namestat的可选类型。因为斯卡拉String只是一个Java String它可以null。当然,如果你采用这种方法,你必须明确地检查访问值是否为null

相关Spark 2.0 Dataset vs DataFrame