2016-08-22 23 views
5

我正在构建拓扑结构,并希望使用KStream.process()将一些中间值写入数据库。这一步不会改变数据的性质,并且是完全无状态的。如何注册一个无状态处理器(似乎也需要一个StateStore)?

添加Processor需要创建一个并将此实例与状态存储的名称一起传递给KStream.process()函数。这是我不明白的。

如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier

未能添加一个表示StateStore给人当应用程序启动此错误:在线程“主要” org.apache.kafka.streams.errors.TopologyBuilderException

异常:无效的拓扑结构建筑:StateStore我的 - 国有商店尚未添加。

为什么处理器需要一个状态存储?看来这对于无状态且不维护状态的处理器来说可能是可选的。

通过应用处理器一次处理此流中的所有元素,一次一个元素。

+0

您是否已阅读http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api上的Processor API章节?它包括使用州商店的章节。 –

回答

11

这是simple example on how to use state stores,取自Confluent Platform documentation on Kafka Streams

第1步:定义StateStore/StateStoreSupplier

StateStoreSupplier countStore = Stores.create("Counts") 
             .withKeys(Serdes.String()) 
             .withValues(Serdes.Long()) 
             .persistent() 
             .build(); 
  1. 我不明白的方式向StateStore对象添加到我的拓扑结构。尽管如此,它也需要一个StateStoreSupplier。

第2步:添加状态存储到拓扑。

选项A - 当使用处理器API:

TopologyBuilder builder = new TopologyBuilder(); 

// add the source processor node that takes Kafka topic "source-topic" as input 
builder.addSource("Source", "source-topic") 
     .addProcessor("Process",() -> new WordCountProcessor(), "Source") 
     // Add the countStore associated with the WordCountProcessor processor 
     .addStateStore(countStore, "Process") 
     .addSink("Sink", "sink-topic", "Process"); 

选项B - 当使用卡夫卡流DSL:

在这里,你需要调用KStreamBuilder#addStateStore("name-of-your-store")的状态存储添加到您的处理器拓扑。然后,在调用KStream#process()KStream#transform()等方法时,还必须传入状态存储的名称 - 否则应用程序将在运行时失败。

截至KStream#transform()的例子:

KStreamBuilder builder = new KStreamBuilder(); 

// Add the countStore that will be used within the Transformer[Supplier] 
// that we pass into `transform()` below. 
builder.addStateStore(countStore); 

KStream<byte[], String> input = builder.stream("source-topic"); 

KStream<String, Long> transformed = 
    input.transform(/* your TransformerSupplier */, countStore.name()); 

为什么需要一个处理器有一个状态存储?看来这对于无状态且不维护状态的处理器来说可能是可选的。

你说得对 - 如果你的处理器没有维护状态,你不需要状态存储。

使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")即可将状态存储添加到处理器拓扑中,并在稍后参考。

+0

我记得看到这个(和类似),但我没有使用** TopologyBuilder **。我试图通过使用'.foreach()'来解决这个问题。 – ethrbunny

+0

您不能在DSL的'foreach()'运算符中使用状态存储,因为根据运算符的定义,'foreach()'将*无状态*处理器节点添加到拓扑中。但是你可以在DSL中使用'transform()','transformValues()'和'process()'来进行有状态操作。请参阅https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management。 –

+0

对不起,我错过了您使用DSL。在DSL和Processor API中都有一个'process()'方法。 :-) –

相关问题