2016-04-22 24 views
0

我想创建一个Trigger,它在20秒内第一次触发并且在此后每隔5秒触发一次。我已经使用GlobalWindows和一个自定义TriggerFlink自定义触发器给予意想不到的输出

val windowedStream = valueStream 
          .keyBy(0) 
          .window(GlobalWindows.create()) 
          .trigger(TradeTrigger.of()) 

这里是TradeTrigger代码:

@PublicEvolving 
public class TradeTrigger<W extends Window> extends Trigger<Object, W> { 

    private static final long serialVersionUID = 1L; 

    static boolean flag=false; 
    static long ctime = System.currentTimeMillis(); 

    private TradeTrigger() { 
    } 

    @Override 
    public TriggerResult onElement(
      Object arg0, 
      long arg1, 
      W arg2, 
      org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3) 
      throws Exception { 
     // TODO Auto-generated method stub 

     if(flag == false){ 
      if((System.currentTimeMillis()-ctime) >= 20000){ 
       flag = true; 
       ctime = System.currentTimeMillis(); 
       return TriggerResult.FIRE; 
      } 
      return TriggerResult.CONTINUE; 
     } else { 
      if((System.currentTimeMillis()-ctime) >= 5000){ 
       ctime = System.currentTimeMillis(); 
       return TriggerResult.FIRE; 
      } 
      return TriggerResult.CONTINUE; 
     } 

    } 

    @Override 
    public TriggerResult onEventTime(
      long arg0, 
      W arg1, 
      org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2) 
      throws Exception { 
     // TODO Auto-generated method stub 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(
      long arg0, 
      W arg1, 
      org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2) 
      throws Exception { 
     // TODO Auto-generated method stub 
     return TriggerResult.CONTINUE; 
    } 


    public static <W extends Window> TradeTrigger<W> of() { 
     return new TradeTrigger<>(); 
    } 

} 

因此,基本上,当flagfalse,即首次Trigger应该在20秒内被解雇并将flag设置为true。从下一次开始,它应该每5秒发射一次。

我面临的问题是,每次触发Trigger时,我在输出中只收到一条消息。也就是说,我在20秒后收到一条消息,每五秒收到一条消息。 我期待在每次触发的输出中有20条消息。

如果我使用.timeWindow(Time.seconds(5))并创建一个五秒钟的时间窗口,我每5秒钟会得到20条输出消息。 请帮我拿这段代码吧。有什么我失踪?

回答

1

得到它在Fabian和Flink邮件列表的答案帮助下工作。 通过TriggerContext将状态存储在ValueState变量中。检查onEvent()方法中的变量,如果它是第一次,则注册一个processingTimeTimer比当前时间多20秒并更新状态。在onProcessingTime方法中,注册另一个ProcessingTimeTimer比当前时间多5秒,更新状态并开启Window

+0

你能否用更新的代码更新答案?这将非常有帮助 – Akash

3

有你的触发执行的几个问题:

  1. 你应该函数的状态从来没有存储在一个静态变量。 Flink不会隔离JVM中的用户进程。相反,它使用每个TaskManager的单个JVM并启动多个线程。因此,您的静态布尔标志在多个触发器实例之间共享。您应该存储Flink的ValueState接口,该接口可从TriggerContext访问。如果发生故障,Flink会小心检查您的状态并恢复。

  2. Trigger.onEvent()仅在新事件到达时调用。所以它不能用于在特定时间触发窗口计算。相反,您应该注册事件时间计时器或处理时间计时器(再次通过TriggerContext)。计时器将分别呼叫Trigger.onEventTime()Trigger.onProcessingTime()。是否使用事件或处理时间取决于您的使用情况。

+0

再次谢谢Fabian。但我需要知道如何以编程方式完成此任务。 1.如何将我的标志变量存储在'ValueState'中,以及如何访问它? 2.我是否需要在我的'Trigger'的'onProcessingTime()'或'onEventTime()'重写方法中写入逻辑? 3.什么是通过触发上下文注册计时器的正确语法? –