2016-09-28 26 views
12

我有一个数据帧,我试图展平。作为该过程的一部分,我想分解它,所以如果我有一列数组,数组的每个值将用于创建一个单独的行。例如,Spark sql如何在不丢失空值的情况下如何爆炸

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 

应该成为

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

这是我的代码

private DataFrame explodeDataFrame(DataFrame df) { 
    DataFrame resultDf = df; 
    for (StructField field : df.schema().fields()) { 
     if (field.dataType() instanceof ArrayType) { 
      resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name()))); 
      resultDf.show(); 
     } 
    } 
    return resultDf; 
} 

的问题是,在我的数据,一些阵列列值为空。在这种情况下,整个行被删除。所以这个数据帧:

id | name | likes 
_______________________________ 
1 | Luke | [baseball, soccer] 
2 | Lucy | null 

成为

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 

,而不是

id | name | likes 
_______________________________ 
1 | Luke | baseball 
1 | Luke | soccer 
2 | Lucy | null 

我怎么能爆炸我的阵列,这样我就不会失去空行?

我使用星火1.5.2和Java 8

回答

20

星火2.2+

可以使用explode_outer功能:

import org.apache.spark.sql.functions.explode_outer 

df.withColumn("likes", explode_outer($"likes")).show 

// +---+----+--------+ 
// | id|name| likes| 
// +---+----+--------+ 
// | 1|Luke|baseball| 
// | 1|Luke| soccer| 
// | 2|Lucy| null| 
// +---+----+--------+ 

星火< = 2.1

在斯卡拉,但Java应该是几乎相同(导入个人功能使用import static)。

import org.apache.spark.sql.functions.{array, col, explode, lit, when} 

val df = Seq(
    (1, "Luke", Some(Array("baseball", "soccer"))), 
    (2, "Lucy", None) 
).toDF("id", "name", "likes") 

df.withColumn("likes", explode(
    when(col("likes").isNotNull, col("likes")) 
    // If null explode an array<string> with a single null 
    .otherwise(array(lit(null).cast("string"))))) 

这里的想法是基本上具有所需类型的array(NULL)取代NULL。对于复杂型(又名structs),你必须提供完整的模式:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y") 

val st = StructType(Seq(
    StructField("_1", IntegerType, false), StructField("_2", StringType, true) 
)) 

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast(st))))) 

dfStruct.withColumn("y", explode(
    when(col("y").isNotNull, col("y")) 
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>"))))) 

注意

如果阵列Column已经与containsNull集创建false你应该首先改变(用Spark 2.1测试):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true))) 
+0

看起来不错,谢谢!我有一个后续问题:如果我的列类型是一个StructType?我尝试使用cast(新的StructType()),但是我得到了'数据类型不匹配:THEN和ELSE表达式应该都是相同的类型或强制到一个常见的类型;'我试图使我的方法尽可能通用,所以它适合所有列类型。 – alexgbelov

+0

此外,要获取列类型,我使用DataFrame.dtypes()。有没有更好的方法来获取列类型? – alexgbelov

+1

a)您必须提供所有字段的完整模式。 b)'dtypes'或'schema'。 – zero323

0

根据已接受的答案,当数组元素是复杂类型时,可能很难用手来定义它(例如,使用大型结构)。

来自动执行我写了下面的辅助方法:

相关问题