2012-02-02 60 views
4

我希望有一个线程池,它提供了最大X线程的处理任务,到目前为止,没有任何问题。然而,每个提交的任务都可以指定具体限制的IO目标(如Y)。线程池有多个限制

所以提交的IOTask返回目标“google.com”与限4(Y)和池中有一个全球性的极限16(X)。我想提交10个google.com-任务,其中只有4个并行处理,并且该池有12个线程可用于其他任务。

我该如何做到这一点?

回答

2

您可以在自定义类包装2个ExecutorService的实例和人工管理任务提交如下:

class ExecutorWrapper { 

    private ExecutorService ioExec = Executors.newFixedThreadPool(4); 
    private ExecutorService genExec = Executors.newFixedThreadPool(12); 

    public Future<?> submit(final IOTask task) { 
     return ioExec.submit(task); 
    } 

    public Future<?> submit(final Runnable task) { 
     return genExec.submit(task); 
    } 
} 

interface IOTask extends Runnable {} 

这允许您使用4个线程执行IO操作和叶等12个线程服务其他任务。

+0

这就是我基本诉诸的。 – hotzen 2012-08-24 23:59:46

1

嗯...我怕离开ExecutorService不允许这种细粒度的控制。你可能需要要么扩展ExecutorService类将自己加入这个功能,或者使用两个单独的固定线程池,一个为4的容量,其他与12

3

能力实现这个功能并不简单,因为您需要为每个目标分别设置队列(等待代码变得复杂得多),或者从一个队列跳过容量较大的目标(导致性能开销)。您可以尝试扩展ExecutorService来实现此目的,但扩展似乎不重要。

更新答案/解决方案:

思考这个一点点后,到阻塞问题最简单的解决方案是同时具有阻塞队列(按照正常),以及地图队列(一个每个目标的队列数,以及每个目标的可用线程数)。队列映射仅用于从常规阻塞队列中获取任务之后已传递执行的任务(由于已针对该目标运行的线程过多)。

所以执行流程是这样的:

  1. 任务是通过调用代码(具体目标)提交。
  2. 任务被放置到阻塞队列(可能由包括目标信息,自己的任务类这里,裹)。
  3. 螺纹(从线程池)正在等待阻塞队列(通过取())。
  4. 线程接受提交的任务。
  5. 线程在锁上同步。
  6. 线程检查该目标的可用计数。
  7. 如果可用计数> 0

    • 那么线程减1计数,释放锁,运行任务。
    • 否则线程将任务放入目标到任务队列的映射中(该映射是通过任务映射),释放锁定并返回到等待阻塞队列。
  8. 当一个线程完成一个任务,它的执行:

    • 同步的锁。
    • 检查它刚刚执行的目标的计数。
    • 如果计数== 0
      • 然后检查传递的任务映射对于此目标的任何任务(如果存在),然后释放锁定并运行它。
    • 如果count不是0或者没有任何同一个目标的任务位于传递的映射/队列上,则增加可用计数(对于该目标),释放该锁并返回等待阻塞队列。

该解决方案避免了任何显著的性能开销或有一个单独的线程只是为了管理队列。

+0

感谢一堆,将不得不考虑它。 – hotzen 2012-02-06 22:32:39

1

对于这一点的想法可能是延长的ExecutorService和你们班有两个线程池,一个为4的能力和其他与12

容量然后实现你需要的方法和基于在提交的IOTasks上,您可以将任务分配给您希望访问的池。

1

使用计数器计算总线程数和HashMap,它计算当前尝试访问站点X的线程数。当您想要启动一个新线程时,调用一个同步方法来检查等待(wait循环内的wait)直到哈希映射中的线程数小于4,并且线程总数小于16.然后增加两个计数器并启动线程。当线程结束时,它应该调用第二个同步方法,递减计数器并调用notify()

+0

但是这意味着,尝试启动新线程的操作本身会被wait/notify阻塞,直到线程可用。我想尽量减少使用的线程数 – hotzen 2012-02-03 14:30:11

+0

是的,你是对的。这是一个不好的答案。如果我可以 – dspyz 2012-02-04 16:19:05

+0

感谢思考我的问题,我会降低它的投票;) – hotzen 2012-02-04 21:04:16

2

以更具体的方式思考一些答案。

  1. 您需要自己的BlockingQueue,它可以分离不同类型的任务,并根据内部计数器返回所需的Runnable。

  2. 扩展ThreadPoolExecutor并实现beforeExecute和afterExecute。

当beforeExecute被称为将递增队列内的计数器,如果了Runnable是X类型的当afterExecute被称为将是递减计数器。

在你的队列中,你会根据计数器的值返回适当的Runnable,我相信take方法就是你要做的地方。

这里有一些同步问题必须彻底考虑,以确保计数器永远不会超过4.不幸的是,一旦你在beforeExecute中,它已经太晚了,但能够简单地知道有多少任务在运行给定的时间可能会让你开始。