1
假设我具有形式(每行一个事件)的一个文件:如何通过Apache Flink的属性和时间窗口来计数?
Source,Timestamp aa,2014-05-02 22:12:11 bb,2014-05-02 22:22:11
我想总结通过源与5分钟的连续时间窗分组事件的数量。我如何用Flink做到这一点?
我有现在的问题是:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);
stream
.keyBy("getSource()")
.timeWindow(Time.minutes(5))
.sum("getTimestamp()");
env.execute();
public class Event {
private final String source;
private final long timestamp;
public Event(String source, long timestamp) {
this.source = source;
this.timestamp = timestamp;
}
public String getSource() {
return source;
}
public long getTimestamp() {
return timestamp;
}
}
我错过了两件事情。首先,这失败了,并说Event
类不是POJO。其次,我不知道如何计算窗口中的事件数量。现在我正在使用.sum("getTimestamp()")
,但我确定不是这样。有什么想法吗?