我希望通过窗口期执行结构化流式聚合。鉴于以下数据模式。目标是根据用户最近发生的事件进行过滤。然后汇总每个位置的每种事件类型的计数。Spark结构化流式传输 - 如何通过最新和聚合计数进行重复数据删除
time location user type
1 A 1 one
2 A 1 two
1 B 2 one
2 B 2 one
1 A 3 two
1 A 4 one
输出示例:
类似如下:
val aggTypes = df
.select($"location", $"time", $"user", $"type")
.groupBy($"user")
.agg(max($"timestamp") as 'timestamp)
.select("*")
.withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
.groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
.agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
.drop($"window")
结构化数据流不支持多聚合,并且不支持流DataFrames非基于时间窗/数据集。我不确定是否有可能在1个流式查询中实现所需的输出。
任何帮助表示赞赏。