2011-04-25 51 views
0

我有一个队列与多个生产者 - 一个消费者。消费者定期运行并完全排空队列(之后没有消息)。 一个优雅的算法应该运行消费者,并等待它超时或只是等待消费者已经运行。优雅的方式与超时消费者优雅地停止排队

目前我们有水木清华这样的:

void stop(boolean graceful) { 
     if (graceful && !checkAndStopDirectly()) { 
      executor.shutdown(); 
      try { 
       if (!executor.awaitTermination(shutdownWaitInterval, shutdownWaitIntervalUnit)) { 
        log.warn("..."); 
       } 
      } catch (InterruptedException e) { 
       log.error("...", e); 
      } 
     } else { 
      executor.shutdownNow(); 
     } 

private boolean checkAndStopDirectly() { 
    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(); 
    try { 
     return shutdownExecutor.submit(new Callable<Boolean>(){ 
      @Override 
      public Boolean call() throws Exception { 
       if (isAlreadyRan.compareAndSet(false, true)) { 
        try { 
         runnableDrainTask.run(); 
        } finally { 
         isAlreadyRan.set(false); 
        } 
        return true; 
       } else { 
        return false; 
       } 
      } 
     }).get(shutdownWaitInterval, shutdownWaitIntervalUnit); 

有谁看到更优雅的方式来做到这一点? 例如我正在寻找一种方式W/O使用额外的AtomicBoolean(isAlreadyRan)或双等待逻辑与时间间隔作为对象字段等 顺便说一句,毒药模式来到我的脑海里...

+0

您能否更清楚您的要求?应该等什么?一个客户端到stop()调用?它应该等待什么,队列流失?你是否想要一个电话stop()来阻塞,直到这个东西被排空,不管是什么? – Toby 2011-09-06 13:36:53

回答

1

你在这里谈论正常关闭你的应用程序应该做以下事情?

  1. 等待队列排出,或
  2. 超时,如果排水时间太长

我可能需要了解您如何排排队,但如果你能做到这一点以可中断的方式,您可以在得到Future时超时,并尝试shutdownNow(如果没有完全耗尽则中断),无论如何;这对你看起来如何?

ExecutorService pool = Executors.newSingleThreadExecutor(); 

public void stop() { 
    try { 
     pool.submit(new DrainTask()).get(100, MILLISECONDS); 
    } catch (TimeoutException e) { 
     // nada, the timeout indicates the queue hasn't drained yet 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } catch (Exception e) { 
     // nada, defer to finally 
    } finally { 
     pool.shutdownNow(); 
    } 
} 

private class DrainTask implements Callable<Void> { 
    @Override 
    public Void call() throws Exception { 
     while (!Thread.currentThread().isInterrupted()) 
      ; // drain away 
     return null; 
    } 
} 

是用来防止多个并发调用stopcompareAndSet一部分?我想我喜欢一般锁或者使用​​。但是,如果发生冲突,将会抛出ExecutionException,并重复拨打shutdownNow即可。

这种依赖于DrainTask能够阻止它响应中断调用的功能(因为shutdownNow将尝试在任何当前正在运行的线程上调用中断)。