2016-12-02 20 views
4

来自node.js背景,我是Scala的新手,我尝试使用Twitter的Future.collect来执行一些简单的并发操作。但是我的代码显示了顺序行为而不是并发行为。我究竟做错了什么?Twitter的Future.collect不能同时工作(Scala)

这里是我的代码,

import com.twitter.util.Future 

def waitForSeconds(seconds: Int, container:String): Future[String] = Future[String] { 
    Thread.sleep(seconds*1000) 
    println(container + ": done waiting for " + seconds + " seconds") 
    container + " :done waiting for " + seconds + " seconds" 
} 

def mainFunction:String = { 
    val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All"))) 
    val singleTask = waitForSeconds(1, "Single") 

    allTasks onSuccess { res => 
    println("All tasks succeeded with result " + res) 
    } 

    singleTask onSuccess { res => 
    println("Single task succeeded with result " + res) 
    } 

    "Function Complete" 
} 

println(mainFunction) 

,这是输出我得到的,

All: done waiting for 1 seconds 
All: done waiting for 3 seconds 
All: done waiting for 2 seconds 
Single: done waiting for 1 seconds 
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds) 
Single task succeeded with result Single :done waiting for 1 seconds 
Function Complete 

我想到的是输出,

All: done waiting for 1 seconds 
Single: done waiting for 1 seconds 
All: done waiting for 2 seconds 
All: done waiting for 3 seconds 
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds) 
Single task succeeded with result Single :done waiting for 1 seconds 
Function Complete 
+0

那么'Future'的整个点就是将计算卸载到另一个线程。因此,在mainFunction的第一行中,将产生三个新线程,这些线程(当生成的线程完成时)会产生另一个线程来执行oncomplete函数。 在第二行中,生成另一个线程,并将与其他线程并行执行。所以我认为实际产出似乎有效... – irundaia

回答

8

Twitter的未来是更加明确,其中计算是比Scala标准库期货执行的。特别是,Future.apply将安全地捕获异常(如s.c.Future),但它没有说明计算将运行在哪个线程上。在您的情况下,计算在主线程中运行,这就是为什么您会看到结果你看到了。

这种方法与标准库的未来API相比有几个优点。一方面,它使方法签名更简单,因为不存在必须在各处传递的隐含的ExecutionContext。更重要的是,它更容易避免上下文切换(Brian Degenhardt的here's a classic explanation)。在这方面,Twitter的Future更像斯卡拉斯的Task,并且具有基本相同的性能优点(例如在this blog post中描述)。

更明确地指出计算运行的地方在于,您必须更清楚地了解计算的运行位置。你的情况,你可以写这样的事情:

import com.twitter.util.{ Future, FuturePool } 

val pool = FuturePool.unboundedPool 

def waitForSeconds(seconds: Int, container:String): Future[String] = pool { 
    Thread.sleep(seconds*1000) 
    println(container + ": done waiting for " + seconds + " seconds") 
    container + " :done waiting for " + seconds + " seconds" 
} 

这不会产生正是你要求的输出(“功能齐全”将首先打印,allTaskssingleTask不是对于测序彼此),但它会在单独的线程上并行运行任务。

(作为一个注脚:在上面我的例子FuturePool.unboundedPool是创建一个演示未来池的简单方法,而且往往是蛮好的,但它是不适合的CPU密集型的计算,看看the FuturePool API docs其他创建未来池的方法将使用您提供的并可以自己管理的ExecutorService。)

+1

非常好! 我用你的代码,并在最后一行添加了一个Thread.sleep(6000),现在我看到了并发行为。进行更改后的输出, '功能Complete' '所有:完成等待1 seconds' '单:做等待1 seconds' :完成等待1 seconds' '单任务与结果单成功'全部:等待2秒钟' '全部:等待3秒钟' '所有任务都成功结果ArraySeq(全部:完成等待1秒,全部:完成等待3秒,全部:完成等待2秒秒)' – Ram

+2

添加一点点。 Twitter最近发布的文档解释了他们背后的一些设计决策:https://github.com/twitter/finagle/blob/develop/doc/src/sphinx/developers/Futures.rst –

+1

(com.twitter .util。)Await.result(allTask​​s)可能会更好,而不是Thread.sleep(6000) – n4to4