2017-10-18 84 views
1

当我尝试使用窗口和折叠功能聚合元素时,元素的某些 从获取聚合中错过。使用来自卡夫卡(value:0, value:1, value:2, value:3)的元素 ,并将它们合并为奇数和偶数值 。使用Flink窗口和折叠功能,元素缺失?

输出为:10-13之间

{even=[0, 2, 4], odd=[1, 3]} 
{even=[6, 8], odd=[5, 7, 9]} 
{even=[14, 16, 18], odd=[15, 17]} 
{even=[20, 22], odd=[19, 21, 23]} 
{even=[24, 26, 28], odd=[25, 27]} 

号已丢失,发生这种情况的一组随机 数字。有人可以建议从下面的代码中漏掉了什么, 我该如何确保处理所有元素?

public static class Splitter implements FlatMapFunction<String, 
    Tuple3<String, String, List<String>>{ 
    private static final long serialVersionUID = 1L; 

    @Override 
    public void flatMap(String value, Collector<Tuple3<String, String, 
     List<String>>out) throws Exception { 
     String[] vals = value.split(":"); 

     if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){ 
      out.collect(new Tuple3<String, String, List<String>> 
      ("test","even", Arrays.asList(vals[1]))); 
     }else{ 
      out.collect(new Tuple3<String, String, List<String>> 
      ("test","odd", Arrays.asList(vals[1]))); 
     } 
    } 
} 


    DataStream<Map<String, List<String>>streamValue = 
    kafkaStream.flatMap(new Splitter()).keyBy(0) 
    .window(TumblingEventTimeWindows.of(Time.milliseconds(3000))). 
    trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2)) 
    .fold(new HashMap<String, List<String>>(), new 
    FoldFunction<Tuple3<String, String, List<String>>, Map<String, 
    List<String>>>() { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Map<String, List<String>fold(Map<String, 
     List<String>accumulator, 
     Tuple3<String, String, List<String>value) throws 
     Exception { 
      if(accumulator.get(value.f1) != null){ 
       List<Stringlist = new ArrayList<>(); 
       list.addAll(accumulator.get(value.f1)); 
       list.addAll(value.f2); 
       accumulator.put(value.f1, list); 
      }else{ 
       accumulator.put(value.f1, value.f2); 
      } 
      return accumulator; 
     } 
    }); 

    streamValue.print(); 
    env.execute("window test"); 
} 


public class CustomizedCountTrigger<W extends Windowextends 
Trigger<Object, W{ 

    private static final long serialVersionUID = 1L; 
    private final long maxCount; 

    private final ReducingStateDescriptor<LongstateDesc = 
    new ReducingStateDescriptor<>("count", new Sum(), 
    LongSerializer.INSTANCE); 

    private CustomizedCountTrigger(long maxCount) { 
     this.maxCount = maxCount; 
    } 

    @Override 
    public TriggerResult onElement(Object element, long timestamp, W window, 
    TriggerContext ctx) throws Exception { 
     ReducingState<Longcount = ctx.getPartitionedState(stateDesc); 
     count.add(1L); 
     if (count.get() >= maxCount) { 
      count.clear(); 
      return TriggerResult.FIRE_AND_PURGE; 
     } 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onProcessingTime(long time, W window, 

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public TriggerResult onEventTime(long time, W window, 

    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) throws Exception { 
     return TriggerResult.CONTINUE; 
    } 

    @Override 
    public void clear(W window, 
    org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext 

    ctx) 
    throws Exception { 
     ctx.getPartitionedState(stateDesc).clear(); 
    } 

    @Override 
    public String toString() { 
     return "CountTrigger(" + maxCount + ")"; 
    } 

    public static <W extends WindowCustomizedCountTrigger<Wof(long 
    maxCount) { 
     return new CustomizedCountTrigger<>(maxCount); 
    } 

    private static class Sum implements ReduceFunction<Long{ 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Long reduce(Long value1, Long value2) throws Exception { 
      return value1 + value2; 
     } 

    } 
} 

回答

1

于是,我开始注意到您的自定义触发使您正在使用的不相关的一个TumblingEventTime窗口排序之前其实写这第一部分,但我想反正包括我原来的想法,因为我不完全确定为什么你不使用EventTime窗口。我意识到这一点后的反应是低于原来的。

你是在单个并行还是多个上运行它?我之所以问,是因为如果它是多重并行(并且kafka主题也是由多个分区组成),那么消息可能以非连续的顺序接收和处理。这可能导致带有时间戳的消息导致水印前进,导致窗口处理消息。然后,下一个消息具有在当前水印时间之前的事件时间(a.k.a为“迟到”)并且将导致该消息被丢弃。

因此,例如:如果你有20元和每一个的情况下,时间就像这样:

MESSAGE1:EVENTTIME:1000个 MESSAGE1:EVENTTIME:2000 等等

以及活动时间窗口是5001ms。

现在消息message1到message9依次通过。这第一个窗口将被处理并包含消息1-5(消息6将导致窗口被处理)。现在,如果message11在message10之前进入,它将导致包含消息6-9的窗口被处理。而当message10接下来时,水印已经超过了message10的事件时间,导致它被作为“延迟事件”丢弃。

合适的回答

而不是使用EVENTTIME窗口和一个自定义触发的,请尝试使用countWindow。

所以替换此:

.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))). 
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2)) 

有了这个:

.countWindow(5L) 
+0

非常感谢您的时间和解释。我同意使用eventTimeWindow并导致消息被丢弃。但我的用例如下所示。在此之前,我会澄清说,我试着使用并行(1)和并行(2),但问题保持不变,有些事件被丢弃。 – Sharath

+0

我的用例是当一个业务逻辑评​​估为真时处理一组事件。例如,如果事件总数大于3或者偶数的事件总数大于5或预定义的时间窗被超过(例如2秒)。另外,如果你用我们自己的一个重写窗口触发我的理解,则实际触发器将不再被考虑。在这种情况下,窗口的时间的流逝。 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); @Jicaar我感谢你的意见。 – Sharath

+0

包含自定义触发器会覆盖默认触发器。但什么触发自定义触发器仍然有效。因此,当3000毫秒的TumblingEventTime窗口完成后,它将触发自定义触发器中的自定义onEventTime方法。但你有onEventTime方法设置为只继续不火和/或清洗(而默认的触发将返回FIRE_AND_PURGE),使得事件的时间窗口基本上是没有意义的,从我可以告诉。 – Jicaar