2009-02-11 74 views
37

首先,我必须说我对API java.util.concurrent很陌生,所以也许我所做的是完全错误的。这是使用java.util.concurrent.FutureTask的好方法吗?

我想要做什么?

我有一个基本上运行2分别处理(称为myFirstProcessmySecondProcess)的Java应用程序,但这些处理必须在同一时间运行。

所以,我想这样做:

public void startMyApplication() { 
    ExecutorService executor = Executors.newFixedThreadPool(2); 
    FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess); 
    FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess); 
    executor.execute(futureOne); 
    executor.execute(futureTwo); 
    while (!(futureOne.isDone() && futureTwo.isDone())) { 
     try { 
      // I wait until both processes are finished. 
      Thread.sleep(1000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
    logger.info("Processing finished"); 
    executor.shutdown(); 
    // Do some processing on results 
    ... 
} 

myFirstProcessmySecondProcess是实现Callable<Object>类,并在所有的处理是在call()方法制成。

它工作得很好,但我不确定这是否正确。 是做我想做的事的好方法吗?如果没有,你可以给我一些提示,以加强我的代码(并尽可能保持简单)。

+3

你的条件是缺少一个闭括号:) – 2009-02-11 11:32:35

回答

43

您最好使用get()方法。

futureOne.get(); 
futureTwo.get(); 

这两者的等待线程,它完成处理的通知,这样可以节省你的忙 - 等待 - 与 - 计时器您现在使用效率不高也不优雅。

作为奖励,您拥有API get(long timeout, TimeUnit unit),它允许您定义线程休眠和等待响应的最长时间,否则将继续运行。

查看Java API了解更多信息。

+0

因此,而不是Callable他应该实现未来或他应该实现呢? – 2009-05-19 13:08:03

+1

对不起,我被你的链接弄糊涂了。 FutureTask有.get()方法。 – 2009-05-19 13:09:21

+1

有关使用CompletionService的答案比这个更好。 – 2015-01-24 20:11:35

5

如果您有兴趣同时启动线程,或者等待它们完成并进行一些进一步处理,则可能需要使用CyclicBarrier。 查看javadoc了解更多信息。

17

Yuval的解决方案很好。作为替代,您也可以这样做:

ExecutorService executor = Executors.newFixedThreadPool(); 
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess); 
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess); 
executor.execute(futureOne); 
executor.execute(futureTwo); 
executor.shutdown(); 
try { 
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
} catch (InterruptedException e) { 
    // interrupted 
} 

这种方法的优点是什么?除了这种方式,你可以阻止执行者接受任何更多的任务(你也可以这样做),除此之外没有什么区别。虽然我倾向于喜欢这个成语。另外,如果get()抛出一个异常,你最终可能会在你的代码的一部分中假设这两个任务都完成了,这可能是不好的。

18

以上FutureTask的使用是可以忍受的,但绝对不是惯用的。您实际上正在将您提交给ExecutorService的那件包裹额外FutureTask。您的FutureTaskExecutorService视为Runnable。在内部,它将FutureTask-as- Runnable包装在新的FutureTask中,并将其作为Future<?>返回给您。

相反,您应该将Callable<Object>实例提交到CompletionService。您通过submit(Callable<V>)放下两个Callable,然后转身并拨打CompletionService#take()两次(每次提交一次Callable)。这些电话会阻塞,直到其中一个提交的任务完成。

鉴于您手头已有Executor,请围绕它构建一个新的ExecutorCompletionService并将您的任务放在那里。不要旋转和睡觉等待; CompletionService#take()将会阻塞,直到任一项任务完成(无论是完成运行还是取消)或等待take()的线程中断。

7

可以使用的invokeAll(Colelction ....)方法

package concurrent.threadPool; 

import java.util.Arrays; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class InvokeAll { 

    public static void main(String[] args) throws Exception { 
     ExecutorService service = Executors.newFixedThreadPool(5); 
     List<Future<java.lang.String>> futureList = service.invokeAll(Arrays.asList(new Task1<String>(),new Task2<String>())); 

     System.out.println(futureList.get(1).get()); 
     System.out.println(futureList.get(0).get()); 
    } 

    private static class Task1<String> implements Callable<String>{ 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000 * 10); 
      return (String) "1000 * 5"; 
     } 

    } 

    private static class Task2<String> implements Callable<String>{ 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000 * 2); 
      int i=3; 
      if(i==3) 
       throw new RuntimeException("Its Wrong"); 
      return (String) "1000 * 2"; 
     } 

    } 
} 
2

如果您futureTasks是超过2,请考虑[ListenableFuture][1]

当几个操作应尽快开始为另一个操作 开始 - “扇出” - ListenableFuture只是工作:它触发所有的 请求的回调。随着稍微更多的工作,我们可以“扇入式”或 触发ListenableFuture在其他几个 期货全部完成后立即计算。

相关问题