这里是akka documentation水槽倍的阿卡流Source.actorRef缓冲和OverflowStrategy
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 3.seconds)
assert(result == "123")
的代码片段是一个工作的代码片段,但是,如果我用裁判来告诉另一个消息像ref ! 4
,我一个例外如akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)
我想缓冲区大小3应该足够了。原因在于折叠操作是(acc,ele)=> acc,因此需要累加器和元素返回新值累加器。
所以我改变了代码让另一个演员等待3秒。它正在重新工作。
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
Thread.sleep(3000)
ref ! 4
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 10.seconds)
println(result)
但是,我的问题是,有没有一种方法可以告诉阿卡流减慢或等待水槽可用。我也使用OverflowStrategy.backpressure
,但它表示Backpressure overflowStrategy not supported
。
还有其他的选择吗?
感谢您的回复。你也可以提供代码片段吗?我对来自'Source.queue'的'Enqueued'和'offer'有点困惑。 –
增加了一个例子 –