2011-03-10 80 views
3

我们遇到了一些问题。 :)将ExecutorService与任务树结合使用

我们希望确保只有N个线程随时都在执行后台任务。为此,我们使用了一个固定的线程池执行器。它似乎工作正常。

然后我们发现了一个问题。假设你有一个类,它使用执行器来做一些并行工作,然后在执行器线程中调用其他一些类,同时也执行一些并行工作,并打算等待它。这里发生了什么:

  • 主线程调用第一级方法。
  • 该方法认为它可以并行化为16个任务并分解其工作。
  • 将16个任务提交给执行者。
  • 主线程开始等待其任务完成。
  • 假设有四个线程可用,前四个任务每个都被拾取并运行。所以队列中剩下12个任务。
  • 现在,其中一个任务调用其他一些方法。
  • 这种新方法认为它可以并行化为2个任务。我们假设这是并行合并排序的第一步,或者沿着这些方向。
  • 2任务提交给执行者。
  • 此线程现在开始等待其任务完成。

嗯。所以在这一点上,所有四个线程现在都将等待任务完成,但他们正在协作阻止执行程序实际运行这些任务。

此问题的解决方案1如下所示:在向执行程序提交新任务时,如果我们已经运行了所有线程,并且已经在其中一个执行程序线程上运行,请以内联方式运行任务。这工作正常10个月,但现在我们遇到了问题。如果它提交的新任务仍然比较大,那么您可以进入新任务阻止该方法将其他任务添加到队列的情况,否则这些任务可以被其他工作线程拾取。因此,当线程正在处理内联工作时,您会遇到巨大延迟。

是否有更好的解决方案来执行潜在的无界限后台任务树的核心问题?我知道.NET等同于执行器服务具有某种内置的能力,可以从队列中窃取,从而防止发生最初的死锁问题,而据我所知,这是一个理想的解决方案。但是在Java的土地上呢?

+0

你们是否解决了你的问题?你在找什么答案? – 2011-03-15 15:24:12

回答

3

Java 7的概念是ForkJoinPool,允许任务通过将任务提交给同一个执行程序来“分叉”另一个任务。然后给它一个选项,稍后尝试通过试图运行它来“帮助加入”该任务,如果它尚未运行。

我相信在Java 6中可以通过简单组合ExecutorFutureTask来完成同样的事情。像这样:

public class Fib implements Callable<Integer> { 
    int n; 
    Executor exec; 

    Fib(final int n, final Executor exec) { 
     this.n = n; 
     this.exec = exec; 
    } 

    /** 
    * {@inheritDoc} 
    */ 
    @Override 
    public Integer call() throws Exception { 
     if (n == 0 || n == 1) { 
      return n; 
     } 

     //Divide the problem 
     final Fib n1 = new Fib(n - 1, exec); 
     final Fib n2 = new Fib(n - 2, exec); 

     //FutureTask only allows run to complete once 
     final FutureTask<Integer> n2Task = new FutureTask<Integer>(n2); 
     //Ask the Executor for help 
     exec.execute(n2Task); 

     //Do half the work ourselves 
     final int partialResult = n1.call(); 

     //Do the other half of the work if the Executor hasn't 
     n2Task.run(); 

     //Return the combined result 
     return partialResult + n2Task.get(); 
    } 

}   
+0

这似乎是要走的路。我不能等待Java 7.ForkJoinPool是*完美*! – Trejkaz 2011-03-29 00:10:44

0

似乎问题在于任务也尝试并行化自己,这使得很难避免资源限制。你为什么需要这样做?为什么不总是内联运行子任务?

如果你已经通过并行化完全利用了CPU,那么你将不会通过将工作再分成更小的任务来完成整体工作。

+0

因为没有在某个级别分裂,OP将只有一个大型任务,只能由一个CPU解决。拆分问题很好,但OP正在尝试通过限制线程数来减少上下文切换,现在问题是大多数线程都无所事事。 – 2011-03-10 01:53:56

+0

当然,当然。我明白,但他没有给出任务的持续时间,以及从该级别的并行化中获得的相对于上下文切换开销的好处。我希望OP能提供更多的背景。 – Jeremy 2011-03-10 03:47:13

+0

Tim的答案是它的一半 - 有时你在顶层分割,然后其中一个顶层任务仍然比其他顶层任务更大(即导致我上面命名的解决方案的问题的那种任务)。其他答案是,有时你称之为“底层”在这里*是“顶层”。 – Trejkaz 2011-03-13 00:15:10

1

您可以使用回调,而不是让线程等待任务完成。你的任务本身需要回调,因为他们提交了更多的任务。

例如为:

public class ParallelTask implements Runnable, Callback { 
    private final Callback mCB; 
    private final int mNumChildTasks; 
    private int mTimesCalledBack = 0; 
    private final Object mLock = new Object(); 
    private boolean mCompleted = false; 
    public ParallelTask(Callback cb) { 
    mCB = cb; 
    mNumChildTasks = N; // the number of direct child tasks you know this task will spawn 
    // only going down 1 generation 
    // of course you could figure this number out in the run method (will need to be volatile if so) 
    // just as long as it is set before submitting any child tasks for execution 
    } 

    @Override 
    public void run() { 
    // do your stuff 
    // and submit your child tasks, but don't wait on them to complete 
    synchronized(mLock) { 
     mCompleted = true; 
     if (mNumChildTasks == mTimesCalledBack) { 
     mCB.taskCompleted(); 
     } 
    } 
    } 

    // Callback interface 
    // taskCompleted is being called from the threads that this task's children are running in 
    @Override 
    public void taskCompleted() { 
    synchronized(mLock) { 
     mTimesCalledBack++; 
     // only call our parent back if our direct children have all called us back 
     // and our own task is done 
     if (mCompleted && mTimesCalledBack == mNumChildTasks) { 
     mCB.taskCompleted(); 
     } 
    } 
    } 
} 

在你的主线程您提交根的任务,注册一些回调执行。

由于所有的子任务都没有报告完成,直到他们的孩子已经完成报告,你的根回调不应该被调用,直到一切完成。

我写了这个动态,并没有测试或编译它,所以可能会有一些错误。

相关问题