2017-02-12

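I am trying to follow https://github.com/functional-streams-for-scala/fs2/wiki/Binding-to-asynchronous-processes to create a Process from a Queue in scalaz-stream.

After filling in some gaps in the first example and adding some debug prints, I arrived at the following code: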

import java.util.concurrent.ScheduledExecutorService 
import scala.concurrent.{Await, Future} 
import scala.concurrent.duration._ 
import scalaz.concurrent.Task 
import scalaz.stream.async.mutable.Queue 
import scalaz.stream.{Process, Sink} 

object ProcessTest {

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    import scalaz.stream.async

    val q: Queue[Int] = async.unboundedQueue[Int]
    val src: Process[Task, Int] = q.dequeue

    // Thread 1
    val f1 = Future {
      for (i <- 0 to 10) {
        println(s"enqueueOne $i")
        Thread.sleep(100)
        q.enqueueOne(i)
      }
      println("closing")
      q.close
      println("closed")
    }

    // Thread 2
    val f2 = Future {
      val buf = new collection.mutable.ArrayBuffer[Int]
      val snk: Sink[Task, Int] = scalaz.stream.io.fillBuffer(buf)
      val run: Task[Unit] = src.map(x => {
        println(s"map $x")
        x
      }).to(snk).run
      println("running")
      run.get.runFor(3.seconds)
      println(s"result = ${buf.toList}")
    }

    Await.result(f1, 10.seconds)
    Await.result(f2, 10.seconds)
  }
}

When I run this, thread 2 never receives anything:

enqueueOne 0 
running 
enqueueOne 1 
enqueueOne 2 
enqueueOne 3 
enqueueOne 4 
enqueueOne 5 
enqueueOne 6 
enqueueOne 7 
enqueueOne 8 
enqueueOne 9 
enqueueOne 10 
closing 
closed 
[error] (run-main-9) java.util.concurrent.TimeoutException 

What am I doing wrong? Where is this blocking?

(I am using scalaz-stream 0.8.6.)

Answer

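OK, I found the problem: enqueueOne and close each return a Task, and a Task does nothing until it is run. In the original code those Tasks were created and then discarded, so nothing was ever enqueued, the queue was never closed, and the consumer timed out. Running them fixes it: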

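// Thread 1
val f1 = Future {
  for (i <- 0 to 10) {
    println(s"enqueueOne $i")
    Thread.sleep(100)
    q.enqueueOne(i).run // run the Task so the element is actually enqueued
  }
  println("closing")
  q.close.run // run the Task so the queue is actually closed
  println("closed")
}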
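
The underlying behavior can be seen in isolation. The following is a minimal sketch, not part of the original program: TaskLazinessDemo, demo, and counter are hypothetical names chosen for illustration, and it assumes the scalaz 7.1.x Task API that scalaz-stream 0.8.6 builds against (on scalaz 7.2, run is deprecated in favor of unsafePerformSync). It shows that constructing a Task has no effect until the Task is run.

```scala
import scalaz.concurrent.Task

// Hypothetical demo object; the names here are illustrative only.
object TaskLazinessDemo {

  // Returns the counter value before and after running the Task.
  def demo(): (Int, Int) = {
    var counter = 0

    // Building the Task only describes the side effect; nothing executes yet.
    val increment: Task[Unit] = Task.delay { counter += 1 }
    val beforeRun = counter

    // Running the Task performs the side effect on the calling thread.
    increment.run
    val afterRun = counter

    (beforeRun, afterRun)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = demo()
    println(s"before run: $before") // prints "before run: 0"
    println(s"after run: $after")   // prints "after run: 1"
  }
}
```

q.enqueueOne(i) and q.close behave like increment here: without a call to run, they are values that get thrown away.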