You could try this style to start consuming only the latest events from Kafka:
public void runStartFromLatestOffsets() throws Exception {
    // 50 records are written to each of 3 partitions before launching a
    // latest-starting consuming job
    final int parallelism = 3;
    final int recordsInEachPartition = 50;

    // each partition will then be written an extra 200 records
    final int extraRecordsInEachPartition = 200;

    // all data already in the topic before the consuming topology starts
    // should be ignored
    final String topicName = writeSequence("testStartFromLatestOffsetsTopic",
            recordsInEachPartition, parallelism, 1);

    // the committed offsets should also be ignored
    KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
    kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
    kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
    kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);

    // ... remainder of the test (writing the extra records and verifying that
    // only they are read) is omitted here
}
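In application code, the behavior this test exercises corresponds to calling setStartFromLatest() on the Flink Kafka consumer. A minimal configuration sketch, assuming Flink's FlinkKafkaConsumer API; the topic name is taken from the test above, and the bootstrap address is an illustrative placeholder:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "testStartFromLatestOffsetsTopic", new SimpleStringSchema(), props);

// Ignore both the records already in the topic and any committed offsets;
// read only records produced after the job starts.
consumer.setStartFromLatest();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer).print();
env.execute();
```

Note that with setStartFromLatest() the committed group offsets (23, 31, 43 in the test) play no role in where reading begins.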
This is very helpful, thanks. In this example, does the reduce execute while the window is still accumulating events? whateverSource.windowByEventTime().reduce(someReduceFunc).toSomeSink(foo) – RoyB
Yes, only the result of the 'reduce' function is stored. –
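To illustrate what "only the result is stored" means: an incremental window aggregation applies the reduce function eagerly as each event arrives, so the window keeps a single accumulated value instead of buffering every event until the window fires. The sketch below simulates that behavior in plain Java; the class and method names are illustrative, not Flink's actual classes:

```java
import java.util.function.BinaryOperator;

// Simulates incremental window aggregation: the reduce function is applied
// as each event arrives, so the window's state is one accumulated value,
// no matter how many events it has seen.
class IncrementalWindowState<T> {
    private final BinaryOperator<T> reduceFunc;
    private T accumulator;      // the only state kept for the window
    private int eventsSeen = 0;

    IncrementalWindowState(BinaryOperator<T> reduceFunc) {
        this.reduceFunc = reduceFunc;
    }

    void add(T event) {
        eventsSeen++;
        accumulator = (accumulator == null)
                ? event
                : reduceFunc.apply(accumulator, event);
    }

    T fire() { return accumulator; }  // emitted when the window closes

    int storedValues() { return accumulator == null ? 0 : 1; }

    int eventsSeen() { return eventsSeen; }
}

public class Main {
    public static void main(String[] args) {
        IncrementalWindowState<Integer> window =
                new IncrementalWindowState<>(Integer::sum);
        for (int i = 1; i <= 100; i++) {
            window.add(i);
        }
        // 100 events were reduced eagerly; only one value is ever stored.
        System.out.println("events=" + window.eventsSeen()
                + " stored=" + window.storedValues()
                + " result=" + window.fire());
    }
}
```

This is why an eagerly reducible aggregate (sum, min, max) is cheap in windowed streaming, while operations that need all raw events (e.g. a median) force the window to buffer everything.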