2017-03-21 87 views
12

我有一个Kafka主题,我发送位置事件(key = user_id,value = user_location)。我能够读取和处理它作为一个KStreamKafka Streams API:KStream到KTable

KStreamBuilder builder = new KStreamBuilder(); 

KStream<String, Location> locations = builder 
     .stream("location_topic") 
     .map((k, v) -> { 
      // some processing here, omitted form clarity 
      Location location = new Location(lat, lon); 
      return new KeyValue<>(k, location); 
     }); 

行之有效,但我想有一个与每个用户的最后已知位置的KTable。我怎么能这样做?

我能做到这一点写入和中间话题阅读:

// write to intermediate topic 
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux"); 

// build KTable from intermediate topic 
KTable<String, Location> table = builder.table("location_topic_aux", "store"); 

有一个简单的方法来获得从KStream一个KTable?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。

回答

12

目前没有直接的方法来做到这一点。您的方法绝对有效,如Confluen常见问题解答中所述:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

这是关于代码的最简单的方法。然而,它的缺点是(a)你需要管理一个额外的主题,并且(b)它会导致额外的网络流量,因为数据被写入并重新读取卡夫卡。

还有一个替代方案中,使用“伪减少”:

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, Long> stream = ...; // some computation that creates the derived KStream 

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() { 
     @Override 
     public Long apply(Long aggValue, Long newValue) { 
      return newValue; 
     } 
    }, 
    "dummy-aggregation-store"); 

这种方法在某种程度上相对于代码相比选项1更复杂,但是具有这样的优点是:(a )不需要手动管理主题,并且(b)不需要重新读取来自Kafka的数据。

总体而言,你需要自己来决定,哪种方法你更喜欢:

在选项2中,卡夫卡流将创建一个内部的changelog话题备份KTable容错。因此,这两种方法都需要卡夫卡额外的存储空间,并导致额外的网络流量。总的来说,这是在选项2中稍微更复杂的代码与选项1中的手动主题管理之间的折衷。

+0

我试图用你的方法通过做一个'KStream'来构建'KTable'哑巴'groupByKey',但'groupByKey'方法无法解析。你有什么想法可能会出错吗? (我是java生态系统和kafkas的新手) – LetsPlayYahtzee

+1

什么是您的Streams版本?对于旧版本,它应该是'stream.reduceByKey(...)'而不是'stream.groupByKey()。reduce(...)'。请参阅http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation –

+1

我以为我使用的是最新版本,但我在使用'0.10.0'时查看'0.10.1'版本的文档。所以我修正了它:) thnx – LetsPlayYahtzee

相关问题