2017-09-01 59 views
3

下面是使用最简单的图形中的PartitionMerge我能想出的,但运行时,它提供了以下错误: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1]阿卡流入口和出口匹配

据我所知,消息表明,我不是有比输入更多的输出或者没有连接的流量,但是在这个不匹配的简单例子中,我看不出来。

任何帮助表示赞赏。

图表:

def createGraph()(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = { 
     GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s => 
      import GraphDSL.Implicits._ 
      val inputs: List[Int] = List(1, 2, 3, 4) 
      val source: Source[Int, NotUsed] = Source(inputs) 

      val messageSplit: UniformFanOutShape[Int, Int] = builder.add(Partition[Int](2, i => i%2)) 

      val messageMerge: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) 

      val processEven: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => { 
      actorSystem.log.debug(s"even: $rc") 
      rc 
      }) 

      val processOdd: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => { 
      actorSystem.log.debug(s"odd: $rc") 
      rc 
      }) 

      source ~> messageSplit.in 
      messageSplit.out(0) -> processEven -> messageMerge.in(0) 
      messageSplit.out(1) -> processOdd -> messageMerge.in(1) 
      messageMerge.out ~> s 
      ClosedShape 
     } 
    } 

测试:

import akka.actor.ActorSystem 
import akka.stream._ 
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source} 
import akka.{Done, NotUsed} 
import org.scalatest.FunSpec 

import scala.concurrent.Future 
class RoomITSpec extends FunSpec { 

    implicit val actorSystem: ActorSystem = ActorSystem("RoomITSpec") 
    implicit val actorCreator: ActorMaterializer = ActorMaterializer() 
    describe("graph") { 
    it("should run") { 
     val graph = createGraph() 
     RunnableGraph.fromGraph(graph).run 
    } 
    } 
} 

回答

1

小的语法错误。

// Notice the curly arrows 
messageSplit.out(0) ~> processEven ~> messageMerge.in(0) 
messageSplit.out(1) ~> processOdd ~> messageMerge.in(1) 

而不是你写的:

// Straight arrows 
messageSplit.out(0) -> processEven -> messageMerge.in(0) 
messageSplit.out(1) -> processOdd -> messageMerge.in(1) 

你最终生成(和丢弃)元组,而不是增加了图形。

+0

非常感谢! –