2015-09-09 138 views
2

我有一种方法,使用ThreadPoolExecuter创建一些文件,然后压缩创建的文件。完成ThreadPoolExecuter的所有任务后运行代码

private void createAndZip(){ 
    // Some Code 
    ThreadPoolExecutor executer = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); 
    for(String str : someStringList){ 
     // This piece of code creates files and drops to certain location. 
     executer.execute(new MyRunnable()); 
    } 
    executer.shutdown(); 
    // Code to Zip the files created above. 
} 

现在我创建zip文件的一段代码甚至在创建所有文件之前运行,因此并非所有文件都被压缩。

请帮忙。我尝试了睡眠,但无法保证文件创建需要多长时间。

+2

你可以使用的五个线程倒计时锁或多项任务,并在任务结束时,在每个线程主线程和倒计时锁等待这么countdownlatch达到0之后它会继续。 – virendrao

回答

2

您需要调用执行器对象上的awaitTermination,以等待执行器完成关闭。

+0

不要忘记检查返回值 –

0
private void createAndZip() throws Exception{ 

    // Some Code 
    ThreadPoolExecutor executer = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); 

    for(String str : someStringList){ 
     // This piece of code creates files and drops to certain location. 
     executer.execute(new MyRunnable()); 
    } 
    executer.shutdown(); 

    while (true) { 
      boolean result_ = threadPoolExecutor.awaitTermination(TimeUnit.DAYS, 1); 

      if(result_) 
       break; 
     } 
    // Code to Zip the files created above. 

    //Code here. 
} 
+1

'while(foo){break; ''没有意义。这实际上相当于直接运行'foo;'。 –

+0

@ ChrisJester-Young现在你可以检查逻辑 –

+0

你可以刚刚说'while(!foo){}'。 ;-) –

0

我用CountDownLatch解决了这个问题。这里是示例代码。

private void createAndZip() throws Exception{ 
    CountDownLatch latch = new CountDownLatch(someStringList.size()); 
    // Some Code 
    ThreadPoolExecutor executer = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); 

    for(String str : someStringList){ 
     // This piece of code creates files and drops to certain location. 
     executer.execute(new MyRunnable(latch)); 
    } 
    executer.shutdown(); 


    // Code to Zip the files created above. 
    try { 
     latch.await(); 
    } catch (InterruptedException exception) { 
     throw new GIException(exception); 
    } 

    //Code here. 
} 

public class MyRunnable implements Runnable{ 
    CountDownLatch latch = null; 

    MyRunnable(CountDownLatch latch){ 
     this.latch = latch; 
    } 

    @Override 
    public void run() { 
     try { 
      // Some Logic 
      latch.countDown(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
1

在您的代码块中,您正在缩小Executors.newFixedThreadPool(5)的返回范围。你有一个选择是使用它返回的ExecutorService。这个类已经有了避免重新实现锁存器等同步代码的功能。例如:

使用期货

private void createAndZip(ExecutorService executor) throws ExecutionException, InterruptedException { 
    // Some Code 
    List<String> list = new ArrayList<>(); 
    // For a number of reasons ExecutorService should be constructed outside 
    // ExecutorService executer = Executors.newFixedThreadPool(5); 
    List<Future<?>> futures = new ArrayList<>(); 
    for(String str : list){ 
     // This piece of code creates files and drops to certain location. 
     futures.add(executer.submit(new MyRunnable())); 
    } 
    // async work 
    for (Future<?> future : futures) { 
     future.get(); // blocks 
    } 

    // Code to Zip the files created above. 
} 

这里有一些优势:

  • 错误管理:执行的背景时,如果使用另一种方法,你必须为错误的安排从后台线程传递到您的主线程。在这里,未来会照顾到这一点。如果您的工作人员抛出异常,则会将其返回到您的控制线程。
  • 在代码中保留少量线程池。首先汇集线程的原因是为了降低启动成本。如果您有任何重要大小的程序,那么无论何时想要并行执行操作,都不希望创建并销毁线程池。

使用Java8 Lambda可以以更紧凑的方式编写循环。

叉/加入

也许更适合你的任务,特别是如果你要处理的文件树是Fork/Join framework。在这里,您可以将处理和压缩转换为提交给fork-join池的任务的集合。这很整齐,因为您可以为整个zip文件获取Future,从而使您能够从主线程生成整个zip文件。使用类似的东西来设计叉/加入可能是:

static class PrepareFile extends RecursiveTask<Void> { 

    private String filePath; 

    PrepareFile(String filePath) { 
     this.filePath = filePath; 
    } 

    @Override 
    protected Void compute() { 
     try { 
      System.out.println(filePath); 
      Thread.sleep(1009L); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
     return null; // void 
    } 
} 

static class ZipTask extends RecursiveTask<String> 
{ 

    private List<String> files; 

    ZipTask(List<String> files) { 
     this.files = files; 
    } 

    @Override 
    protected String compute() { 
     List<PrepareFile> prepareTasks = new ArrayList<>(); 
     for(String file : files) { 
      PrepareFile fileTask = new PrepareFile(file); 
      prepareTasks.add(fileTask); 
      fileTask.fork(); 
     } 
     for(PrepareFile task : prepareTasks) { 
      task.join(); // can collect results here 
     } 
     System.out.println("Zipping"); 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println("Done task"); 
     return "filename.zip"; 
    } 
} 
public static void main(String[] args) { 
    ForkJoinPool pool = new ForkJoinPool(); 
    List<String> toProcess = Arrays.asList("a","b"); 
    String filename = pool.invoke(new ZipTask(toProcess)); 
    System.out.println("Zipped " + filename); 
} 

这是你想改变一些东西,比如返回类型的任务以及如何在任务也许调用的说明。

在awaitTermination

有可能调用shutdown等待所有进程终止后仍继续使用awaitTermination方法。然而,对于可能在操作之间共享线程池的较长时间运行的服务或程序而言,这可能不是那么理想。

1

我认为你可以在这里使用Future对象。执行者使用submit()方法而不是调用​​。这应该会为您提交给执行者的每个任务提供一个Future对象。一旦您提交所有任务,只需循环访问您所获得的期货清单,并分别拨打get()即可。这是一个阻塞呼叫,并等待相应的任务完成。

这里的好处是你可以检索任务抛出的异常,然后决定是否压缩文件。

请参阅此码 -

private void createAndZip() throws Exception { 
    // Some Code 
    ThreadPoolExecutor executer = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); 
    // collect all futures 
    List<Future> futures = new ArrayList<>(); 
    for(String str : someStringList){ 
     // This piece of code creates files and drops to certain location. 
     futures.add(executer.submit(new MyRunnable())); 
    } 
    // wait for all tasks to finish 
    try { 
     for (Future future : futures) { 
      future.get(); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
     if (e instanceof ExecutionException) { 
      throw e; 
     } 
    } finally { 
     executer.shutdown(); 
    } 
    // Code to Zip the files created above. 
}