2017-07-04 52 views
0

我有卡夫卡,看起来像这样的数据:窗口操作如何与Flink中的keyBy一起使用?

{ 
    "account": "iOS", //Possible values: iOS, android, web, windows 
    "events": [ 
    { 
     "timestamp": "2017-07-03T20:19:35Z" 
    } 
    ] 
} 

时间戳从2017-07-03T20:19:35Z2017-07-03T20:22:30Z(约3分钟)去。我有这个弗林克程序,它从上面的卡夫卡话题提取数据:

object TestWindow { 
    def main(args: Array[String]) = { 
    val props = new Properties() 
    props.put(...) 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    env.addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props)). 
     keyBy(jsonStr => { 
     val jsonParser = new JsonParser() 
     val jsonObject = jsonParser.parse(jsonStr).getAsJsonObject() 
     jsonObject.get("account") 
    }).window(TumblingEventTimeWindows.of(Time.seconds(10))) 
     .reduce({(v1, v2) => 
     println(v1 + " " + v2) 
     "" 
     }) 
    env.execute() 
    } 
} 

我无法理解窗口如何与keyBy操作工作。

我知道上面的keyBy操作会创建包含相同键的元素的不同分区。但是我对创建窗口的时间以及每个分区如何添加到哪个窗口感到困惑?

我的猜测是,因为每个JSON对象到达时,它会被keyBy操作键,然后根据该JSON的事件timeStamp对象将10秒创建一个窗口?

因此,例如,如果与"2017-07-03T20:19:35Z"时间戳的JSON对象到达和具有account类型的iOS然后通过分区为iOS键控将被创建和窗口也将被用于"2017-07-03T20:19:35Z""2017-07-03T20:19:45Z"创建?或者,无论来自Kafka的JSON对象的事件时间戳是否每隔10秒创建一个窗口?

回答

2

您对keyBy的假设是正确的。 keyBy在定义的关键属性上划分流,并且按照每个关键字计算窗口。

您在示例中使用的TumblingEventTimeWindow具有固定的窗口边框,即边框不依赖于数据的时间戳。 10秒翻滚窗口将创建来自[00:00:00.000, 00:00:10.000),[00:00:10.000, 00:00:20.000)等的窗口。到达窗口操作员的记录将被分配到与其时间戳相交的窗口。当窗口关闭时(操作员的本地时间通过窗口的结束时间戳),窗口的结果被计算并发出。请注意,窗口仅用第一个记录实例化,即空窗口不触发计算并且不能发射数据,例如零计数。

其他窗口类型(如会话窗口)具有数据驱动边界。在会话窗口的情况下,所有不超过一定时间间隔的记录将被分组到一个窗口中。

+0

噢好吧,因为我设置'env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)'为'EventTime',我的JSON对象的EventTime范围从'2017-07-03T20:19:35Z'到'2017-07 -03T20:22:30Z'所以窗口的创建就像'2017-07-03T20:19:35Z'到'2017-07-03T20:19:45Z'和'2017-07-03T20:19:45Z'一样'2017-07-03T20:19:55Z'? – CapturedTree

+1

翻滚窗口与时代对齐,即'1970-01-01T00:00:00'。因此,从[2017-07-03T20:19:30Z,2017-07-03T20:19:40Z]和''2017-03-03T20:19:35Z''的10秒钟翻转窗口将会进入窗口, 2017-07-03T20:22:30Z'会变成'[2017-07-03T20:22:30Z,2017-07-03T20:22:40Z]'。 –

+0

你碰巧知道是否有办法在flink中做到这一点:https://stackoverflow.com/questions/45203168/how-to-attach-a-hashmap-to-a-configuration-object-in-flink – CapturedTree