flink-streaming

    0热度

    1回答

    我想在基于历史事件的流中计算Flink中基于窗口的平均值(或由我定义的任何其他函数),因此流必须是事件时间(不处理基于时间): val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteri

    1热度

    1回答

    当我尝试使用窗口和折叠功能聚合元素时,元素的某些 从获取聚合中错过。使用来自卡夫卡(value:0, value:1, value:2, value:3)的元素 ,并将它们合并为奇数和偶数值 。 输出为:10-13之间 {even=[0, 2, 4], odd=[1, 3]} {even=[6, 8], odd=[5, 7, 9]} {even=[14, 16, 18], odd=[15, 1

    1热度

    1回答

    我是Flink的新手。我有这个代码来映射,组和输入JSON的总和。 这与词计数示例非常相似。 我有望获得(vacant,1) (occupied,2) 但是,由于某种原因,我得到(occupied,1) (vacant,1) (occupied,2) public static void main(String[] args) throws Exception { String s =

    0热度

    1回答

    我们计划使用Flink处理来自kafka主题的数据流(Logs in Json格式)。 但是,对于那个处理,我们需要使用每天都在变化的输入文件,而内部的信息可以完全改变(不是格式,而是内容)。 每当其中一个输入文件发生更改时,我们将不得不将这些文件重新加载到程序中并保持流处理正在进行。数据 重新加载可以做同样的方式,因为它现在已经完成: DataSet<String> globalData = e

    1热度

    1回答

    我正在调查Apache Flink如何工作并试图了解Flink中的时间窗口。

    1热度

    1回答

    我想提取由FlinkKafkaConsumer010生成的消息的时间戳作为数据流中的值。 我知道AssignerWithPeriodicWatermarks类,但这似乎只是通过DataStream API为时间聚合的目的提取时间戳。 我想在后面的Table中提供该卡夫卡消息时间戳,我可以在其上使用SQL。 编辑:尝试这样: val consumer = new FlinkKafkaConsumer

    0热度

    2回答

    引起弗林克仪表盘版本1.3.2 CEP模式我已经写了这样 Pattern<JoinedEvent, ?> pattern = Pattern.<JoinedEvent>begin("start") .where(new SimpleCondition<JoinedEvent>() { @Override public boolean filter(JoinedEv

    0热度

    1回答

    我正在做一个Flink项目。该项目的主要思想是读取JSON(网络日志)的数据流,将它们关联起来,并生成一个新的JSON,它是不同JSON信息的组合。 此时,我可以读取JSON,生成一个KeyedStream(基于生成日志的机器),然后生成一个5秒的窗口流。 我想要执行的下一步是将apply函数应用到窗口并合并每个JSON的信息。我对如何去做有点困惑。 我现在有该代码是以下各项之一: DataStr

    2热度

    1回答

    我正在评估Flink专门为流式窗口支持生成可能的警报。我关心的是内存使用情况,所以如果有人可以帮助,这将不胜感激。 例如,该应用程序将在给定的翻滚窗口(比如说5分钟)内从该流潜在地消耗大量的数据。在评估的时候,如果说有一百万份符合条件的文档,它们是否都会被加载到内存中? 的一般流程是: producer -> kafka -> flinkkafkaconsumer -> table.window(

    2热度

    2回答

    我最近尝试从Flink 1.3.2升级到1.4.0,我遇到了一些问题,无法再导入org.apache.hadoop.fs.{FileSystem, Path}。问题是发生在两个地方: ParquetWriter: import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apach