2012-06-07 142 views
25

我的问题:如何在ThreadPoolExecutor上执行一堆线程对象并等待它们全部完成后才能继续?如何等待ThreadPoolExecutor完成

我是ThreadPoolExecutor的新手。所以这段代码是一个测试它是如何工作的。现在我甚至没有填写对象BlockingQueue,因为我不知道如何在不调用​​与另一个RunnableObject的情况下启动队列。无论如何,现在我只需拨打awaitTermination(),但我认为我仍然错过了一些东西。任何提示都会很棒!谢谢。

public void testThreadPoolExecutor() throws InterruptedException { 
    int limit = 20; 
    BlockingQueue q = new ArrayBlockingQueue(limit); 
    ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
    for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
    } 
    ex.awaitTermination(2, TimeUnit.SECONDS); 
    System.out.println("finished"); 
} 

的RunnableObject类:

package playground; 

public class RunnableObject implements Runnable { 

    private final int id; 

    public RunnableObject(int id) { 
    this.id = id; 
    } 

    @Override 
    public void run() { 
    System.out.println("ID: " + id + " started"); 
    try { 
     Thread.sleep(2354); 
    } catch (InterruptedException ignore) { 
    } 
    System.out.println("ID: " + id + " ended"); 
    } 
} 
+1

,我不认为这是一个好主意,回答你的问题,你张贴和建议后,立即您自己的答案不太合适。充其量,这保证和更新你的原始问题,解释为什么这个答案是不够的。 – Kiril

+1

@Link,你是对的。我会解决它。 – kentcdodds

+0

我刚刚编辑了我的答案,以显示在关闭执行程序之前等待所有作业完成的一种方式。 – Gray

回答

43

你应该awaitTermination

ExecutorService threads; 
// ... 
// Tell threads to finish off. 
threads.shutdown(); 
// Wait for everything to finish. 
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) { 
    log.info("Awaiting completion of threads."); 
} 
+0

完全做到了。我没有注意到'awaitTermination'返回任何东西。没有其他人提到我需要通过循环来阻止。谢谢! – kentcdodds

+9

为什么在一个循环?为什么不增加等待的秒数?通常我会执行一个'awaitTermination(Long.MAX_VALUE,TimeUnit.SECONDS)'或其他。 – Gray

+1

@Gray - 我最初只是在我的一段代码中调用过它,或者有时候出现了一些非常奇怪的事情,或者有时候一切都被锁定了。最终我追踪到这一点,所以现在我循环并记录一个“等待”消息,以便我知道发生了一些奇怪的事情。 – OldCurmudgeon

4

您的问题似乎是,你是不是您所提交的所有作业,以您的池后,调用shutdown。如果没有shutdown(),您的awaitTermination将始终返回false。

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 
// you are missing this line!! 
ex.shutdown(); 
ex.awaitTermination(2, TimeUnit.SECONDS); 

你也可以做类似以下内容等待您的所有作业完成:

List<Future<Object>> futures = new ArrayList<Future<Object>>(); 
for (int i = 0; i < limit; i++) { 
    futures.add(ex.submit(new RunnableObject(i + 1), (Object)null)); 
} 
for (Future<Object> future : futures) { 
    // this joins with the submitted job 
    future.get(); 
} 
... 
// still need to shutdown at the end 
ex.shutdown(); 

而且,因为你在睡觉的2354毫秒,但只有等待所有的终止2SECONDS,awaitTermination的作业将始终返回false

最后,这听起来像你正在担心创建一个新的ThreadPoolExecutor,而你却想重用第一个。不要。与您编写的用于检测作业是否完成的任何代码相比,GC开销将极小。


从的javadoc引用,ThreadPoolExecutor.shutdown()

发起在以前已提交任务的执行一个有序的关闭,但没有新的任务将被接受。如果已关闭,调用没有其他影响。

ThreadPoolExecutor.awaitTermination(...)方法,它正在等待执行的状态去TERMINATED。但是,如果调用shutdown(),首先状态必须转到SHUTDOWN,如果调用shutdownNow(),则状态必须转到STOP

3

这与执行者本身没有任何关系。只需使用界面的java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>)即可。它会阻止,直到所有的Callable完成。

执行者意味着长寿;超出一组任务的整个生命周期。 shutdown适用于应用程序完成并清理完毕。

+0

你能详细解释一下这个答案吗?我很喜欢这个答案,但我很难实现它。谢谢 – kentcdodds

-1

试试这个,

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 

行添加

ex.shutdown(); 
ex.awaitTermination(timeout, unit) 
+0

你应该检查从等待终止返回的布尔值并继续尝试,直到你成为true。 –

1

另一种方法循环是使用CompletionService,如果你有非常有用尝试任何任务结果:

//run 3 task at time 
final int numParallelThreads = 3; 

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor 
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads); 
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 

int numTaskToStart = 15; 

for(int i=0; i<numTaskToStart ; i++){ 
    //task class that implements Callable<String> (or something you need) 
    MyTask mt = new MyTask(); 

    completionService.submit(mt); 
} 

executor.shutdown(); //it cannot be queued more task 

try { 
    for (int t = 0; t < numTaskToStart ; t++) { 
     Future<String> f = completionService.take(); 
     String result = f.get(); 
     // ... something to do ... 
    } 
} catch (InterruptedException e) { 
    //termination of all started tasks (it returns all not started tasks in queue) 
    executor.shutdownNow(); 
} catch (ExecutionException e) { 
    // ... something to catch ... 
} 
2

这里有公认的答案,处理重变体如果/当InterruptedException异常被抛出:

executor.shutdown(); 

boolean isWait = true; 

while (isWait) 
{ 
    try 
    {    
     isWait = !executor.awaitTermination(10, TimeUnit.SECONDS); 
     if (isWait) 
     { 
      log.info("Awaiting completion of bulk callback threads."); 
     } 
    } catch (InterruptedException e) { 
     log.debug("Interruped while awaiting completion of callback threads - trying again..."); 
    } 
}