2016-01-21 43 views
1

我有工作接口IJob这样的:在Java中运行多个线程,错误管理

public interface IJob { 
    public void execute(); 
} 

在我的应用程序有多个类实现了这个接口,像IJob1和IJob2:

public class IJob1 implements IJob{ 
@Override 
public void execute(){ 
    System.out.println("IJob1\n"); 
    } 
} 

public class IJob2 implements IJob{ 
@Override 
public void execute(){ 
    System.out.println("IJob2\n"); 
    } 
} 

因为要运行的作业量稳步增加,所以我想创建一个新的作业,它将IJob实例列表并行运行。新实现用于并行运行作业的线程数量应该是可配置的。如果其中一个作业引发异常,则其他所有当前正在运行的作业也应该停止,并且execute()方法应将异常传递给调用者。

我写了这个,但我不能够运行作业和检查,如果有一个错误:

import java.util.LinkedList; 

public class WorkQueue 
{ 
    private final int nThreads; 
    private final IJob[] threads; 
    private final LinkedList queue; 

    public WorkQueue(int nThreads) 
    { 
     this.nThreads = nThreads; 
     queue = new LinkedList(); 
     threads = new IJob[nThreads]; 

     for (int i=0; i<nThreads; i++) { 
      threads[i] = new IJob(); 
      threads[i].execute(); 
     } 
    } 

    public void execute(Runnable r) { 
    synchronized(queue) { 
     queue.addLast(r); 
     queue.notify(); 
    } 
} 

private class PoolWorker extends Thread { 
    public void run() { 
     Runnable r; 

     while (true) { 
      synchronized(queue) { 
       while (queue.isEmpty()) { 
        try 
        { 
         queue.wait(); 
        } 
        catch (InterruptedException ignored) 
        { 
        } 
       } 

       r = (Runnable) queue.removeFirst(); 
      } 

      // If we don't catch RuntimeException, 
      // the pool could leak threads 
      try { 
       r.run(); 
      } 
      catch (RuntimeException e) { 
       // You might want to log something here 
      } 
     } 
    } 
} 
} 

能否请您给我一些帮助很少去? 非常感谢。

+2

看看'ThreadPoolExecutor'。你的工作将是这方面的任务。 – Thomas

+0

它们不是线程,所以它们按顺序执行......创建一个需要扩展'Thread'类的线程 – Marcx

+0

感谢您的帮助。我会去看看这个背景 – sharkbait

回答

5

我会推荐一个托管线程池。典型模式是使用Java Executors来获得ExecutorService。 ExecutorService通常使用线程池和作业队列来实现。在ExecutorService上,有一些方法如shutdownNow()“尝试停止所有正在执行的任务”。这听起来像你想做的事情。

例,

List<Callable<Result>> tasks = new ArrayList<>(); 
for (final Object job: jobs) { 
    final Callable<Result> task = new Callable<Result>() { 
      @Override 
      public Result call() throws Exception { 
       // Do job here 
       return result; 
      } 
     }; 
    tasks.add(task); 
} 
final numOfThreads = 20; 
final ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); 
try { 
    executor.invokeAll(tasks); 
} finally { 
    executor.shutdownNow(); 
} 
+0

太棒了!你能否在我的答案中给我一个例子? – sharkbait

+0

@sharkbait如果你不能使用Java 8,我会使用ExecutorService。 –

+0

我使用Java 7,所以我可以使用它? – sharkbait

4

I want to create a new job, which takes a list of IJob instances and runs them in parallel. The amount of threads that the new implementation is using to run the jobs in parallel should be configurable. If one of the jobs throws an exception, all other currently running jobs should be stopped as well and the execute() method should pass the exception to the caller.

这是使用parallelStream()会让你的生活更加简单。你可以做

list.parallelStream(). 
    .forEach(IJob::execute); 

你可能会发现你根本不需要框架,你可以将这段代码合并到调用者中。例如

Map<String, T> map = dataSet.parallelStream() 
          .map(t -> t.transform1()) 
          .filter(t -> t.isGood()) 
          .collect(Collectors.groupingByConcurrent(t.getKey()));