0
我玩过Akka-Streams,我试图通过执行我自己的PushPullStage
来自定义Flow
。我希望Flow
将它从上游接收到的对象累积到列表中,并在上游完成时向下游发送组之前,根据某些功能对它们进行分组。从PushPullStage发出多个对象
这似乎是一个非常简单的事情来实现,但我无法弄清楚如何做到这一点!似乎没有办法从PushPullStage
发出多个对象。
这是到目前为止我的执行:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
for(group <- groups)
ctx.push(group) // this doesn't work
ctx.finish()
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
编辑
我改变了代码占brackpressure和它现在所有工作。基本上我只是需要让下游Flow
的做他们意味着什么,并保持牵引要素:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case head :: tail =>
groups = tail
ctx.push(head)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
啊,我没有想到背压。我稍微更改了代码,现在按预期工作(请参阅我的编辑)。谢谢 :) – Oli