我们计划使用Flink处理来自kafka主题的数据流(Logs in Json格式)。 但是,对于那个处理,我们需要使用每天都在变化的输入文件,而内部的信息可以完全改变(不是格式,而是内容)。 每当其中一个输入文件发生更改时,我们将不得不将这些文件重新加载到程序中并保持流处理正在进行。数据 重新加载可以做同样的方式,因为它现在已经完成: DataSet<String> globalData = e
我想提取由FlinkKafkaConsumer010生成的消息的时间戳作为数据流中的值。 我知道AssignerWithPeriodicWatermarks类,但这似乎只是通过DataStream API为时间聚合的目的提取时间戳。 我想在后面的Table中提供该卡夫卡消息时间戳,我可以在其上使用SQL。 编辑:尝试这样: val consumer = new FlinkKafkaConsumer