我有2周卡夫卡的话题 - recommendations
和clicks
。第一个主题具有由唯一ID(称为recommendationsId
)键入的推荐对象。每个产品都有一个用户可以点击的URL。卡夫卡流加入
的clicks
主题得到由上向用户推荐产品的那些的URL的点击生成的消息。它已经这样设置,这些点击消息也被recommendationId
键入。
注意,建议和点击之间
关系是一对多。建议可能会导致多次点击,但点击总是与单个建议相关联。
每个点击对象都会有相应的推荐对象。
点击对象有一个时间戳晚于建议反对。
的建议和相应的点击(S)之间的差距可能是几秒钟至几天(比如说,7天最多)。
我的目标是使用卡夫卡流加入来加入这两个主题。我不清楚的是我应该使用KStream x KStream加入还是KStream x KTable加入。
我实施了KStream x KTable
加入clicks
流通过recommendations
表。但是,如果建议在生成之前之前生成了连接器,并且在连接器启动之后点击到达,我无法看到任何加入的点击 - 建议对。
我使用正确的连接吗?我应该使用KStream x KStream
加入吗?如果是这样,为了能够在过去至多7天内加入推荐的点击,我应该将窗口大小设置为7天吗?在这种情况下,我是否还需要设置“保留期”?
我的代码来执行KStream x KTable
加入如下。请注意,我已经定义了类Recommendations
和Click
及其相应的serde。点击消息只是简单的String
(url)。此URL字符串与Recommendations
对象合并,以创建发送到jointTopic
的Click
对象。
public static void main(String[] args){
if(args.length!=4){
throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
}
final String booststrapList = args[0];
final String clicksTopic = args[1];
final String recsTopic = args[2];
final String jointTopic = args[3];
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
// load clicks as KStream
KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
// load recommendations as KTable
KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
// join the two
KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
// emit the join to the jointTopic
join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
// let the action begin
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
这工作得很好,只要双方的建议和点击的夹板(以上程序)后,已产生运行。但是,如果点击到达,建议生成之前运行了连接器,我看不到任何连接发生。我该如何解决?
如果解决方案是使用KStream x KSTream
加入,那么请帮助我了解什么窗口大小,我应该选择和选择什么样的保留期限。
这可能有帮助:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/ –