2015-12-08 113 views
9

我遇到了一些麻烦,理解事件时间窗口周围的语义。以下程序会生成一些带时间戳的元组,用作事件时间并进行简单的窗口聚合。我希望输出的顺序与输入相同,但输出顺序不同。为什么输出与事件时间无关?Flink流事件时间窗口排序

import java.util.concurrent.TimeUnit 
import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.scala._ 

object WindowExample extends App { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
    env.getConfig.enableTimestamps() 
    env.setParallelism(1) 

    val start = 1449597577379L 
    val tuples = (1 to 10).map(t => (start + t * 1000, t)) 

    env.fromCollection(tuples) 
     .assignAscendingTimestamps(_._1) 
     .timeWindowAll(Time.of(1, TimeUnit.SECONDS)) 
     .sum(1) 
     .print() 

    env.execute() 
} 

输入:

(1449597578379,1) 
(1449597579379,2) 
(1449597580379,3) 
(1449597581379,4) 
(1449597582379,5) 
(1449597583379,6) 
(1449597584379,7) 
(1449597585379,8) 
(1449597586379,9) 
(1449597587379,10) 

结果:

[info] (1449597579379,2) 
[info] (1449597581379,4) 
[info] (1449597583379,6) 
[info] (1449597585379,8) 
[info] (1449597587379,10) 
[info] (1449597578379,1) 
[info] (1449597580379,3) 
[info] (1449597582379,5) 
[info] (1449597584379,7) 
[info] (1449597586379,9) 

回答

10

这样做的原因的行为是在弗林克元件(相对于时间戳)的顺序不考虑帐户。只有水印的正确性及其与元素时间戳的关系对于考虑时间的操作很重要,因为水印通常会触发基于时间的操作中的计算。

在您的示例中,窗口操作符将来自源的所有元素存储在内部窗口缓冲区中。然后,源发出一个水印,表示将来不会有具有较小时间戳的元素到达。反过来,这又会告诉窗口操作员处理所有低于水印的结束时间戳的窗口(对于所有窗口都是如此)。因此,它发出所有窗口(以任意排序),然后它自己发出一个水印。下游的操作本身会接收到这些元素,并且可以在接收到水印后进行处理。

默认情况下,从源发出水印的时间间隔为200毫秒。由于源头发出的少量元素会在发出第一个水印之前发出。在现实世界的用例中,水印发射间隔比窗口大小要小很多,您可以按照时间戳的顺序获得预期的窗口行为。例如,如果每500毫秒有1小时窗口和水印。

+1

您可以给出还是指向下游操作的示例,它可以在收到水印后根据事件时间对元素重新排序?谢谢! –

+1

@MaximKolchin这样的重排序发生在例如在CEP库中。你可以看看这里:https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java –