2017-12-03 164 views
0

我想在基于历史事件的流中计算Flink中基于窗口的平均值(或由我定义的任何其他函数),因此流必须是事件时间(不处理基于时间):使用Flink和基于事件时间的流计算平均值

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

我已经找到了如何在摄入添加时间戳:

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis) 

但是,当我做计算(一个应用函数),它不工作时我只是按照我没有使用EventTime的方式进行操作。我已经读了一些关于我必须设置的水印:

val avg = stream 
    .keyBy("instrument") 
    .timeWindow(Time.seconds(10)) 
    .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{ 
    val avg = values.map(_.val).sum/values.size 
    val dp = Datapoint(key.getField[String](0), avg) 
    out.collect(dp) 
    }) 

avg.print() 
env.execute() 

有人有一个简单的Scala例子吗?

问候,
安德烈亚斯

回答

0

水印是一种有效地与早期的时间戳的所有事件都(可能)已经抵达断言时间戳。基于事件时间的Windows依赖水印来知道窗口何时完成。到目前为止,最常见的水印策略是假定事件以一定的有限延迟到达。

如果要发射的数据源水印(服用时),见Source Functions with Timestamps and Watermarks,但它是那样简单

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime)) 

如果,另一方面,要解决这个问题之外来源,见Timestamp Assigners/Watermark GeneratorsAssigners allowing a fixed amount of lateness。你可以简单地做这样的事情:

stream 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))(_.getTimestamp)) 
    .keyBy("instrument") 
    ... 

我链接到的文档有更详细的例子在斯卡拉。

+0

非常感谢!顺便说一句:你知道Apache Flink的好书或者教程吗? –

+0

@AndreasVogler我认为这本书对你很有用:https://data-artisans.com/download-introduction-apache-flink-book –

+0

http://training.data-artisans.com/上的练习应该是很有帮助。 –