我有一个Kafka主题,我发送位置事件(key = user_id,value = user_location)。我能够读取和处理它作为一个KStream
:Kafka Streams API:KStream到KTable
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
行之有效,但我想有一个与每个用户的最后已知位置的KTable
。我怎么能这样做?
我能做到这一点写入和中间话题阅读:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
有一个简单的方法来获得从KStream
一个KTable
?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。
我试图用你的方法通过做一个'KStream'来构建'KTable'哑巴'groupByKey',但'groupByKey'方法无法解析。你有什么想法可能会出错吗? (我是java生态系统和kafkas的新手) – LetsPlayYahtzee
什么是您的Streams版本?对于旧版本,它应该是'stream.reduceByKey(...)'而不是'stream.groupByKey()。reduce(...)'。请参阅http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation –
我以为我使用的是最新版本,但我在使用'0.10.0'时查看'0.10.1'版本的文档。所以我修正了它:) thnx – LetsPlayYahtzee