2015-09-19 17 views
0

我玩过Akka-Streams,我试图通过执行我自己的PushPullStage来自定义Flow。我希望Flow将它从上游接收到的对象累积到列表中,并在上游完成时向下游发送组之前,根据某些功能对它们进行分组。从PushPullStage发出多个对象

这似乎是一个非常简单的事情来实现,但我无法弄清楚如何做到这一点!似乎没有办法从PushPullStage发出多个对象。

这是到目前为止我的执行:

class Accumulate[A] extends PushPullStage[A, List[A]] { 
    private var groups: List[List[A]] = Nil 

    private def group(x: A): List[List[A]] = ... 

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = { 
     groups = group(elem) 
     ctx.pull() 
    } 

    override def onPull(ctx: Context[A]): SyncDirective = 
     if (ctx.isFinishing) { 
     for(group <- groups) 
      ctx.push(group) // this doesn't work 

     ctx.finish() 
     } else { 
     ctx.pull() 
     } 

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = 
     ctx.absorbTermination() 
    } 
} 

编辑

我改变了代码占brackpressure和它现在所有工作。基本上我只是需要让下游Flow的做他们意味着什么,并保持牵引要素:

class Accumulate[A] extends PushPullStage[A, List[A]] { 
    private var groups: List[List[A]] = Nil 

    private def group(x: A): List[List[A]] = ... 

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = { 
     groups = group(elem) 
     ctx.pull() 
    } 

    override def onPull(ctx: Context[A]): SyncDirective = 
     if (ctx.isFinishing) { 
     groups match { 
      case Nil => ctx.finish() 

      case head :: tail => 
      groups = tail 
      ctx.push(head) 
     } 
     } else { 
     ctx.pull() 
     } 

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = 
     ctx.absorbTermination() 
    } 
} 

回答

2

你不能推超过了要求,因为这将违反反压。 另外,值得注意的是,我不会推荐你正在尝试做什么,因为这将爆炸与OutOfMemoryError大或无界流。

class Accumulate[A] extends PushPullStage[A, List[A]] { 
    private var groups: List[List[A]] = Nil 

    private def group(x: A): List[List[A]] = ... 

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = { 
     groups = group(elem) 
     ctx.pull() 
    } 

    override def onPull(ctx: Context[A]): SyncDirective = 
     if (ctx.isFinishing) { 
     groups match { 
      case Nil => ctx.finish() 
      case group :: rest => 
      groups = rest 
      ctx.push(group) 
     } 
     } else { 
     ctx.pull() 
     } 

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = 
     ctx.absorbTermination() 
    } 
} 
+0

啊,我没有想到背压。我稍微更改了代码,现在按预期工作(请参阅我的编辑)。谢谢 :) – Oli