2013-08-06 84 views
1

熊与我同在,我不是在多线程编程非常精明......Java的线程池和可运行在创建可运行

我目前正在建设的是使用一个线程池的ExecutorService各种可运行的系统。这很简单。然而,我正在研究让runnables本身产生一个额外的runnable的可能性,这个runnable基于原始runnable中发生的事情(例如,如果成功,做到这一点,如果失败,做到这一点等等,因为有些任务必须先于其他任务完成执行)。应该注意的是,主线程不需要被通知这些任务的结果,尽管它可能对于处理异常很方便,即如果不能联系外部服务并且所有线程都因此抛出异常,则停止提交任务并定期检查外部服务,直到恢复。这并非完全必要,但它会很好。

即,提交任务A.任务A做一些事情。如果一切顺利,任务A将执行任务B.如果某些事情不能正常工作或引发异常,请执行任务C.每个子任务也可能有其他任务,但只有几个级别深。我宁愿在一个任务中做这样的事情,而不是大型的,咆哮的条件,因为这种方法允许更大的灵活性。

但是,我不确定这将如何影响线程池。我假设从池中的一个线程内创建的任何附加线程都将存在池外,因为它们本身并未直接提交到池中。这是一个正确的假设吗?如果是这样,这可能是一个坏主意(好吧,如果不是,它可能不是一个好主意),因为它可能会导致更多的线程,因为原来的线程完成,并提交一个新的任务,而线程从先前的任务仍然在进行(并且可能会持续比其他任务长得多)。

我也考虑过将这些实现为Callables,并在返回的Future中放置响应对象,然后根据响应将相应的Callable添加到线程池中。但是,这会将所有操作都绑定到主线程,这似乎是不必要的瓶颈。我想我可以将一个Runnable放入池中,该池本身处理Callable和后续动作的执行,但是然后我得到两倍的线程数。

我在这里的正确轨道上,还是我完全脱轨?

回答

0

有很多方法可以做你想做的。你需要小心,不要创建太多的线程。

以下是一个示例,您可以使用ExecutorCompletionService更高效,也可以使用Runnable's。

import java.util.ArrayList; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 


public class ThreadsMakeThreads { 

    public static void main(String[] args) { 
     new ThreadsMakeThreads().start(); 
    } 

    public void start() { 
     //Create resources 
     ExecutorService threadPool = Executors.newCachedThreadPool(); 
     Random random = new Random(System.currentTimeMillis()); 
     int numberOfThreads = 5; 

     //Prepare threads 
     ArrayList<Leader> leaders = new ArrayList<Leader>(); 
     for(int i=0; i < numberOfThreads; i++) { 
      leaders.add(new Leader(threadPool, random)); 
     } 

     //Get the results 
     try { 
      List<Future<Integer>> results = threadPool.invokeAll(leaders); 
      for(Future<Integer> result : results) { 
       System.out.println("Result is " + result.get()); 
      } 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
     } 

     threadPool.shutdown(); 

    } 

    class Leader implements Callable<Integer> { 

     private ExecutorService threadPool; 
     private Random random; 

     public Leader(ExecutorService threadPool, Random random) { 
      this.threadPool = threadPool; 
      this.random = random; 
     } 

     @Override 
     public Integer call() throws Exception { 
      int numberOfWorkers = random.nextInt(10); 
      ArrayList<Worker> workers = new ArrayList<Worker>(); 
      for(int i=0; i < numberOfWorkers; i++) { 
       workers.add(new Worker(random)); 
      } 
      List<Future<Integer>> tasks = threadPool.invokeAll(workers); 
      int result = 0; 
      for(Future<Integer> task : tasks) { 
       result += task.get(); 
      } 
      return result; 
     } 

    } 

    class Worker implements Callable<Integer> { 

     private Random random; 

     public Worker(Random random) { 
      this.random = random; 
     } 

     @Override 
     public Integer call() throws Exception { 
      return random.nextInt(100); 
     } 

    } 
} 
0

从其他任务提交任务到线程池是非常有意义的想法。但是我担心你会想到在不同的线程上运行新的任务,真的可以吃掉所有的内存。只需在创建池时设置线程数的限制,并将新任务提交到该线程池。

这种方法可以在不同的方向进一步阐述。首先,使用接口方法将任务视为普通对象,并让该方法决定是否要将此对象提交给线程池。这要求每个任务都知道其线程池 - 在创建时将其作为参数传递。更方便的是,将线程池作为线程局部变量继续引用。

您可以轻松地模拟函数式编程:对象表示函数调用,并且对于每个参数都有对应的set方法。当所有参数都被设置时,对象被提交给线程池。

另一个方向是角色编程:任务类具有单一方法,但可以多次调用,如果前一个参数尚未处理,set方法不会将任务提交给线程池,而只是存储它的论点在队列中。 run()方法处理来自队列的所有可用参数,然后返回。

所有这些功能都在数据流库https://github.com/rfqu/df4j中实现。我故意写它支持基于任务的并行性。

+0

直接在另一个任务中执行的任务不是一项任务,它只是一种方法。重新说一下你的问题,这样任务对于线程池(Runnable或Callable)来说意味着一件工作,然后,我可能会明白你想要什么。 –