2017-05-31 170 views
2

我有一个kafka流 - 说博客和卡夫卡表 - 说与这些博客相关的评论。卡夫卡流的关键字可以映射到卡夫卡表中的多个值,即一个博客可以有多个评论。我想要对这两者进行连接,并用一系列注释ID创建一个新对象。但是当我进行连接时,流只包含最后一个注释ID。是否有任何文档或示例代码可以指出我如何实现这一目标?基本上,是否有任何文档阐述如何使用Kafka流和Kafka表进行一对多关系连接?卡夫卡流和卡夫卡表一对多关系加入

KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl, 
       (blogId, blog) -> blog.getBlogId(), 
       (blog, comment) -> new EnrichedBlog(blog, comment)); 

因此,而不是评论 - 我需要有一个评论ID数组。

回答

3

我无法找到相匹配的是,在你的代码示例中的签名的连接方法,但在这里,我认为这是什么问题:

KTables被解释为changlog,也就是说,每一个后续消息相同的键被解释为对记录的更新,而不是新的记录。这就是为什么你只看到给定键(博客ID)的最后一条“评论”消息,以前的值将被覆盖。 为了克服这个问题,您需要首先改变如何填充KTable。您可以做的是将您的评论主题作为KStream添加到您的拓扑中,然后执行聚合,只需构建一个数组或共享相同博客ID的注释列表。该聚合返回一个KTable,您可以加入您的博客KStream。

这里有一个草图,你如何能做到这一点,构建一个列表值KTable:

builder.stream("yourCommentTopic") // where key is blog id 
.groupByKey() 
.aggregate(() -> new ArrayList(), 
    (key, value, agg) -> new KeyValue<>(key, agg.add(value)), 
    yourListSerde); 

名单更容易比数组中的聚合来使用,所以我建议你把它转换成数组下游如果需要的话。您还需要在上面的示例中为您的列表“yourListSerde”提供一个serde实现。

+0

为了便于阅读,我想补充一点,“yourCommentTopic”的关键是相应的博客帖子ID。然后'groupByKey'步骤确保随后的聚合步骤可以访问特定博客文章的所有评论(因此可以创建所有评论的列表)。 –

+0

谢谢!修改了这个效果的答案 –