我有一个kafka流 - 说博客和卡夫卡表 - 说与这些博客相关的评论。卡夫卡流的关键字可以映射到卡夫卡表中的多个值,即一个博客可以有多个评论。我想要对这两者进行连接,并用一系列注释ID创建一个新对象。但是当我进行连接时,流只包含最后一个注释ID。是否有任何文档或示例代码可以指出我如何实现这一目标?基本上,是否有任何文档阐述如何使用Kafka流和Kafka表进行一对多关系连接?卡夫卡流和卡夫卡表一对多关系加入
KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
(blogId, blog) -> blog.getBlogId(),
(blog, comment) -> new EnrichedBlog(blog, comment));
因此,而不是评论 - 我需要有一个评论ID数组。
为了便于阅读,我想补充一点,“yourCommentTopic”的关键是相应的博客帖子ID。然后'groupByKey'步骤确保随后的聚合步骤可以访问特定博客文章的所有评论(因此可以创建所有评论的列表)。 –
谢谢!修改了这个效果的答案 –