2016-03-04 30 views
0

我试图用一个简单的循环创建一个Akka Stream。在阅读文档here并且没有运气的情况下,我尝试将示例代码复制为起始基础,但这也不起作用。代码编译(在包含示例中缺少的源代码之后),但没有打印出来。它看起来好像有些东西一直反压,但我不明白为什么。Akka Streams文档中的循环示例不起作用

这里是我的代码,任何帮助将是非常赞赏:

import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ActorMaterializerSettings} 
import akka.stream.scaladsl._ 
import akka.stream.ClosedShape 

object Simulate { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    def main(args: Array[String]): Unit = { 

    // Define simulation flowgraph 
    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
     import b._ 
     import GraphDSL.Implicits._ 

     val source = add(Source.repeat[Int](1)) 
     val zip  = add(ZipWith[Int, Int, Int]((left, right) => left)) 
     val bcast = add(Broadcast[Int](2)) 
     val concat = add(Concat[Int]()) 
     val start = add(Source.single[Int](0)) 
     val sink = add(Sink.ignore) 

     source ~> zip.in0 
       zip.out.map { s => println(s); s } ~> bcast ~> sink 
          concat  <~   bcast 
       zip.in1 <~ concat  <~   start 
     ClosedShape 
    }) 

    g.run() 

    } 
} 

回答

1

编辑:它实际上看来问题不加入一个缓冲区,但为了入口/被宣布网点。

这工作:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val source = Source.repeat(1) 
    val start = Source.single(0) 

    val zip = b.add(ZipWith((left: Int, right: Int) => left)) 
    val bcast = b.add(Broadcast[Int](2)) 
    val concat = b.add(Concat[Int]()) 

    source ~> zip.in0 
      zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore 
      zip.in1 <~ concat <~ start 
         concat <~ bcast 
    ClosedShape 
}) 

g.run() 

zip.in1 <~ concat <~ startconcat <~ bcast的顺序与上Docs什么是一致的。

+0

谢谢,我试过了,但不幸的是它不起作用。我得到了同样的东西,没有任何东西流过图表。 – Oli

+0

这很奇怪。我确实得到了它的工作。让我粘贴答案中的所有代码。 – manub

+0

@Oli我已经更新了答案。似乎在将'Broadcast'的出口流入'Concat'之前,为'zip'定义第二个入口是有用的。我不确定这是什么确切的原因。 – manub