这是simple example on how to use state stores,取自Confluent Platform documentation on Kafka Streams。
第1步:定义StateStore
/StateStoreSupplier
:
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
- 我不明白的方式向StateStore对象添加到我的拓扑结构。尽管如此,它也需要一个StateStoreSupplier。
第2步:添加状态存储到拓扑。
选项A - 当使用处理器API:
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
.addProcessor("Process",() -> new WordCountProcessor(), "Source")
// Add the countStore associated with the WordCountProcessor processor
.addStateStore(countStore, "Process")
.addSink("Sink", "sink-topic", "Process");
选项B - 当使用卡夫卡流DSL:
在这里,你需要调用KStreamBuilder#addStateStore("name-of-your-store")
的状态存储添加到您的处理器拓扑。然后,在调用KStream#process()
或KStream#transform()
等方法时,还必须传入状态存储的名称 - 否则应用程序将在运行时失败。
截至KStream#transform()
的例子:
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
input.transform(/* your TransformerSupplier */, countStore.name());
为什么需要一个处理器有一个状态存储?看来这对于无状态且不维护状态的处理器来说可能是可选的。
你说得对 - 如果你的处理器没有维护状态,你不需要状态存储。
使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")
即可将状态存储添加到处理器拓扑中,并在稍后参考。
您是否已阅读http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api上的Processor API章节?它包括使用州商店的章节。 –