1

我希望通过窗口期执行结构化流式聚合。鉴于以下数据模式。目标是根据用户最近发生的事件进行过滤。然后汇总每个位置的每种事件类型的计数。Spark结构化流式传输 - 如何通过最新和聚合计数进行重复数据删除

time location user type 
1  A   1  one 
2  A   1  two 
1  B   2  one 
2  B   2  one 
1  A   3  two 
1  A   4  one 

输出示例:

​​

类似如下:

val aggTypes = df 
    .select($"location", $"time", $"user", $"type") 
    .groupBy($"user") 
    .agg(max($"timestamp") as 'timestamp) 
    .select("*") 
    .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds") 
    .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location") 
    .agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo))) 
    .drop($"window") 

结构化数据流不支持多聚合,并且不支持流DataFrames非基于时间窗/数据集。我不确定是否有可能在1个流式查询中实现所需的输出。

任何帮助表示赞赏。

回答

0

好像你正在尝试做无状态聚合。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroups(org.apache.spark.api.java.function.FlatMapGroupsFunction,%20org.apache.spark.sql.Encoder)

flatMapGroups是一个聚合API,它将一个函数应用于数据集中的每个组。它仅适用于分组数据集.flatMapGroups不支持增加混洗开销的部分聚合。因此,请仅使用此API来执行适合内存的小批量聚合。还建议使用reduce函数或聚合器。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/expressions/Aggregator.html

val count = words.groupByKey(x => x) 
      .flatMapGroups 
      { 
       case (x, iterator) ⇒ Iterator((x, iterator.length)) 
       }.toDF("x", "count")   


count.writeStream.format("console").outputMode(OutputMode.Append()) 
相关问题