2015-06-18 70 views
1

我只是跟着ActorPublisher例如阿卡流,有时我得到这个消息:阿卡流OnNext不允许

java.lang.IllegalStateException:当流 没有要求的元素onNext是不允许的,totalDemand在0

看文档,他们解释:

通过调用onNext发送元素的流。您可以发送 发送与流客户请求的元素一样多的元素。 这个金额可以用totalDemand查询。当isActive和totalDemand> 0时,只允许 使用onNext,否则onNext将 抛出IllegalStateException。

当流订阅者请求更多元素时,ActorPublisherMessage.Request消息被传递给此actor,并且 您可以对该事件进行操作。总需求会自动更新。

如何防止totalDemand为零?当我得到这个错误时,我失去了我想发送的信息。

这是我一直在关注的例子:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

,这是我的课测试

object Test extends App { 

    implicit val actorSystem = ActorSystem("ReactiveKafka") 
    implicit val materializer = ActorFlowMaterializer() 

    val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181") 
    val publisher = kafka.consume("test", "groupName", new StringDecoder()) 

    val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor") 

    Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props)) 

} 

嗯,我收到了来自卡夫卡的消息,我传递给然而,WorkerActor在向Kafka发送每秒10条消息时,其中一些由于此错误而丢失。

UPDATE

我正面临着这里所描述的错误(使用相同的库):

https://github.com/softwaremill/reactive-kafka/issues/11

我解决了矿用缓冲区,但看起来是这样的公关就能解决问题。

https://github.com/softwaremill/reactive-kafka/pull/13

回答

2

如果往下流水槽没有任何需求那么你唯一的选择是

  1. 告诉数据源供给Worker,没有需求,使得源可停止生产的消息直到有更多需求进入(反应式解决方案)。
  2. 缓冲这些消息,直到您从接收器获得一些可能会填满缓冲区的请求,然后放弃消息。
  3. 当需求为0时(这似乎是您当前的实施),丢弃消息。

但是“back-pressure”的要点是防止onNext在没有需求时被调用。

为了实现buffering选项上面,你可以在你的演员内部还是外部的任何缓冲:

  • 内部缓冲器:看“ActorPublisher”例如,在documentation用于缓冲的一个例子,而助长的一个演员ActorPublisher。
  • 外部缓冲区:在流中使用缓冲实现器或Flow.buffer使用外部缓冲区。
+0

隐VAL materializer = ActorFlowMaterializer( ActorFlowMaterializerSettings(actorSystem) .withInputBuffer(INITIALSIZE = 1024,MAXSIZE = 1024)) –

+0

实现上述代码,为我工作。谢谢。 –

+0

好听。我记得有一些警告使用外部缓冲区,而不是将缓冲区保留在Actor本身内。密切关注...... –