2016-04-05 53 views
7

我在我的现实生活中遇到了这个问题,并通过我的测试代码和分析器进行了验证。我不是粘贴“tl; dr”代码,而是向您展示一张图片然后描述它。 enter image description here关于Future.firstCompletedOf和垃圾收集机制

简单地说,我使用Future.firstCompletedOf摆脱2个Future S,这两者有没有共同的东西,不关心对方的结果。即使这是我想要解决的问题,垃圾收集器无法回收第一个对象Result,直到Future完成

所以我很好奇背后的机制。有人可以从较低的层面解释它,或者提供一些暗示让我去研究。

谢谢!

PS:是因为他们共享相同的ExecutionContext

** **更新粘贴测试代码的要求

object Main extends App{ 
    println("Test start") 

    val timeout = 30000 

    trait Result { 
    val id: Int 
    val str = "I'm short" 
    } 
    class BigObject(val id: Int) extends Result{ 
    override val str = "really big str" 
    } 

    def guardian = Future({ 
    Thread.sleep(timeout) 
    new Result { val id = 99999 } 
    }) 

    def worker(i: Int) = Future({ 
    Thread.sleep(100) 
    new BigObject(i) 
    }) 

    for (i <- Range(1, 1000)){ 
    println("round " + i) 
    Thread.sleep(20) 
    Future.firstCompletedOf(Seq(
     guardian, 
     worker(i) 
    )).map(r => println("result" + r.id)) 
    } 

    while (true){ 
    Thread.sleep(2000) 
    } 
} 
+0

我很好奇你如何设法证明“结果”不能被垃圾收集,因为我会说相反,它可能很有趣。也许添加更多关于你如何验证这个的细节? –

+0

显示代码。没有它可能会发生什么是几乎不可能的。 –

+0

实际上,这个问题是一个普遍的问题,并不取决于具体的用例,所以很有可能在没有进一步细节的情况下回答。 –

回答

9

让我们看看如何firstCompletedOf实现:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { 
    val p = Promise[T]() 
    val completeFirst: Try[T] => Unit = p tryComplete _ 
    futures foreach { _ onComplete completeFirst } 
    p.future 
} 

在做{ futures foreach { _ onComplete completeFirst },功能{ _ onComplete completeFirst }通过ExecutionContext.execute地方 保存。确切地说,保存的这个函数是无关紧要的,我们只知道它必须保存在某个地方 ,以便稍后可以选择并在线程变得可用时在线程池中执行。

该功能关闭completeFirst,关闭p。 所以只要还有一个未来(从futures)等待完成,还有就是p参考,防止它被收集(即使到那个时候有机会,firstCompletedOf已经返回垃圾,从去除p堆栈)。

当第一个未来完成时,它将结果保存到承诺中(通过调用p.tryComplete)。 因为承诺p保留结果,所以至少只要p是可再生的就可以得到结果,并且正如我们所看到的p只要从futures未来至少一次未完成就可以达到。 这就是为什么在所有期货完成前无法收集结果的原因。

UPDATE: 现在的问题是:它可以修复吗?我认为它可以。我们所要做的就是确保第一个将来以线程安全的方式“完成”对p的引用,这可以通过使用AtomicReference实例来完成。事情是这样的:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { 
    val p = Promise[T]() 
    val pref = new java.util.concurrent.atomic.AtomicReference(p) 
    val completeFirst: Try[T] => Unit = { result: Try[T] => 
    val promise = pref.getAndSet(null) 
    if (promise != null) { 
     promise.tryComplete(result) 
    } 
    } 
    futures foreach { _ onComplete completeFirst } 
    p.future 
} 

我已经测试过它,并预期它允许将结果尽快为第一完成未来被垃圾收集。它应该在所有其他方面表现一致。

+0

感谢您为我完成这件事,我盯着'firstCompletedOf'很长一段时间,无法想象。而且,结论还是违背直觉的,不知道是否有人曾经抱怨过它...... – noru

+0

我已经添加了一个可以解决这种情况的替代实现。让我知道它是否适用于您(这可能需要对标准库进行拉取请求)。 –

+0

它工作正常,因为我观察。线程仍然被占用,但那完全是另一回事。谢谢你的帮助! – noru