2017-08-30 25 views
0

这里是akka documentation水槽倍的阿卡流Source.actorRef缓冲和OverflowStrategy

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail) 
    .toMat(sinkUnderTest)(Keep.both).run() 

ref ! 1 
ref ! 2 
ref ! 3 
ref ! akka.actor.Status.Success("done") 

val result = Await.result(future, 3.seconds) 
assert(result == "123") 

的代码片段是一个工作的代码片段,但是,如果我用裁判来告诉另一个消息像ref ! 4,我一个例外如akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)

我想缓冲区大小3应该足够了。原因在于折叠操作是(acc,ele)=> acc,因此需要累加器和元素返回新值累加器。

所以我改变了代码让另一个演员等待3秒。它正在重新工作。

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

    private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run() 

    ref ! 1 
    ref ! 2 
    ref ! 3 
    Thread.sleep(3000) 
    ref ! 4 
    ref ! akka.actor.Status.Success("done") 

    val result = Await.result(future, 10.seconds) 

    println(result) 

但是,我的问题是,有没有一种方法可以告诉阿卡流减慢或等待水槽可用。我也使用OverflowStrategy.backpressure,但它表示Backpressure overflowStrategy not supported

还有其他的选择吗?

回答

4

您应该考虑将Source.queue作为一种将元素从外部注入流中的方式,并以背压感知的方式注入。

Source.queue将实现为您可以提供元素的队列对象,但是当您提供它们时,您将返回Future,当流准备好接受消息时,该对象将完成。下面

实施例:在docs

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

    val (queue, future): (SourceQueueWithComplete[Int], Future[String]) = 
    Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run() 

    Future.sequence(Seq(
    queue.offer(1), 
    queue.offer(2), 
    queue.offer(3), 
    queue.offer(4) 
)) 

    queue.complete() 

    val result = Await.result(future, 10.seconds) 

    println(result) 

更多信息。

+0

感谢您的回复。你也可以提供代码片段吗?我对来自'Source.queue'的'Enqueued'和'offer'有点困惑。 –

+0

增加了一个例子 –