2012-07-09 30 views
2

我有一个任务处理需要抛出IOException的文件目录,如果出现任何问题。我也需要它更快,所以我将工作分解成多个线程并等待终止。它看起来是这样的:创建自我中断执行程序服务

//Needs to throw IOException so the rest of the framework handles it properly. 
public void process(File directory) throws IOException { 
    ExecutorService executorService = 
     new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS, 
      new LinkedBlockingQueue<Runnable>()); 

    //Convenience class to walk over relevant file types. 
    Source source = new SourceImpl(directory); 
    while (source.hasNext()) { 
     File file = source.next(); 
     executorService.execute(new Worker(file)); 
    } 

    try { 
     executorService.shutdown(); 
     executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    } catch (InterruptedException e) { 
     executorService.shutdownNow(); 
     throw new IOException("Worker thread had a problem!"); 
    } 
} 

虽然工作者线程基本上是:

private class Worker implements Runnable { 
    private final File file; 
    public Worker(File file) { this.file = file; } 

    @Override 
    public void run() { 
     try { 
      //Do work 
     } catch (IOException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 

所需的行为方式是,如果任何工人有IOException,则产卵线由意识到这一点,并能在转身抛出自己的IOException。这是我想到的允许Worker线程发出错误信号的最佳方式,但我仍然不确定是否将其设置正确。

所以,首先,这会做我所期望的吗?如果一个Worker线程在run()中有一个错误,将会调用Thread.currentThread().interrupt();导致InterruptedException被抛出,使得它被阻塞executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);捕获?其次,如果一个正在运行的Worker在所有线程排队之前调用它的中断会发生什么;在阻止try/catch块之前?

最后(也是最重要的),有没有更优雅的方式来实现我的目标?我想让所有无数的子线程都执行直到完成或直到其中任何一个有错误,此时我想在生成的线程中处理它(通过有效地使整个目录失效)。


基于答案SOLUTION

,下面是我最终使用的实施。它很好地处理了我的异步期望,并在IOExceptions上干净而快速地失败。

public void process(File directory) throws IOException { 
    //Set up a thread pool of 16 to do work. 
    ExecutorService executorService = Executors.newFixedThreadPool(16); 
    //Arbitrary file source. 
    Source source = new SourceImpl(directory); 
    //List to hold references to all worker threads. 
    ArrayList<Callable<IOException>> filesToWork = 
     new ArrayList<Callable<IOException>>(); 
    //Service to manage the running of the threads. 
    ExecutorCompletionService<IOException> ecs = 
     new ExecutorCompletionService<IOException>(executorService); 

    //Queue up all of the file worker threads. 
    while (source.hasNext()) 
     filesToWork.add(new Worker(file)); 

    //Store the potential results of each worker thread. 
    int n = filesToWork.size(); 
    ArrayList<Future<IOException>> futures = 
     new ArrayList<Future<IOException>>(n); 

    //Prepare to return an arbitrary worker's exception. 
    IOException exception = null; 
    try { 
     //Add all workers to the ECS and Future collection. 
     for (Callable<IOException> callable : filesToWork) 
      futures.add(ecs.submit(callable)); 
     for (int i = 0; i < n; i++) { 
      try { 
       //Get each result as it's available, sometimes blocking. 
       IOException e = ecs.take().get(); 
       //Stop if an exception is returned. 
       if (e != null) { 
        exception = e; 
        break; 
       } 
      //Also catch our own exceptions. 
      } catch (InterruptedException e) { 
       exception = new IOException(e); 
       break; 
      } catch (ExecutionException e) { 
       exception = new IOException(e); 
       break; 
      } 
     } 
    } finally { 
     //Stop any pending tasks if we broke early. 
     for (Future<IOException> f : futures) 
      f.cancel(true); 
     //And kill all of the threads. 
     executorService.shutdownNow(); 
    } 

    //If anything went wrong, it was preserved. Throw it now. 
    if (exception != null) 
     throw exception; 
} 

而且

//Does work, and returns (not throws) an IOException object on error. 
private class Worker implements Callable<IOException> { 
    private final File file; 
    public Worker(File file) { this.file = file; } 

    @Override 
    public IOException call() { 
     try { 
      //Do work 
     } catch (IOException e) { 
      return e; 
     } 
     return null; 
    } 
} 
+1

Thread.currentThread()中断(); 只中断当前正在运行的线程,在这种情况下是工作人员本身。所有其他正在运行的线程都不受此中断的影响。 – 2012-07-09 19:40:34

+0

而且您不能依赖InterruptedException,因为它可能会或可能不会被引发。即使它被抛出,也会丢失原始IOException的上下文。 – 2012-07-09 19:47:38

+0

我有一种感觉InterruptedException是不可靠的,虽然我不会关心最初的IOException的上下文以及它发生的事实。但是,杰塔尔博恩已经指出了我没有意识到我正在寻找的方向。 – 2012-07-09 20:46:16

回答

2

调用interrupt()那样会不是影响主线程。

你应该做的是让你的工作人员Callable而不是Runnable,并允许失败例外离开call()方法。然后,使用ExecutorCompletionService执行你所有的工人。这将允许您确定每个任务的状态,并在其中一个任务失败时在主线程中执行操作。

+0

谢谢。我以前不知道CompletionServices,但他们似乎正是我想要做的;当然比我想要做的更优雅。 – 2012-07-09 20:44:56

1

与往常一样,线程之间最好的沟通是队列。让每个工作人员发送一条描述其执行完成情况的消息,并让产生的线程从队列中读取。此外,由于产卵线程知道它产生了多少工人,因此只需计算消息即可知道所有工人何时完成,而不依赖于池关闭。

1

制作工人实施Callable<Void>,你可以这样做:

public void process(File directory) throws IOException { 
    ExecutorService executorService = new ThreadPoolExecutor(16, 16, 
      Long.MAX_VALUE, TimeUnit.NANOSECONDS, 
      new LinkedBlockingQueue<Runnable>()); 

    // Convenience class to walk over relevant file types. 
    List<Future<Void>> futures = new ArrayList<Future<Void>>(); 
    Source source = new SourceImpl(directory); 
    while (source.hasNext()) { 
     File file = source.next(); 
     futures.add(executorService.submit(new Worker(file))); 
    } 

    try { 
     for (Future<Void> future : futures) { 
      future.get(); 
     } 
    } catch (ExecutionException e) { 
     throw new IOException("Worker thread had a problem!", e.getCause()); 
    } catch (InterruptedException e) { 
     throw new IOException("Worker thread had a problem!", e); 
    } finally { 
     executorService.shutdown(); 
    } 
}