2016-01-22 65 views
2

我正在更新一些实验性喷雾代码,以使阿卡流(2.0.2),所以我一点一点地通过他的文档。我的一个需求是,如果我在流中检测到协议违规,我需要立即关闭流并启动客户端。如何关闭阿卡流?

什么是立即关闭(终止)流内流的正确方法?

回答

3

使用PushStage

import akka.stream.stage._ 

val closeStage = new PushStage[Tpe, Tpe] { 
    override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match { 
    case elem if shouldCloseStream ⇒ 
     // println("stream closed") 
     ctx.finish() 
    case elem ⇒ 
     ctx.push(elem) 
    } 
} 

你可以通过transform方法与Flow一个PushStage结合:

Flow[Tpe] 
.map(...) 
.transform(() ⇒ closeStage) 
.map(...) 
+1

如果你还在读这些文字,我刚才注意到,在2.4.2 PushStage已弃用。你如何在GraphStage中完成这项工作?我试过了completeStage,但那不是。 –

+0

我添加了一个新问题:http://stackoverflow.com/questions/35544856/closing-an-akka-stream-2-4-2 –