2012-12-07 37 views
1

在线程抛出异常的情况下,我该如何等待,直到没有抛出异常的所有线程都已完成(因此用户在所有事情都停止之前不会再次启动)?我使用GPars以几种不同的方式,所以我需要为每个(并行集合,异步闭包和fork/join)使用策略。例外情况没有被埋没,他们很好地通过承诺,getChildrenResults等处理,所以这不是问题(感谢Vaclav Pech的答案)。我只需要确保主线程等待,直到任何仍在运行的东西完成或以其他方式停止。例如,当使用并行集合时,一些线程继续运行,而另一些线​​程在异常之后永远不会启动。因此,很难说出有多少人在等待,或者有可能抓住他们。GPars如何知道抛出异常时所有线程都已完成?

我的猜测是也许有一种方法来处理线程池(在这种情况下GParsPool)。有什么建议么?

谢谢!

回答

3

我相信我有这个问题的解决方案,我实现了它在全面测试后的应用程序和它的作品。

withPool closure作为第一个参数传入创建的池(a jsr166y.ForkJoinPool)。我可以抓住这一点,保存它关闭在一个变量(currentPool),要由主线程以后使用,就像这样:

GParsPool.withPool { pool -> 
     currentPool = pool 

当一个异常被抛出,并返回到主线程对于处理,我可以把它等到一切都完了,像这样的东西:

} catch (Exception exc) { 
     if (currentPool) { 
      while (!currentPool.isQuiescent()) { 
       Thread.sleep(100) 
       println 'waiting for threads to finish' 
      } 
     } 

     println 'all done' 
    } 

的isQuiescent()似乎是要确保有没有做更多的工作,以安全的方式。

请注意,在测试过程中,我还发现异常似乎并不像我原先想象的那样终止循环的执行。如果我有一个500的列表,并且做了eachParallel,他们都会跑,不管第一个是否有错误。所以我必须通过在并行循环的异常处理程序中使用currentPool.shutdownNow()来终止循环。另请参见:GPars - proper way to terminate a parallel collection early

下面是实际的解决方案的完整简化表示:

void example() { 
    jsr166y.ForkJoinPool currentPool 

    AtomicInteger threadCounter = new AtomicInteger(0) 
    AtomicInteger threadCounterEnd = new AtomicInteger(0) 

    AtomicReference<Exception> realException = new AtomicReference<Exception>() 

    try { 
     GParsPool.withPool { pool -> 
      currentPool = pool 

      (1..500).eachParallel { 
       try { 
        if (threadCounter.incrementAndGet() == 1) { 
         throw new RuntimeException('planet blew up!') 
        } 

        if (realException.get() != null) { 
         // We had an exception already in this eachParallel - quit early 
         return 
        } 

        // Do some long work 
        Integer counter=0 
        (1..1000000).each() {counter++} 

        // Flag if we went all the way through 
        threadCounterEnd.incrementAndGet() 
       } catch (Exception exc) { 
        realException.compareAndSet(null, exc) 

        pool.shutdownNow() 
        throw realException 
       } 
      } 
     } 
    } catch (Exception exc) { 
     // If we used pool.shutdownNow(), we need to look at the real exception. 
     // This is needed because pool.shutdownNow() sometimes generates a CancellationException 
     // which can cover up the real exception that caused us to do a shutdownNow(). 
     if (realException.get()) { 
      exc = realException.get() 
     } 

     if (currentPool) { 
      while (!currentPool.isQuiescent()) { 
       Thread.sleep(100) 
       println 'waiting for threads to finish' 
      } 
     } 

     // Do further exception handling here... 
     exc.printStackTrace() 
    } 
} 

回到我前面的例子,如果我抛出异常一日一次通过一个4核的机器上,有大约5个线程排队。 shutdownNow()会在大约20个左右的线程通过后停止工作,所以在顶端附近进行“退出提前”检查可以帮助那些20个或更多的线程尽快退出。

只是在这里发布它以防万一它帮助别人,作为对我在这里得到的所有帮助的回报。谢谢!

2

我相信你需要捕捉异常,然后返回除预期结果之外的其他内容(例如String或null,如果你期待一个数字例如),即;

@Grab('org.codehaus.gpars:gpars:0.12') 
import static groovyx.gpars.GParsPool.* 

def results = withPool { 
    [1,2,3].collectParallel { 
    try { 
     if(it % 2 == 0) { 
     throw new RuntimeException('2 fails') 
     } 
     else { 
     Thread.sleep(2000) 
     it 
     } 
    } 
    catch(e) { e.class.name } 
    } 
} 
+0

在我的示例中,结果将包含传递线程的整数,以及引发异常的字符串。所有线程将完成... –

+0

好的,我看到断开连接的位置。假设您的集合中有1,000个项目要迭代。如果抛出一个异常,通常会发生的事情是组被中断 - 它会停止触发线程,这是我想要的,并且控制权返回到主线程。所以不是每个线程都会被解雇。但是,在主线程中,仍然有线程在异常之前启动,但尚未完成清理。那有意义吗? – user1373467

+0

现在,您的建议确实给了我一个想法 - 我可以保留多少实际被解雇并跟踪完成的数量。我只是希望更简单一些。但我很欣赏它,它是有用的+1 – user1373467

相关问题