2017-09-25 135 views
4

我有2周卡夫卡的话题 - recommendationsclicks。第一个主题具有由唯一ID(称为recommendationsId)键入的推荐对象。每个产品都有一个用户可以点击的URL。卡夫卡流加入

clicks主题得到由上向用户推荐产品的那些的URL的点击生成的消息。它已经这样设置,这些点击消息也被recommendationId键入。

注意,建议和点击之间

  1. 关系是一对多。建议可能会导致多次点击,但点击总是与单个建议相关联。

  2. 每个点击对象都会有相应的推荐对象。

  3. 点击对象有一个时间戳晚于建议反对。

  4. 的建议和相应的点击(S)之间的差距可能是几秒钟至几天(比如说,7天最多)。

我的目标是使用卡夫卡流加入来加入这两个主题。我不清楚的是我应该使用KStream x KStream加入还是KStream x KTable加入。

我实施了KStream x KTable加入clicks流通过recommendations表。但是,如果建议在生成之前之前生成了连接器,并且在连接器启动之后点击到达,我无法看到任何加入的点击 - 建议对。

我使用正确的连接吗?我应该使用KStream x KStream加入吗?如果是这样,为了能够在过去至多7天内加入推荐的点击,我应该将窗口大小设置为7天吗?在这种情况下,我是否还需要设置“保留期”?

我的代码来执行KStream x KTable加入如下。请注意,我已经定义了类RecommendationsClick及其相应的serde。点击消息只是简单的String(url)。此URL字符串与Recommendations对象合并,以创建发送到jointTopicClick对象。

public static void main(String[] args){ 
    if(args.length!=4){ 
     throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic"); 
    } 

    final String booststrapList = args[0]; 
    final String clicksTopic = args[1]; 
    final String recsTopic = args[2]; 
    final String jointTopic = args[3]; 

    Properties config = new Properties(); 
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id"); 
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList); 
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName()); 

    KStreamBuilder builder = new KStreamBuilder(); 

    // load clicks as KStream 
    KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic); 

    // load recommendations as KTable 
    KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic); 

    // join the two 
    KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs)); 

    // emit the join to the jointTopic 
    join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic); 

    // let the action begin 
    KafkaStreams streams = new KafkaStreams(builder, config); 
    streams.start(); 
    } 

这工作得很好,只要双方的建议和点击的夹板(以上程序)后,已产生运行。但是,如果点击到达,建议生成之前运行了连接器,我看不到任何连接发生。我该如何解决?

如果解决方案是使用KStream x KSTream加入,那么请帮助我了解什么窗口大小,我应该选择和选择什么样的保留期限。

+2

这可能有帮助:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/ –

回答

5

您的整体观察结果是正确的。从概念上讲,您可以通过两种方式获得正确的结果。如果您使用的流表时,你有

  • 您已经提到的两个缺点(这可能会重新审视和卡夫卡的未来版本虽然改进),如果点击获取的处理相应的建议前,(inner- )加入将失败。然而,正如你知道会有推荐,你可以使用左连接而不是内连接,检查连接结果,并且如果推荐是null(例如,你得到了一个重试逻辑) - 或者当然,单次推荐的连续点击可能会出现乱码,您可能需要在应用程序代码中对此进行解释。
  • KTable的第二个缺点是,它会随着时间的推移而永远长大,无限制,因为您将为其添加越来越多的独特建议。因此,您需要通过向建议主题发送<recommendationsId, null>表格的墓碑记录来实施某些“到期逻辑”,以删除您不再关心的旧建议。
  • 这种方法的优点是,与流式流连接相比,您将需要更少的内存/磁盘空间,因为您只需要缓存应用程序中的所有建议(但无需点击)。

如果您使用流式流连接,并且在推荐后7天内可能发生点击,则窗口大小必须为7天 - 否则,点击将不会与建议一起使用。

  • 这种方法的缺点是,您将需要更多的内存/磁盘,因为您将在应用程序中缓存所有点击和最近7天的所有建议。
  • 优点是,订单或处理(即推荐与点击)无关紧要(即,您不需要执行上述的重试策略)
  • 此外,旧建议将自动更新因此你不需要实现特殊的“过期逻辑”。

对于流式流连接,保留时间答案有点不同。由于窗口大小为7天,它必须至少7天。否则,你会删除你的“运行窗口”的记录。您还可以设置更长的保留期限,以便能够处理“晚期数据”。假设用户在窗口时间范围的结尾处点击(建议的7天时间跨度前5分钟结束),但点击仅在1小时后报告给您的应用程序。如果您的保留期限为7天作为您的窗口大小,则此迟到记录将无法再处理(因为建议已被删除)。如果您设置了较长的保留期,例如8天,您仍然可以处理延迟记录。它取决于你的应用程序/语义需求你想使用什么保留时间。

摘要: 从实现的角度来看,使用stream-stream join比使用stream-table join更简单。不过,预计内存/磁盘节省量可能会很大,具体取决于您的点击流数据速率。

+0

感谢您的解释(和伟大的博客BTW!)。我有一个后续问题。假设我实现了'KStream x KStream' inner-join,运行这个木匠的机器是否会在过去7天内下载并保存所有*建议和点击消息(对于相应的分区,假设机器数量=分区)?这听起来像很多物理内存。 有没有办法扩展它(比如有两倍的机器数量作为分区数量)? – Nik

+2

它需要保存最近7天的所有数据,但不包含在内存中。我们在内部使用RocksDB,可能会泄露到磁盘。所以你可以保存比主存更大的状态。 - 关于缩放:您不能拥有比分区更多的实例。如果你需要更高的并行性来处理,你需要有更多的分区 - 一种方法是创建一个包含所需分区数量的主题,并在执行之前调用'through()'来重新分配输入数据加入。由于这个新主题仅用于缩放,如果可以有较短的保留时间(如1小时?)。 –