我与DataFrames工作的哪些元素都得到了类似的模式:星火 - 递归函数为UDF生成异常
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| | |-- npaNumber: string (nullable = true)
| | |-- date: string (nullable = true)
在我的数据帧我想组具有相同NPAHeader.code
的所有元素,所以做到这一点,我用下面一行:
val groupedNpa = orderedNpa.groupBy($"NPAHeader.code").agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))
我有以下模式的数据帧之后:
StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))
每一行的一个例子是类似的东西:
[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]
现在,我要的是生成与他又拿起另一个数据框只是在WrappedArray的元素之一,所以我想类似的输出:
[1234,npaNew]
注意:从WrappedArray中选择的元素是遍历整个WrappedArray后匹配完整的逻辑的元素。但为了简化这个问题,我会总是拾取WrappedArray的最后一个元素(,在遍历它后重复)。
要做到这一点,我想定义一个recurside UDF
import org.apache.spark.sql.functions.udf
def returnRow(elementList : Row)(index:Int): Row = {
val dif = elementList.size - index
val row :Row = dif match{
case 0 => elementList.getAs[Row](index)
case _ => returnRow(elementList)(index + 1)
}
row
}
val returnRow_udf = udf(returnRow _)
groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}
但我收到以下错误的地图:
异常线程 “main” java.lang.UnsupportedOperationException : 类型Int的架构=>不支持单元
我在做什么错?
顺便说一下,我不确定我是否正确传递npa
列groupedNpa("npa")
。我accesing的WrappedArray作为行,因为我不知道如何通过Array[Row]
迭代(在get(index)
方法不存在于数组[行])
我想改变我的代码尽可能地按照您的指示,但我是被迫向上移动到我的一些原始的办法,因为在那里我需要运行这个具有星火1.6,按我的理解服务器groupByKey,mapGroups和reduceGroups,这将很容易我的生活很多,不能在该版本中使用。 –
这是新的情况,如果你想要checl。 https://stackoverflow.com/q/46463931/1773841我做了几个更新,这就是为什么我更愿意问一个不同的问题,而不是一次又一次地更新。我在Window()中添加partitionBy和orderBy以避免您指出的问题。我使用了一个“普通”函数,希望能够从地图中调用,所以我不会在返回类型中有限制。我知道RDD可以完成这件事,但我对DF并不乐观。 –