2016-04-21

How do I count by attribute and time window with Apache Flink?

Suppose I have a file of the following form, with one event per line:

Source,Timestamp
aa,2014-05-02 22:12:11
bb,2014-05-02 22:22:11

I want to count the number of events per source, grouped into consecutive 5-minute time windows. How can I do this with Flink?
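To make the intended result concrete, here is a minimal plain-Java sketch (no Flink involved) of the count being asked for. It assumes the timestamps are in UTC and that windows are aligned to the epoch, which is also how Flink's tumbling time windows are aligned by default. The class and method names are hypothetical, and the second `aa` line is made up so that one window holds two events.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of counting events per source in tumbling 5-minute windows.
class TumblingCountSketch {
    static final long WINDOW_MILLIS = 5 * 60 * 1000L;
    static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumption: the file's timestamps are UTC.
    static long toEpochMillis(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Returns counts keyed by "source@windowStartMillis".
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",", 2); // parts[0] = source, parts[1] = timestamp
            String key = parts[0] + "@" + windowStart(toEpochMillis(parts[1]));
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "aa,2014-05-02 22:12:11",
            "aa,2014-05-02 22:13:00", // hypothetical extra event
            "bb,2014-05-02 22:22:11");
        count(lines).forEach((key, n) -> System.out.println(key + " -> " + n));
    }
}
```

Here the two `aa` events fall into the 22:10 to 22:15 window and the `bb` event into the 22:20 to 22:25 window, so the expected result is one count of 2 and one count of 1.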

What I have so far is:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);

stream
    .keyBy("getSource()")
    .timeWindow(Time.minutes(5))
    .sum("getTimestamp()");

env.execute();

public class Event {
    private final String source;
    private final long timestamp;

    public Event(String source, long timestamp) {
        this.source = source;
        this.timestamp = timestamp;
    }

    public String getSource() {
        return source;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

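I am missing two things. First, this fails with an error saying that the Event class is not a POJO. Second, I don't know how to count the number of events in a window. Right now I am using .sum("getTimestamp()"), but I'm sure that is not the right way. Any ideas?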

Answer


I would recommend using a fold function to do the window aggregation. The following code snippet should do the job:

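import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

// Note: fold() and timeWindow() belong to the Flink 1.x API of the time;
// later releases deprecated both.
public class Job {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Sample events; each one carries its own event-time timestamp.
        DataStream<Event> stream = env
            .fromElements(new Event("a", 1), new Event("b", 2), new Event("a", 2))
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Event>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(Event event, long extractedTimestamp) {
                    // Emit a watermark after every element, at that element's timestamp.
                    return new Watermark(extractedTimestamp);
                }

                @Override
                public long extractTimestamp(Event event, long previousElementTimestamp) {
                    return event.getTimestamp();
                }
            });

        DataStream<Tuple2<String, Integer>> count = stream
            // A KeySelector works on any type, so Event does not have to be a POJO.
            .keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event event) throws Exception {
                    return event.getSource();
                }
            })
            .timeWindow(Time.minutes(5))
            // Start from ("", 0) and add 1 per event; the result is (source, count) per window.
            .fold(Tuple2.of("", 0), new FoldFunction<Event, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Event o) throws Exception {
                    return Tuple2.of(o.getSource(), acc.f1 + 1);
                }
            });

        count.print();

        env.execute();
    }

    public static class Event {
        private final String source;
        private final long timestamp;

        public Event(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }

        public String getSource() {
            return source;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }
}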
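
On the "not a POJO" error: the snippet above appears to avoid it by keying with a KeySelector, so Flink never has to resolve a field expression against Event. If you would rather keep a string key, Flink's documented POJO rules ask for a public class with a public no-argument constructor and non-final fields that are either public or reachable through a getter and a setter. Below is a minimal sketch of such a class, assuming you are free to drop the `final` modifiers; the outer class name `PojoEventSketch` is hypothetical.

```java
// Sketch of an Event type shaped to follow Flink's documented POJO rules.
// The enclosing class name is hypothetical and only groups the example.
class PojoEventSketch {

    // Public static nested class, like Event in the Job class above.
    public static class Event {
        // Fields are non-final and exposed through getters and setters.
        private String source;
        private long timestamp;

        // Public no-argument constructor, part of the POJO rules.
        public Event() {
        }

        // Convenience constructor for application code.
        public Event(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }

        public String getSource() {
            return source;
        }

        public void setSource(String source) {
            this.source = source;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    public static void main(String[] args) {
        Event event = new Event();
        event.setSource("aa");
        event.setTimestamp(1399068731000L);
        System.out.println(event.getSource() + " " + event.getTimestamp());
    }
}
```

With a type like this, a string key would name the field, as in keyBy("source"), rather than the getter call "getSource()" used in the question.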