你可以使用GROUPBY /聚集功能和窗口功能的组合实现这一目标。
我们认为这是ordersDf:
+-------+--------------+-----------+
|orderId| orderDetails|destination|
+-------+--------------+-----------+
| 1|[11,abc,item1]| loc1|
| 2|[12,abc,item2]| loc1|
| 3|[13,abc,item1]| loc1|
| 4|[14,abc,item1]| loc2|
| 5|[15,abc,item2]| loc2|
| 6|[11,abc,item2]| loc2|
| 7|[11,abc,item2]| loc2|
+-------+--------------+-----------+
首先,按目的地和项目的数据并计算每个项目的频率。
val dfWithCount = ordersDf
.groupBy("destination","orderDetails.item")
.agg(count("orderDetails.item").alias("itemCount"))
聚集的数据帧,然后看起来像这样
+-----------+-----+---------+
|destination| item|itemCount|
+-----------+-----+---------+
| loc1|item2| 1|
| loc2|item1| 1|
| loc2|item2| 3|
| loc1|item1| 2|
+-----------+-----+---------+
因为我们想找出每个位置最常见的项目,让我们分区的目的,并通过ITEMCOUNT列应用最大聚集。
val maxWindowSpec = Window.partitionBy("destination")
val maxColumn = max($"itemCount").over(maxWindowSpec)
val dfWithMax = dfWithCount.withColumn("maxItemCount",maxColumn)
得到的数据帧具有每个目的地的项目无论是itemCounts和MAXCOUNT
+-----------+-----+---------+------------+
|destination| item|itemCount|maxItemCount|
+-----------+-----+---------+------------+
| loc1|item2| 1| 2|
| loc1|item1| 2| 2|
| loc2|item1| 1| 3|
| loc2|item2| 3| 3|
+-----------+-----+---------+------------+
最后,我们筛选出列,其中ITEMCOUNT对于给定的(目标,项目)的组合是不是最大的项目数为那个目的地。
val result = dfWithMax
.filter("maxItemCount - itemCount == 0")
.drop("maxItemCount","itemCount")
result.show()
+-----------+-----+
|destination| item|
+-----------+-----+
| loc1|item1|
| loc2|item2|
+-----------+-----+