2017-09-27 28 views
0

我与DataFrames工作的哪些元素都得到了类似的模式:星火 - 递归函数为UDF生成异常

root 
|-- NPAData: struct (nullable = true) 
| |-- NPADetails: struct (nullable = true) 
| | |-- location: string (nullable = true) 
| | |-- manager: string (nullable = true) 
| |-- service: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- serviceName: string (nullable = true) 
| | | |-- serviceCode: string (nullable = true) 
|-- NPAHeader: struct (nullable = true) 
| | |-- npaNumber: string (nullable = true) 
| | |-- date: string (nullable = true) 

在我的数据帧我想组具有相同NPAHeader.code的所有元素,所以做到这一点,我用下面一行:

val groupedNpa = orderedNpa.groupBy($"NPAHeader.code").agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa")) 

我有以下模式的数据帧之后:

StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...))))) 

每一行的一个例子是类似的东西:

[1234,WrappedArray([npaNew,npaOlder,...npaOldest])] 

现在,我要的是生成与他又拿起另一个数据框只是在WrappedArray的元素之一,所以我想类似的输出:

[1234,npaNew] 

注意:从WrappedArray中选择的元素是遍历整个WrappedArray后匹配完整的逻辑的元素。但为了简化这个问题,我会总是拾取WrappedArray的最后一个元素(,在遍历它后重复)。

要做到这一点,我想定义一个recurside UDF

import org.apache.spark.sql.functions.udf 

def returnRow(elementList : Row)(index:Int): Row = { 
    val dif = elementList.size - index 
    val row :Row = dif match{ 
    case 0 => elementList.getAs[Row](index) 
    case _ => returnRow(elementList)(index + 1) 
    } 
    row 
} 

val returnRow_udf = udf(returnRow _) 


groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))} 

但我收到以下错误的地图:

异常线程 “main” java.lang.UnsupportedOperationException : 类型Int的架构=>不支持单元

我在做什么错?

顺便说一下,我不确定我是否正确传递npagroupedNpa("npa")。我accesing的WrappedArray作为行,因为我不知道如何通过Array[Row]迭代(在get(index)方法不存在于数组[行])

回答

1

TL; DR只需使用的中描述的方法之一How to select the first row of each group?

如果你想使用复杂的逻辑,并返回可以跳过SQL API和使用groupByKey

val f: (String, Iterator[org.apache.spark.sql.Row]) => Row 
val encoder: Encoder 
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder) 

或更好:

val g: (Row, Row) => Row 

df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g) 

其中encoder是有效的RowEncoderEncoder error while trying to map dataframe row to updated row)。

你的代码是在多个方面有缺陷:

  • groupBy不保证值的顺序。所以:

    orderBy(...).groupBy(....).agg(collect_list(...)) 
    

    可以有非确定性输出。如果你真的决定走这条路线,你应该跳过orderBy并明确排列收集的数组。

  • 您无法将咖喱功能传递给udf。你必须先解除它,但它需要不同的参数顺序(见下面的例子)。

  • 如果你能,这可能是正确的方法来调用它(请注意,你忽略第二个参数):

    returnRow_udf(groupedNpa("npa")(0)) 
    

    更糟糕的是,你怎么称呼它里面map,其中udfs不完全适用。

  • udf不能返回。它必须返回external Scala type

  • array<struct>的外部表示是Seq[Row]。你不能用来代替它。
  • SQL阵列可以通过索引与apply来访问:

    df.select($"array"(size($"array") - 1)) 
    

    但它不是一个正确的方法由于非确定性。您可以申请sort_array,但正如开头所指出的那样,有更有效的解决方案。

  • 令人惊讶的是,递归并不那么相关。你可以设计功能是这样的:

    def size(i: Int=0)(xs: Seq[Any]): Int = xs match { 
        case Seq() => i 
        case null => i 
        case Seq(h, t @ _*) => size(i + 1)(t) 
    } 
    
    val size_ = udf(size() _) 
    

    ,它会工作得很好:

    Seq((1, Seq("a", "b", "c"))).toDF("id", "array") 
        .select(size_($"array")) 
    

    虽然递归是一种矫枉过正,如果你可以遍历Seq

+0

我想改变我的代码尽可能地按照您的指示,但我是被迫向上移动到我的一些原始的办法,因为在那里我需要运行这个具有星火1.6,按我的理解服务器groupByKey,mapGroups和reduceGroups,这将很容易我的生活很多,不能在该版本中使用。 –

+0

这是新的情况,如果你想要checl。 https://stackoverflow.com/q/46463931/1773841我做了几个更新,这就是为什么我更愿意问一个不同的问题,而不是一次又一次地更新。我在Window()中添加partitionBy和orderBy以避免您指出的问题。我使用了一个“普通”函数,希望能够从地图中调用,所以我不会在返回类型中有限制。我知道RDD可以完成这件事,但我对DF并不乐观。 –