2
我正在更新一些实验性喷雾代码,以使阿卡流(2.0.2),所以我一点一点地通过他的文档。我的一个需求是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。如何关闭阿卡流?
什么是立即关闭(终止)流内流的正确方法?
我正在更新一些实验性喷雾代码,以使阿卡流(2.0.2),所以我一点一点地通过他的文档。我的一个需求是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。如何关闭阿卡流?
什么是立即关闭(终止)流内流的正确方法?
使用PushStage
:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
你可以通过transform
方法与Flow
一个PushStage
结合:
Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)
如果你还在读这些文字,我刚才注意到,在2.4.2 PushStage已弃用。你如何在GraphStage中完成这项工作?我试过了completeStage,但那不是。 –
我添加了一个新问题:http://stackoverflow.com/questions/35544856/closing-an-akka-stream-2-4-2 –