0

假设我们为一个主题设置了3个卡夫卡分区,并且希望使用事件时间按小时对事件进行窗口化。Flink如何使用来自多个分区的卡夫卡主题的消息,而不会产生偏斜?

kafka使用者在当前窗口之外时是否停止从分区读取数据?或者它打开一个新窗口?如果它打开新窗口,那么如果一个分区的事件时间与其他事件时间相比是非常歪斜的,那么理论上可能会打开无限量的窗口并因此耗尽内存?当我们重放一些历史时,这种情况尤其是可能的。

我一直在试图从阅读文档中得到这个答案,但在分区上找不到有关Flink与Kafka的内部结构。关于这个特定主题的一些很好的文档将非常受欢迎。

谢谢!

回答

0

不断阅读和进一步开窗操作有没有影响。谈到内存不足时,有更多的事情需要考虑。

  • 通常你不必每一个事件存储窗口,但只是一些总的事件
  • 每当窗口关闭相应的内存被释放。

一些更卡夫卡消费者如何与特别EVENTTIME(水印交互您可以检查here

+0

这非常有用,谢谢。在窗口正在积累事件时,此示例中的减少是否会执行? whateverSource.windowByEventTime()。reduce(someReduceFunc).toSomeSink(foo) – RoyB

+0

是的,只会存储'reduce'函数的结果。 –

0

你可以试着所以首先从卡夫卡的所有事件使用这种类型的风格

public void runStartFromLatestOffsets() throws Exception { 
     // 50 records written to each of 3 partitions before launching a latest-starting consuming job 
     final int parallelism = 3; 
     final int recordsInEachPartition = 50; 

     // each partition will be written an extra 200 records 
     final int extraRecordsInEachPartition = 200; 

     // all already existing data in the topic, before the consuming topology has started, should be ignored 
     final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); 

     // the committed offsets should be ignored 
     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); 
     kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); 
     kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); 
kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); 
0

可能,这将帮助您获得清晰

https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java

一个分区确保每个内部Flink分区结束于一个Kafka分区。

尽管完整注释掉了部分。他们已经解释了var还有一些情况。

+0

这是为了生产朝向卡夫卡的活动?不是为了消费?这似乎没有为我的用例提供任何价值。 – RoyB

相关问题