2017-06-15 32 views
0

我需要应用一个聚合函数的数据流与apache火花流(没有APACHE SPARK流传输SQL)。应用聚合函数与火花流scala

在我的情况下,我有一个kafka生产者发送JSON格式的消息。 格式是{'a': String, 'b': String, 'c': Integer, 'd': Double}

我需要聚合在属性'a''b'每5秒,我必须对其他2个属性(例如平均的,或点心,或最小值,或最大值)应用聚合函数。

我该怎么做?

感谢

+0

你是否已经尝试了'reduce'功能? https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams – maasg

+0

问题是reduce函数需要2个参数并返回1.我需要具有相同的模式。换句话说,如果我的初始模式是'{'a':String,'b':String,'c':Integer,'d':Double}'结果模式(带有AVG聚合函数)应该是'{' GROUPBYa':String,'GROUPBYb':String,'AVGc':Integer,'AVGd':Double}' –

+0

您也可以使用'transform'或'foreachRDD'并应用任意RDD函数,或者转换为Dataframes并使用数据帧聚合API – maasg

回答

1

为了让您一开始,你可以接近聚集这样的:

import sparkSession.implicits._ 

jsonDstream.foreachRDD{jsonRDD => 
    val df = sparkSession.read.json(jsonRDD) 
    val aggr = df.groupBy($"a", $"b").agg(avg($"c")) 
    ... do something with aggr ... 
}