2016-09-19 18 views
0

我试图运行一个简单的Akka Stream File Sink示例,但没有成功。我可以创建一个Source,运行Flow,然后创建一个文件,但ByteString不会写入文件。而如果我尝试将流量输出打印到控制台,我可以这样做。我在这里错过了什么吗?Akka Stream:无法写入文件接收器

import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.{ NotUsed, Done} 
import akka.actor.ActorSystem 
import akka.util.ByteString 
import scala.concurrent._ 
import scala.concurrent.duration._ 
import java.nio.file.Paths 

object First extends App { 

    val source: Source[Int, NotUsed] = Source (1 to 100) 

    implicit val system = ActorSystem("QuickStart") 
    implicit val materializer = ActorMaterializer() 

    // works: prints 1-100 
    //source.runForeach(println) (materializer) 

    val factorials = source.scan(BigInt(1))((acc,next) => acc * next) 

    // there is no content in the Sink (file) 
    /**val result = 
    factorials 
    .map(num => ByteString(s"${num}\n")) 
    .runWith(FileIO.toPath(Paths.get("factorials.txt"))) 
**/ 

    def lineSink(fileName: String): Sink[String, Future[IOResult]] = 
    Flow[String] 
    .map(s => ByteString(s + "\n")) 
    .toMat(FileIO.toPath(Paths.get(fileName))) (Keep.right) 

    //There is no content in the Sink. 
    factorials.map(_.toString).runWith(lineSink("factorials.txt")) 

system.terminate() 

} 

build.sbt有:提前为你的时间

name := "akkaGuide" 
    version := "1.0" 
    scalaVersion := "2.11.8" 
    libraryDependencies ++= Seq(
     "com.typesafe.akka" %% "akka-stream" % "2.4.10" 
    ) 

感谢。

回答

6

我想你可能会过早地终止。请等待,直到Future完成:

val result = factorials.map(_.toString).runWith(lineSink("factorials.txt")) 
import system.dispatcher 
result.onComplete { _ => system.terminate() } 
+0

谢谢Brian Kent。有效 ! – Raxbangalore