2010-08-05 210 views
13

我正在尝试使用ThreadPoolExecutor来计划任务,但遇到了一些其策略问题。这是它的行为:ThreadPoolExecutor策略

  1. 如果少于corePoolSize线程正在运行,执行程序总是喜欢添加一个新的线程,而不是排队。
  2. 如果corePoolSize或更多的线程正在运行,执行程序总是优先排队请求而不是添加新线程。
  3. 如果请求无法排队,则会创建一个新线程,除非该值超过maximumPoolSize,在这种情况下,该任务将被拒绝。

我想要的行为是这样的:

  1. 同上
  2. 如果不是maximumPoolSize线程多于corePoolSize但不运行的,更喜欢加入了排队一个新的线程,并使用空闲线程通过添加一个新的线程。
  3. 同上

基本上我不希望有任何的任务被拒绝;我希望他们排在一个无限的队列中。但我确实需要达到maximumPoolSize线程。如果我使用无界队列,它在命中coreSize之后永远不会生成线程。如果我使用有界队列,它会拒绝任务。有没有办法解决?

我现在想的是在SynchronousQueue上运行ThreadPoolExecutor,但不是直接向它提供任务 - 而是将它们馈送给单独的无界LinkedBlockingQueue。然后另一个线程从LinkedBlockingQueue馈入Executor,如果有人被拒绝,它会再次尝试,直到它不被拒绝。这看起来像是一种痛苦和一点点黑客攻击 - 是否有更简单的方法来做到这一点?

回答

1

您的用例很常见,完全合法,不幸的是比预期的更困难。有关背景信息,您可以阅读this discussion并找到一个指向解决方案的指针(也在线程中提到)here。谢伊的解决方案工作正常。

一般来说我会对无边界的队列有点警惕;明确的流入流量控制通常会更好地降低温度,并调整当前/剩余工作的比率,而不会压倒生产者或消费者。

1

只需设置corePoolsize = maximumPoolSize并使用无限队列?

在您的积分列表中,1不包括2,因为corePoolSize将始终小于或等于maximumPoolSize

编辑

还有什么你想要什么,什么TPE将为您提供之间不兼容。

如果您拥有无限队列,则会忽略maximumPoolSize,因此,如您所见,不会创建和使用多于corePoolSize的线程。

所以,再一次,如果你带着无边的队列corePoolsize = maximumPoolSize,你有你想要的,不是吗?

+0

哎呀,我写的并不完全是我想要的。我编辑了原文。 – 2010-08-05 21:48:28

+0

设置corePoolsize = maximumPoolSize确实接近,但我也使用allowCoreThreadTimeOut(false)和prestartAllCoreThreads()。 – 2010-08-05 22:21:42

4

这可能是没有必要进行微观管理的线程池被请求。

缓存的线程池将重新使用空闲线程,同时还允许潜在无限制的并发线程。这当然会导致在突发期间从上下文切换开销降低的失控性能。

Executors.newCachedThreadPool(); 

更好的选择是对线程总数设置限制,同时放弃确保首先使用空闲线程的概念。配置变化将是:

corePoolSize = maximumPoolSize = N; 
allowCoreThreadTimeOut(true); 
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS); 

推理在这种情况下,如果执行具有小于corePoolSize线程,比它不能是非常繁忙。如果系统不是很忙,那么在启动一个新线程时几乎没有什么坏处。这样做会导致您的ThreadPoolExecutor总是创建一个新的工人,如果它在允许的最大工人数以下。只有当最大数量的工人“跑步”时,才会有空闲地等待任务的工人被赋予任务。如果工作人员没有任务等待aReasonableTimeDuration,则允许终止。使用池大小的合理限制(毕竟只有很多CPU)和相当大的超时(以防止线程不必要地终止),可能会看到期望的好处。

最后的选择是hackish。基本上,ThreadPoolExecutor内部使用BlockingQueue.offer来确定队列是否有容量。自定义实现BlockingQueue可能会始终拒绝offer尝试。当ThreadPoolExecutor未能通过offer任务到队列中时,它会尝试创建一个新的工作者。如果无法创建新工作人员,则会调用RejectedExecutionHandler。此时,自定义RejectedExecutionHandler可能会强制put进入自定义BlockingQueue

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */ 
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler { 
    BlockingQueue<T> delegate; 

    public boolean offer(T item) { 
     return false; 
    } 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     delegate.put(r); 
    } 

    //.... delegate methods 
}