2017-03-25 38 views
1

我需要编写一个GraphStage,但遇到一些问题。 我把代码压缩到了下面,希望大家可以为我解释一下。Akka Streams的问题GraphStage

下面的示例代码不是我真正的用例,它只是在这里展示我的观点。希望这是我不了解阿卡流,而不是它的局限性。

示例代码使用WrapFlowShape构建一个Graph,并基本上将图的“in”重定向到附加流的in和out的流出。

import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.javadsl.RunnableGraph 
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source} 
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 

import scala.collection.immutable 
import scala.io.StdIn 

object WrapFlowSandbox extends App { 
    case class WrapFlowShape[I, O](
     in: Inlet[I], 
     out: Outlet[O], 
     flowIn: Inlet[O], 
     flowOut: Outlet[I]) extends Shape { 
    val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil 
    val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil 
    def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy) 
    } 
    class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] { 
    val in: Inlet[I] = Inlet[I]("WrapFlow.in") 
    val out: Outlet[O] = Outlet[O]("WrapFlow.out") 
    val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn") 
    val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut") 
    val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut) 
    def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
     var inElem: I = _ 
     setHandler(in, new InHandler { 
     def onPush = { 
      println("2 in.onPush") 
      inElem = grab(in) 
      pull(flowIn) 
     } 
     }) 
     setHandler(out, new OutHandler { 
     def onPull = { 
      println("1 out.onPull") 
      pull(in) 
     } 
     }) 
     setHandler(flowIn, new InHandler { 
     def onPush = { 
      println("4 flowIn.onPush") 
      val outElem = grab(flowIn) 
      push(out, outElem) 
     } 
     }) 
     setHandler(flowOut, new OutHandler { 
     def onPull = { 
      println("3 flowOut.onPull") 
      push(flowOut, inElem) 
     } 
     }) 
    } 
    } 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    val flow = Flow[Int].map(_ + 1) 
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 
    val select = b.add(new WrapFlow[Int, Int]) 
    Source.single(1) ~> select.in 
    select.out ~> Sink.foreach[Int](r => println(s"result = $r")) 
    select.flowOut ~> flow ~> select.flowIn 
    ClosedShape 
    }).run(materializer) 
    StdIn.readLine 
    system.terminate 
} 

我期望看到输出为:

1 out.onPull 
2 in.onPush 
3 flowOut.onPull 
4 flowIn.onPush 
result = 2 

但实际输出只是第一3行:

1 out.onPull 
2 in.onPush 
3 flowOut.onPull 

InHandler.onPush()为 “此处流动” 是从未打过电话

我知道用这种方式编写GraphStage是非常规的,但我确实需要它。

令我百思不解的是,我生成了附着的流需求进行第二步(拉(此处流动)), 并依次连接的流量就可以拉生成“剂流出”在步骤3

需求

但在步骤3中通过flowOut推送一个元素之后,该元素从未被推送,因此从未执行过步骤4。

这是为什么?

如果附加流程在下游检测到需求并在步骤3生成需求上游,为什么步骤3中推送的元素没有通过附加流?

+0

Btw。您的形状已经在akka中定义为“BidiShape”。 – jrudolph

回答

1

不确定我遵循处理程序中的逻辑。我把它们修改为根据我从你的GraphDSL.create()内容如下理解:

def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
    var inElem: I = _ 
    setHandler(in, new InHandler { 
    def onPush = { 
     println("in.onPush") 
     inElem = grab(in) 
     push(flowOut, inElem) 
    } 
    }) 
    setHandler(out, new OutHandler { 
    def onPull = { 
     println("out.onPull") 
     pull(flowIn) 
    } 
    }) 
    setHandler(flowIn, new InHandler { 
    def onPush = { 
     println("flowIn.onPush") 
     val outElem = grab(flowIn) 
     push(out, outElem) 
    } 
    }) 
    setHandler(flowOut, new OutHandler { 
    def onPull = { 
     println("flowOut.onPull") 
     pull(in) 
    } 
    }) 
} 

执行它应该会产生以下的输出:

out.onPull 
flowOut.onPull 
in.onPush 
flowIn.onPush 
result = 2 

注意到方法copyFromPorts()是不是在你的WrapFlowShape情况下覆盖类(这不是一个抽象类)。我相信你会需要覆盖它像下面这样的东西:

override def copyFromPorts(
    inlets: immutable.Seq[Inlet[_]], 
    outlets: immutable.Seq[Outlet[_]]) = { 
    WrapFlowShape[I, O](
    inlets(0).as[I], 
    outlets(0).as[O], 
    inlets(1).as[O], 
    outlets(1).as[I]) 
} 
+0

感谢您的答案。它确实有效,但我的用例比这更复杂一点。我简化了它,但实际上,有多个flowIns和flowOuts。在我知道哪个flowIns要拉动之前,我需要得到inElem。所以我不能只把他们全部拉进out.onPull。 –

+0

顺便说一句,因为Shape是一个抽象类,deepCopy也是抽象类,所以我不需要重写。 –