我想在基于历史事件的流中计算Flink中基于窗口的平均值(或由我定义的任何其他函数),因此流必须是事件时间(不处理基于时间):使用Flink和基于事件时间的流计算平均值
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
我已经找到了如何在摄入添加时间戳:
ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)
但是,当我做计算(一个应用函数),它不工作时我只是按照我没有使用EventTime的方式进行操作。我已经读了一些关于我必须设置的水印:
val avg = stream
.keyBy("instrument")
.timeWindow(Time.seconds(10))
.apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
val avg = values.map(_.val).sum/values.size
val dp = Datapoint(key.getField[String](0), avg)
out.collect(dp)
})
avg.print()
env.execute()
有人有一个简单的Scala例子吗?
问候,
安德烈亚斯
非常感谢!顺便说一句:你知道Apache Flink的好书或者教程吗? –
@AndreasVogler我认为这本书对你很有用:https://data-artisans.com/download-introduction-apache-flink-book –
http://training.data-artisans.com/上的练习应该是很有帮助。 –