2011-07-20 102 views
12

我有一个ScheduledExecutorService任务计划在一个小时内执行。我如何获得未完成任务的清单,以便我可以强制他们立即运行?如何在ExecutorService.shutdown()之后立即运行未完成的任务?

我认为shutdown()将等待一个小时,看起来好像shutdownNow()返回无法运行()的Runnable列表,因为Runnable实现检查Executor状态,以及何时发现它已关闭Runnable拒绝运行。有关实际实施,请参见ScheduledThreadPoolExecutor.ScheduledFutureTask.run()

任何想法?

回答

3

我已经采取了马克·彼得斯的答案,实现所有的抽象方法,增加线程安全,并试图尊重底层的ScheduledThreadPoolExecutor配置只要有可能。

/** 
* Overrides shutdown() to run outstanding tasks immediately. 
* 
* @author Gili Tzabari 
*/ 
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService 
    implements ScheduledExecutorService 
{ 
    private final ScheduledExecutorService delegate; 
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; 
    private final ExecutorService immediateService; 
    private final ConcurrentMap<Future<?>, Callable<?>> tasks = Maps.newConcurrentMap(); 

    /** 
    * Creates a new RunOnShutdownScheduledExecutorService. 
    * 
    * @param delegate the executor to delegate to 
    */ 
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate) 
    { 
     Preconditions.checkNotNull(delegate, "delegate may not be null"); 

     this.delegate = delegate; 
     if (delegate instanceof ScheduledThreadPoolExecutor) 
     { 
      this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate; 
      this.immediateService = Executors.newFixedThreadPool(scheduledThreadPoolExecutor. 
       getCorePoolSize(), scheduledThreadPoolExecutor.getThreadFactory()); 
     } 
     else 
     { 
      scheduledThreadPoolExecutor = null; 
      this.immediateService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). 
       setNameFormat(RunOnShutdownScheduledExecutorService.class.getName() + "-%d").build()); 
     } 
    } 

    @Override 
    public boolean isShutdown() 
    { 
     return delegate.isShutdown(); 
    } 

    @Override 
    public boolean isTerminated() 
    { 
     return delegate.isTerminated(); 
    } 

    @Override 
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException 
    { 
     long before = System.nanoTime(); 
     if (!delegate.awaitTermination(timeout, unit)) 
      return false; 
     long after = System.nanoTime(); 
     long timeLeft = timeout - unit.convert(after - before, TimeUnit.NANOSECONDS); 
     return immediateService.awaitTermination(timeLeft, unit); 
    } 

    @Override 
    public void execute(Runnable command) 
    { 
     delegate.execute(command); 
    } 

    @Override 
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    { 
     CallableWithFuture<V> decorated = new CallableWithFuture<>(callable); 
     ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, callable); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = 
      delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public synchronized void shutdown() 
    { 
     if (delegate.isShutdown()) 
      return; 
     if (scheduledThreadPoolExecutor != null) 
     { 
      // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 
      // 
      // Cancel waiting scheduled tasks, otherwise executor won't shut down 
      scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 
     } 
     delegate.shutdown(); 
     // Users will not be able to cancel() Futures past this point so we're guaranteed that 
     // "tasks" will not be modified. 

     final List<Callable<?>> outstandingTasks = Lists.newArrayList(); 
     for (Map.Entry<Future<?>, Callable<?>> entry: tasks.entrySet()) 
     { 
      Future<?> future = entry.getKey(); 
      Callable<?> task = entry.getValue(); 

      if (future.isDone() && future.isCancelled()) 
      { 
       // Task called by the underlying executor, not the user. See CleaningScheduledFuture. 
       outstandingTasks.add(task); 
      } 
     } 
     tasks.clear(); 
     if (outstandingTasks.isEmpty()) 
     { 
      immediateService.shutdown(); 
      return; 
     } 

     immediateService.submit(new Callable<Void>() 
     { 
      @Override 
      public Void call() throws Exception 
      { 
       delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 

       // Execute outstanding tasks only after the delegate executor finishes shutting down 
       for (Callable<?> task: outstandingTasks) 
        immediateService.submit(task); 
       immediateService.shutdown(); 
       return null; 
      } 
     }); 
    } 

    @Override 
    public List<Runnable> shutdownNow() 
    { 
     return delegate.shutdownNow(); 
    } 

    /** 
    * A Runnable that removes its future when running. 
    */ 
    private class CleaningRunnable implements Runnable 
    { 
     private final Runnable delegate; 
     private Future<?> future; 

     /** 
     * Creates a new RunnableWithFuture. 
     * 
     * @param delegate the Runnable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningRunnable(Runnable delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<?> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public void run() 
     { 
      tasks.remove(future); 
      delegate.run(); 
     } 
    } 

    /** 
    * A Callable that removes its future when running. 
    */ 
    private class CallableWithFuture<V> implements Callable<V> 
    { 
     private final Callable<V> delegate; 
     private Future<V> future; 

     /** 
     * Creates a new CallableWithFuture. 
     * 
     * @param delegate the Callable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CallableWithFuture(Callable<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<V> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public V call() throws Exception 
     { 
      tasks.remove(future); 
      return delegate.call(); 
     } 
    } 

    /** 
    * A ScheduledFuture that removes its future when canceling. 
    * 
    * This allows us to differentiate between tasks canceled by the user and the underlying 
    * executor. Tasks canceled by the user are removed from "tasks". 
    * 
    * @param <V> The result type returned by this Future 
    */ 
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V> 
    { 
     private final ScheduledFuture<V> delegate; 

     /** 
     * Creates a new MyScheduledFuture. 
     * 
     * @param delegate the future to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningScheduledFuture(ScheduledFuture<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     @Override 
     public long getDelay(TimeUnit unit) 
     { 
      return delegate.getDelay(unit); 
     } 

     @Override 
     public int compareTo(Delayed o) 
     { 
      return delegate.compareTo(o); 
     } 

     @Override 
     public boolean cancel(boolean mayInterruptIfRunning) 
     { 
      boolean result = delegate.cancel(mayInterruptIfRunning); 

      if (result) 
      { 
       // Tasks canceled by users are removed from "tasks" 
       tasks.remove(delegate); 
      } 
      return result; 
     } 

     @Override 
     public boolean isCancelled() 
     { 
      return delegate.isCancelled(); 
     } 

     @Override 
     public boolean isDone() 
     { 
      return delegate.isDone(); 
     } 

     @Override 
     public V get() throws InterruptedException, ExecutionException 
     { 
      return delegate.get(); 
     } 

     @Override 
     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
      TimeoutException 
     { 
      return delegate.get(timeout, unit); 
     } 
    } 
} 
+1

我刚刚在ScheduledThreadPoolExecutor中发现了一个令人讨厌的bug。如果一个工作线程正在等待一个只能在一个小时内执行的任务,并且您取消该任务,则该工作线程将继续等待并且执行程序不会关闭。我提交了一个错误报告:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 – Gili

+0

我更新了我的答案,并提供了一个bug#7069418的解决方法 – Gili

0

伟大的问题!看起来你可能会自己修补一个解决方案。

一个选项可能是用您自己的ScheduledExecutorService实现来包装ScheduledThreadPoolExecutor。当需要关闭服务时,取消任何可以取消的任务,而是将它们发送到即时执行它们的服务。然后shutdown()那个服务。

下面是一些非常粗略的代码,说明我的意思,虽然我警告你可能在这里有缺陷,因为它在几分钟内被掀起。特别是,我没有花费很多努力来确保这是线程安全的。

class RunOnShutdownScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService { 
    private final ScheduledExecutorService delegateService; 

    private Map<Future<?>, Runnable> scheduledFutures = 
      Collections.synchronizedMap(new IdentityHashMap<Future<?>, Runnable>()); 


    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegateService) { 
     this.delegateService = delegateService; 
    } 

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 
     ScheduledFuture<?> future = delegateService.schedule(command, delay, unit); 
     scheduledFutures.put(future, command); 
     return future; 
    } 

    public void shutdown() { 
     delegateService.shutdown(); 
     ExecutorService immediateService = Executors.newFixedThreadPool(5); 
     for (Map.Entry<Future<?>, Runnable> entry : scheduledFutures.entrySet()) { 
      Future<?> future = entry.getKey(); 
      Runnable task = entry.getValue(); 
      if (!future.isDone()) { 
       if (future.cancel(false)) { 
        immediateService.submit(task); 
       } 
      } 
     } 
     immediateService.shutdown(); 
    } 

    //... 
} 
+0

另一种方法(由ScheduledThreadPoolExecutor的Javadoc讨论)似乎是覆盖decorateTask()。我相信这可能会导致更简单的实施。 – Gili

+0

@Gili:看起来这是一种可能性。如果看起来可行,我建议你发布一个自己问题的答案。你会建议还是委托给一个直接的执行者,或者你是否认为你可以在关机时修改任务? –

+0

我不认为你可以将'decorateTask()'与委托执行者混合在一起,但我会给它更多的思考。 – Gili

相关问题