2012-03-30 53 views
3

我遇到了这个并发问题,我一直在挠头我的好几天。等待所有任务(未知号码)完成 - ThreadPoolExecutor

基本上,我希望我的ThreadPoolExecutor在关闭之前等待所有任务(任务数量未知)完成。

public class AutoShutdownThreadPoolExecutor extends ThreadPoolExecutor{ 
    private static final Logger logger = Logger.getLogger(AutoShutdownThreadPoolExecutor.class); 
    private int executing = 0; 
    private ReentrantLock lock = new ReentrantLock(); 
    private final Condition newTaskCondition = lock.newCondition(); 
    private final int WAIT_FOR_NEW_TASK = 120000; 

    public AutoShutdownThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime, 
     TimeUnit seconds, BlockingQueue<Runnable> queue) { 
     super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue); 
    } 


    @Override 
    public void execute(Runnable command) { 
     lock.lock(); 
     executing++; 
     lock.unlock(); 
     super.execute(command); 
    } 


    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     try{ 
      lock.lock(); 
      int count = executing--; 
      if(count == 0) { 
       newTaskCondition.await(WAIT_FOR_NEW_TASK, TimeUnit.MILLISECONDS); 
       if(count == 0){ 
        this.shutdown(); 
        logger.info("Shutting down Executor..."); 
       } 
      } 
     } 
     catch (InterruptedException e) { 
      logger.error("Sleeping task interrupted", e); 
     } 
     finally{ 
      lock.unlock(); 
     } 
    } 
} 

的目的是,为任务计数器任务检查(执行),如果它等于0,它阻止了一段时间,后来释放其锁,以便其他任务可能要执行的机会,不要太早关闭执行者。

但是,它没有发生。所有4个线程在执行程序进入等待状态:

"pool-1-thread-4" prio=6 tid=0x034a1000 nid=0x2d0 waiting on condition [0x039cf000] 
"pool-1-thread-3" prio=6 tid=0x034d0400 nid=0x1328 waiting on condition [0x0397f000] 
"pool-1-thread-2" prio=6 tid=0x03493400 nid=0x15ec waiting on condition [0x0392f000] 
"pool-1-thread-1" prio=6 tid=0x034c3800 nid=0x1fe4 waiting on condition [0x038df000] 

如果我在运行的类把日志语句(应该减缓线向下),这个问题似乎消失了。

public void run() { 
    // logger.info("Starting task" + taskName); 
     try { 
      //doTask(); 
     } 
     catch (Exception e){ 
      logger.error("task " + taskName + " failed", e); 
     } 
} 

的问题是,类似这样的帖子 Java ExecutorService: awaitTermination of all recursively created tasks

,我都按照原来的海报解决方案,试图解决afterExecute()的比赛条件,但它不工作。

请帮忙解释一下。 谢谢。

+0

使用CountDownLatch怎么样? – 2012-03-30 03:10:57

+0

任务数量事先未知。该程序将扫描其子文件夹中所有文件的目录,并在每个文件上执行一些编号处理任务。 – TommyQ 2012-03-30 03:15:03

回答

1

你有你的任务等待这newTaskCondition,但没有任何信号表明这种情况。所以你的线程都堆积如山,等待newTaskCondition,直到它超时。此外,阻止afterExecute将延迟完成任务,因此可能不是您想要做的事情。

如果您想让它稍等一会儿以查看是否有更多工作进入,则此功能已存在。只需调用setKeepAliveTime来设置等待的时间,并设置allowCoreThreadTimeOut以确保所有线程(包括核心线程)都可以终止。

+0

这就是意图。检查是否没有更多的任务 - >等待并释放锁定一段时间,(如果有新任务,他们应该拿起这个锁) - >唤醒并重新获得锁 - >再次检查,如果数量任务仍然是0 - >如果是,则关闭执行程序,如果不是,则退出任务。在这种情况下,没有必要指示线程唤醒,我猜? – TommyQ 2012-03-30 03:24:29

+0

@TommyQ,这就是发生了什么 - 一个任务运行,它看到没有更多的线程,它开始等待120秒。重复四次,现在你有四个等待任务。除非定时器到期,否则什么都不会唤醒它们,如果因为忙于等待120秒而耗尽线程,那太糟糕了。内置Keepalive计时器有什么问题? – bdonlan 2012-03-30 03:30:50

+0

setKeepAliveTime只适用于多余的线程?我设置了corePoolsize = maxPoolsize; – TommyQ 2012-03-30 03:41:32

0

我发现了一个非常简单的解决方案来解决我的问题。

尽管预先未知任务的数量,并且打算让主线程等待threadPool中的所有任务完成,但已知所有任务的提交时间(在查找并提交后-task方法已经扫描了所有文件)。

我可以简单地调用ThreadPoolExecutor.shutdown()和awaitTermination(Long.MAX_VALUE),以便主线程将无限期地等待ThreadPool完成其任务。