2017-01-17

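I have been experimenting with the Flink windowing examples, and to check the window timing I added timestamps to the stream events. I found that the window's actual duration is shorter than the configured window length. Also, if I use a sliding window and modify an event, the modified event shows up in the next window. The window does not run for its full length.

When I specify a window length, does Flink not wait for the window to complete? And do the overlapping events shared between sliding windows refer to the same instance? (I know that streams are immutable structures.)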

public class WindowDemo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties prop = PropertyLoader.loadPropertiesForConsumer("WC", 0);
        FlinkKafkaConsumer09<Alarm> consumer = new FlinkKafkaConsumer09<Alarm>("topic_smartEmse", new AlarmSchema(), prop);
        DataStream<Alarm> inputStream = env.addSource(consumer);

        inputStream = inputStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {

            @Override
            public void flatMap(Alarm value, Collector<Alarm> out)
                    throws Exception {
                System.out.println("flatMap Started at " + System.currentTimeMillis());
                value.setUserDefined10("IN TIME " + System.currentTimeMillis());
                out.collect(value);
                System.out.println("flatMap Ended at " + System.currentTimeMillis());
            }
        });

        KeyedStream<Alarm, String> keyedStream = inputStream.keyBy(new KeySelector<Alarm, String>() {

            @Override
            public String getKey(Alarm value) throws Exception {
                System.out.println("getKey Started at " + System.currentTimeMillis());
                return "XX";
            }
        });

        DataStream<Alarm> dataStream = keyedStream.timeWindow(Time.of(90, TimeUnit.SECONDS)).apply(new WindowFunction<Alarm, Alarm, String, TimeWindow>() {

            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Alarm> input, Collector<Alarm> out)
                    throws Exception {
                System.out.println("timeWindow Started at " + System.currentTimeMillis());
                int count = 0;
                System.out.println("Key : " + key);
                System.out.println("Values : " + input);
                Iterator<Alarm> itr = input.iterator();
                while (itr.hasNext()) {
                    Alarm alarm = itr.next();
                    alarm.setUserDefined1("" + count++);

                    out.collect(alarm);
                }
                System.out.println("timeWindow ended at " + System.currentTimeMillis());
            }
        });

        dataStream = dataStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {

            @Override
            public void flatMap(Alarm value, Collector<Alarm> out)
                    throws Exception {
                value.setUserDefined11("OUT TIME " + System.currentTimeMillis());
                out.collect(value);
            }
        });
        dataStream.printToErr();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Answer

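If I understand you correctly, your concern is that the window is evaluated (apply is called) before the given time has elapsed. I noticed the same effect for the first evaluation of the window. It seems that the time slots are aligned in some way. I started processing at 19:09:13, and the first window evaluation happened at 19:10:30, 77 seconds later. After that first call the windows close not exactly, but very close to, every 90 seconds.

For TumblingProcessingTimeWindows (which you are using), this seems to be the relevant code: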

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private long size;

    private TumblingProcessingTimeWindows(long size) {
        this.size = size;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {

        final long now = context.getCurrentProcessingTime();
        // here goes the alignment
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    // ... remaining members omitted in this excerpt
}

Does this make sense?
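To make the alignment concrete, here is a minimal sketch in plain Java (no Flink dependency) that applies the same `now - (now % size)` arithmetic to the times quoted above. The class and method names are made up for illustration, and the date and the UTC time zone are assumptions, since the answer only gives the time of day. Under those assumptions a job started at 19:09:13 falls into the window [19:09:00, 19:10:30), so the first evaluation comes after 77 seconds rather than 90.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class WindowAlignmentDemo {

    // Same arithmetic as the assigner excerpt: round down to a multiple of the window size.
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Time left between the given timestamp and the end of the window that contains it.
    public static long millisUntilWindowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis - timestampMillis;
    }

    public static void main(String[] args) {
        long size = 90_000L; // 90-second window, as in the question

        // Assumption: the date and UTC are chosen for illustration only.
        long started = LocalDateTime.of(2017, 1, 17, 19, 9, 13)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();

        long start = windowStart(started, size);
        System.out.println("window start: " + Instant.ofEpochMilli(start));
        System.out.println("window end:   " + Instant.ofEpochMilli(start + size));
        System.out.println("ms until first evaluation: " + millisUntilWindowEnd(started, size));
    }
}
```

Only the first window looks short: an element arriving mid-window joins a window whose boundaries were fixed relative to the epoch, not relative to the job's start time. Later windows each span the full 90 seconds.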

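The code you shared applies a manual fix for the time alignment. I was wondering whether this should happen implicitly when the time characteristic is set (which I have already tried).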

This isn't my code. It comes from the Flink source code: https://github.com/apache/flink/blob/release-1.1.4-rc1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java – TobiSH

OK, that makes sense. I would also like to know whether changes made in one (sliding) window can be seen in other windows?
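The thread does not answer this last question. As a hedged illustration only, the plain-Java sketch below shows the general mechanism that would explain the observation: if two overlapping windows hold a reference to the same mutable object, a setter call made while processing one window is visible through the other. Whether Flink actually hands out the same instance likely depends on how window contents are stored (for example, kept as objects on the heap versus serialized), which is an assumption not confirmed here. The `Alarm` class below is a hypothetical stand-in for the one in the question.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedElementDemo {

    // Hypothetical, minimal stand-in for the question's Alarm class.
    static class Alarm {
        private String userDefined1;

        Alarm(String userDefined1) {
            this.userDefined1 = userDefined1;
        }

        // Copy constructor used for the defensive-copy variant.
        Alarm(Alarm other) {
            this.userDefined1 = other.userDefined1;
        }

        String getUserDefined1() {
            return userDefined1;
        }

        void setUserDefined1(String value) {
            this.userDefined1 = value;
        }
    }

    // Returns the value the second window sees after the first window's element is modified.
    public static String seenByOtherWindow(boolean copyBeforeMutating) {
        Alarm shared = new Alarm("original");

        // Two overlapping windows, modeled as lists that reference the same element.
        List<Alarm> windowA = new ArrayList<>();
        List<Alarm> windowB = new ArrayList<>();
        windowA.add(shared);
        windowB.add(shared);

        // Either mutate the stored element in place, or mutate a private copy of it.
        Alarm target = copyBeforeMutating ? new Alarm(windowA.get(0)) : windowA.get(0);
        target.setUserDefined1("0");

        return windowB.get(0).getUserDefined1();
    }

    public static void main(String[] args) {
        System.out.println("in-place mutation, other window sees: " + seenByOtherWindow(false));
        System.out.println("copy then mutate, other window sees:  " + seenByOtherWindow(true));
    }
}
```

If this is what is happening, one option is to copy each element inside the window function before calling setters and emit the copy, so the input elements are treated as read-only.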