2012-10-19 63 views
3

考虑接受需要很长时间初始化的服务(例如,JDBC连接的参数)的服务的配置设置的用户界面。我们希望我们的用户界面在服务初始化发生时保持响应。如果用户进行其他更改,则应使用新参数取消初始化并重新启动。ExecutorService在提交新任务时取消当前任务

由于参数在用户键入每个字符时被组合在配置中,因此可能会在一行中创建大量初始化请求。只有最后一个应该执行。

我们已经放在一起的代码实现了这个结果,但是看起来这种行为似乎是实现ExecutorService的非常好的候选者。在我们将所有事情重构为ExecutorService之前,我想我会询问世界上是否有类似的实现。

更具体:

的ExecutorService的将有一个工作线程。一旦提交新任务,当前任务就会被取消(并且工作人员中断)。然后为下一次执行捕获新任务。如果提交了另一个任务,则当前任务再次被取消,并且“下一次执行”任务被设置为该新任务。当工作线程最终拿起下一个执行任务时,它将始终是提交的最后一个任务 - 所有其他任务都被取消或丢弃。

有没有人有像他们愿意分享的实现?或者是否有一个涵盖此类行为的标准库?实现起来并不难,但要确定线程的安全性可能会非常棘手,所以我宁愿使用经过验证的代码(如果可以的话)。

回答

3

这里就是我最终想出了 - 我感兴趣的任何意见:

public class InterruptingExecutorService extends ThreadPoolExecutor{ 
    private volatile FutureTask<?> currentFuture; 

    public InterruptingExecutorService(boolean daemon) { 
     super(0, 1, 1000L, TimeUnit.MILLISECONDS, 
       new LinkedBlockingQueue<Runnable>(), 
       daemon ? new DaemonThreadFactory() : Executors.defaultThreadFactory()); 

    } 

    public static class DaemonThreadFactory implements ThreadFactory{ 
     ThreadFactory delegate = Executors.defaultThreadFactory(); 

     @Override 
     public Thread newThread(Runnable r) { 
      Thread t = delegate.newThread(r); 
      t.setDaemon(true); 
      return t; 
     } 

    } 

    private void cancelCurrentFuture(){ 
     // cancel all pending tasks 
     Iterator<Runnable> it = getQueue().iterator(); 
     while(it.hasNext()){ 
      FutureTask<?> task = (FutureTask<?>)it.next(); 
      task.cancel(true); 
      it.remove(); 
     } 

     // cancel the current task 
     FutureTask<?> currentFuture = this.currentFuture; 
     if(currentFuture != null){ 
      currentFuture.cancel(true); 
     } 
    } 

    @Override 
    public void execute(Runnable command) { 
     if (command == null) throw new NullPointerException(); 

     cancelCurrentFuture(); 
     if (!(command instanceof FutureTask)){ // we have to be able to cancel a task, so we have to wrap any non Future 
      command = newTaskFor(command, null); 
     } 
     super.execute(command); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 
     // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1 
     // it isn't possible for currentFuture to be set by any other thread than the one calling this method 
     this.currentFuture = (FutureTask<?>)r; 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1 
     // it isn't possible for currentFuture to be set by any other thread than the one calling this method 
     this.currentFuture = null; 
    } 
} 
+0

很酷的代码,以更好的方式制作了中断者。从getQueue()的项目可以投到FutureTask是问题的关键 – farmer1992

+0

有趣的是,我发现execute()方法得到了该演员的方式。所以我最后得到了一个覆盖执行的不同实现,检查FutureTask是否被传入,如果没有,则包装它。最终的代码实际上是更清洁的,并且无论ExecutorService如何使用都能正常工作 - 如果我能记得的话,我会在我的代码得到帮助时发布更新。最终结果,BTW,非常光滑 - 各种各样的用例。 –

+0

ok - 更新了代码 –

1

你可能需要一个DiscardOldestPolicy添加到您的遗嘱执行人

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.DiscardOldestPolicy.html

You will get 
0 submitted 
1 submitted 
2 submitted 
3 submitted 
4 submitted 
5 submitted 
6 submitted 
7 submitted 
8 submitted 
9 submitted 
9 finished 

public static void main(String[] args) throws SecurityException, 
     NoSuchMethodException { 

    final Method interruptWorkers = ThreadPoolExecutor.class 
      .getDeclaredMethod("interruptWorkers"); 
    interruptWorkers.setAccessible(true); 

    ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, 
      TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), 
      new RejectedExecutionHandler() { 

       @Override 
       public void rejectedExecution(Runnable r, 
         ThreadPoolExecutor executor) { 
        if (!executor.isShutdown()) { 
         try { 

          interruptWorkers.invoke(executor); 
          executor.execute(r); 

         } catch (IllegalArgumentException e) { 
          e.printStackTrace(); 
         } catch (IllegalAccessException e) { 
          e.printStackTrace(); 
         } catch (InvocationTargetException e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
      }); 


    for(int i =0 ;i<10;i++) 
     executor.submit(newTask(i)); 
} 

private static Runnable newTask(final int id) { 
    return new Runnable() { 
     { 

      System.out.println(id + " submitted"); 
     } 

     @Override 
     public void run() { 
      try { 

       Thread.sleep(5000l); 
       System.out.println(id + " finished"); 
      } catch (InterruptedException e) { 
      } 

     } 

    }; 
} 
+0

啊 - 很优雅...不得不打中断工作者的方法有点令人不安,但这确实看起来像是最好的方法。 –

+0

ok - interruptWorkers不是我正在运行的JDK的一部分(Oracle,1.6.0_34),所以这不起作用。关于一种可以使用公共API的方法的其他想法? –

+0

我使用openjdk 1.6.0_24并从源代码获得 – farmer1992