我只是跟着ActorPublisher例如阿卡流,有时我得到这个消息:阿卡流OnNext不允许
java.lang.IllegalStateException:当流 没有要求的元素onNext是不允许的,totalDemand在0
看文档,他们解释:
通过调用onNext发送元素的流。您可以发送 发送与流客户请求的元素一样多的元素。 这个金额可以用totalDemand查询。当isActive和totalDemand> 0时,只允许 使用onNext,否则onNext将 抛出IllegalStateException。
当流订阅者请求更多元素时,ActorPublisherMessage.Request消息被传递给此actor,并且 您可以对该事件进行操作。总需求会自动更新。
如何防止totalDemand为零?当我得到这个错误时,我失去了我想发送的信息。
这是我一直在关注的例子:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
,这是我的课测试
object Test extends App {
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorFlowMaterializer()
val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val publisher = kafka.consume("test", "groupName", new StringDecoder())
val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")
Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))
}
嗯,我收到了来自卡夫卡的消息,我传递给然而,WorkerActor在向Kafka发送每秒10条消息时,其中一些由于此错误而丢失。
UPDATE
我正面临着这里所描述的错误(使用相同的库):
https://github.com/softwaremill/reactive-kafka/issues/11
我解决了矿用缓冲区,但看起来是这样的公关就能解决问题。
https://github.com/softwaremill/reactive-kafka/pull/13
隐VAL materializer = ActorFlowMaterializer( ActorFlowMaterializerSettings(actorSystem) .withInputBuffer(INITIALSIZE = 1024,MAXSIZE = 1024)) –
实现上述代码,为我工作。谢谢。 –
好听。我记得有一些警告使用外部缓冲区,而不是将缓冲区保留在Actor本身内。密切关注...... –