我有卡夫卡,看起来像这样的数据:窗口操作如何与Flink中的keyBy一起使用?
{
"account": "iOS", //Possible values: iOS, android, web, windows
"events": [
{
"timestamp": "2017-07-03T20:19:35Z"
}
]
}
时间戳从2017-07-03T20:19:35Z
至2017-07-03T20:22:30Z
(约3分钟)去。我有这个弗林克程序,它从上面的卡夫卡话题提取数据:
object TestWindow {
def main(args: Array[String]) = {
val props = new Properties()
props.put(...)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props)).
keyBy(jsonStr => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonStr).getAsJsonObject()
jsonObject.get("account")
}).window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce({(v1, v2) =>
println(v1 + " " + v2)
""
})
env.execute()
}
}
我无法理解窗口如何与keyBy
操作工作。
我知道上面的keyBy
操作会创建包含相同键的元素的不同分区。但是我对创建窗口的时间以及每个分区如何添加到哪个窗口感到困惑?
我的猜测是,因为每个JSON对象到达时,它会被keyBy
操作键,然后根据该JSON的事件timeStamp
对象将10秒创建一个窗口?
因此,例如,如果与"2017-07-03T20:19:35Z"
时间戳的JSON对象到达和具有account
类型的iOS然后通过分区为iOS键控将被创建和窗口也将被用于"2017-07-03T20:19:35Z"
到"2017-07-03T20:19:45Z"
创建?或者,无论来自Kafka的JSON对象的事件时间戳是否每隔10秒创建一个窗口?
噢好吧,因为我设置'env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)'为'EventTime',我的JSON对象的EventTime范围从'2017-07-03T20:19:35Z'到'2017-07 -03T20:22:30Z'所以窗口的创建就像'2017-07-03T20:19:35Z'到'2017-07-03T20:19:45Z'和'2017-07-03T20:19:45Z'一样'2017-07-03T20:19:55Z'? – CapturedTree
翻滚窗口与时代对齐,即'1970-01-01T00:00:00'。因此,从[2017-07-03T20:19:30Z,2017-07-03T20:19:40Z]和''2017-03-03T20:19:35Z''的10秒钟翻转窗口将会进入窗口, 2017-07-03T20:22:30Z'会变成'[2017-07-03T20:22:30Z,2017-07-03T20:22:40Z]'。 –
你碰巧知道是否有办法在flink中做到这一点:https://stackoverflow.com/questions/45203168/how-to-attach-a-hashmap-to-a-configuration-object-in-flink – CapturedTree