2017-01-02 27 views
1

我想加入2个KTables。卡夫卡流 - 加入两个ktables调用两次加入函数

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(), 
    new JsonPOJOSerde<>(RecordBean.class), 
    bidTopic, RECORDS_STORE); 

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(), 
    new JsonPOJOSerde<>(ImpressionBean.class), 
    impressionTopic, IMPRESSIONS_STORE); 

KTable<String, RecordBean> mergedByTxId = recordsTable 
    .join(impressionsTable, merge()); 

合并函数非常简单,我只是将值从一个bean复制到另一个。

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() { 
return (v1, v2) -> { 
    v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount()); 
    return v1; 
}; 

但由于某些原因,连接函数在单个生成的记录上调用两次。 请参考下面

Properties streamsConfiguration = new Properties(); 
streamsConfiguration 
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions"); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); 

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect()); 
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp") 
    .getAbsolutePath()); 

return streamsConfiguration; 

生产者配置流/生产配置 -

Properties producerConfig = new Properties(); 
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); 
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); 
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); 
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 

return producerConfig; 

下一页我提交每个数据流单纪录。两个记录具有相同的密钥。 我期待收到单个记录作为输出。

IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic, 
    Arrays.asList(new KeyValue("1", getRecordBean("1"))), 
    getProducerProperties()); 

IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic, 
    Arrays.asList(new KeyValue("1", getImpressionBean("1"))), 
    getProducerProperties()); 

List<KeyValue<String, String>> parsedRecord = 
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(), 
     outputTopic, 1); 

但ValueJoiner触发2次,而我得到2个相同的输出记录,而不是一个。在触发时间内 - 两个数据流的值都存在 - 我无法得到触发第二次执行的内容。

不加入 - 我无法重现此行为。 我无法找到2 ktable join的任何工作示例 - 所以无法理解我的方法有什么问题。

添加简单的代码演示相同的行为

KStreamBuilder builder = new KStreamBuilder(); 

KTable<String, String> first = builder.table("stream1", "storage1"); 
KTable<String, String> second = builder.table("stream2", "storage2"); 

KTable<String, String> joined = first.join(second, (value1, value2) -> value1); 

joined.to("output"); 

KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties()); 

streams.start(); 

IntegrationTestUtils.produceKeyValuesSynchronously("stream1", 
    Arrays.asList(new KeyValue("1", "first stream")), 
    getProducerProperties()); 

IntegrationTestUtils.produceKeyValuesSynchronously("stream2", 
    Arrays.asList(new KeyValue("1", "second stream")), 
    getProducerProperties()); 

List<KeyValue<String, String>> parsedRecord = 
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(), 
     "output", 1); 
+0

卡夫卡流加入语义是棘手的。请参阅文档以获取更多详细信息:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics 为了更好地理解您的问题,您可以提供一个数据示例吗? (即输入记录与时间戳以及预期和观察结果) –

+0

我在原始文章中添加了更多细节,我如何生成记录。一般来说,我每个流生产1条记录,并期望收到1条记录作为输出操作,出于某种原因,我收到2条。我仔细阅读文档,但似乎我错过了一些东西。 –

+0

你收到两个相同或两个不同的输出记录吗?你是否尝试调试,即在你的'ValueJoiner'代码中设置一个断点? –

回答

1

我被张贴类似的问题,以汇合邮件组后,下面的解释。

我认为这可能与缓存有关。 2个表的缓存被独立刷新,因此您有可能获得两次相同的记录。如果stream1和stream2都接收到同一个密钥的记录,并且缓存刷新,则:

stream1中的缓存将刷新,执行联接并生成记录。

来自stream2的缓存将刷新,执行联接并生成记录。

从技术上讲,这是可以的,因为连接的结果是另一个KTable,所以KTable中的值将是正确的值。

将以下变量设置为0 StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG之后,0 - 问题已解决。 我仍然有2条记录 - 但现在有一条记录与null结合 - 根据上面提供的连接语义文档,它的行为非常清晰。

+0

这很有道理 - 这也是我问你是否得到相同记录两次或两次不同输出记录的原因。 (你可能想接受你自己的答案。) –