2013-07-16 86 views
3

我有期货清单,在完成每个未来时,我有一个应该执行的回调。番石榴期货等待回调

我正在使用Futures.successfulAsList检查是否所有期货都已完成。但是,这并没有考虑到回调的完成。

有没有一种方法可以确保回调完成?

而不是回调,我可以使用Futures.transform包装到另一个未来并检查完成。但是,有了这个,我无法访问在包装未来中抛出的运行时异常。

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); 

List<ListenableFuture<Object>> futures = new ArrayList<>(); 

for (int i = 1; i <= 20; i++) { 
    final int x = i * 100; 

    ListenableFuture<Object> future = service.submit(new Callable() { 
    @Override 
    public Object call() throws Exception { 
     Thread.sleep(10000/x); 

     return x; 
    } 
    }); 

    futures.add(future); 

    Futures.addCallback(future, new FutureCallback<Object>() { 

    @Override 
    public void onFailure(Throwable t) { 
     t.printStackTrace(); 
    } 

    @Override 
    public void onSuccess(Object x) { 
     try {Thread.sleep((Integer) x * 10);} catch (Exception e) {} 

     System.out.println(x); 
    } 
    }); 
} 

ListenableFuture<List<Object>> listFuture = Futures 
    .successfulAsList(futures); 
System.out.println("Waiting..."); 
System.out.println(listFuture.get()); 
System.out.println("Done"); 
+0

这是一个真正伟大的问题,因为它在我看来,它可能是不直接由番石榴API支持的一个相当常见的情况。 –

回答

1

如果您为每个回调创建另一个未来并确保它将在回调中完成,那么该如何处理?

// create "callback" future here 
futures.add(callbackFuture); 

Futures.addCallback(future, new FutureCallback<Object>() { 

    @Override 
    public void onFailure(Throwable t) { 
    t.printStackTrace(); 
    // do something with callbackFuture 
    } 

    @Override 
    public void onSuccess(Object x) { 
    try {Thread.sleep((Integer) x * 10);} catch (Exception e) {} 

    System.out.println(x); 
    // do something with callbackFuture 
    } 
}); 
+0

你指的是什么回调Future? – vinoths

+0

只是你可以在你的回调中访问一些未来。没什么特别的。甚至不需要做任何事情。 (我不熟悉番石榴类;在java.util.concurrent中,你会做一个“新的FutureTask()”,其中有一个可运行的runTable并且在它的回调函数内部调用run(),之后isDone()将返回true;番石榴当然有相同的东西) – user2543253

+0

好吧,让我试试。 – vinoths

0

谢谢,这个作品!

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); 

List<ListenableFuture<Void>> futures = new ArrayList<>(); 

for (int i = 1; i <= 20; i ++) { 
    final int x = i * 100; 

    ListenableFuture<Object> future = service.submit(new Callable(){ 
    @Override 
    public Object call() throws Exception { 
     Thread.sleep(10000/x); 

     return x; 
    } 
    }); 

    //Blank runnable to evaluate write completion 
    Runnable callback = new Runnable(){ 
    @Override 
    public void run() { 
     //do nothing 
    } 
    }; 

    final ListenableFutureTask<Void> callbackFuture = ListenableFutureTask.create(callback, null); 

    futures.add(callbackFuture); 

    Futures.addCallback(future, new FutureCallback<Object>() { 

    @Override 
    public void onFailure(Throwable t) { 
     try { 
     t.printStackTrace(); 
     } 
     finally { 
     callbackFuture.run(); 
     } 
    } 

    @Override 
    public void onSuccess(Object x) { 
     try { 
     try {Thread.sleep((Integer)x*10);}catch(Exception e){} 

     System.out.println(x); 
     } 
     finally { 
     callbackFuture.run(); 
     } 
    } 
    }); 
} 

ListenableFuture<List<Void>> listFuture = Futures.successfulAsList(futures); 
System.out.println("Waiting..."); 
System.out.println(listFuture.get()); 
System.out.println("Done"); 
4

如果你只是想阻止,直到您提交已全部完成N个任务的回调,你可以创建一个CountDownLatch与N的count然后只需调用countDown()它当每个回调完成(无论成功或失败)和await()它在你想阻止的点。

或者,你可以不喜欢你在你的答案一样,但不是使用ListenableFutureTask<Void>和无操作Runnable,只需用SettableFuture<Void>代替并在完成它叫set(null)

+0

我用'CountDownLatch'方法去了。很好地工作,干净实施! – geld0r

0

实现无眠:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20)); 

    List<ListenableFuture<Object>> futures = new ArrayList<>(); 

    for (int i = 1; i <= 20; i++) { 
     final int x = i * 100; 

     ListenableFuture<Object> future = service.submit(new Callable() { 
      @Override 
      public Object call() throws Exception { 
       Thread.sleep(10000/x); 

       return x; 
      } 
     }); 

     Futures.addCallback(future, new FutureCallback<Object>() { 

      @Override 
      public void onFailure(Throwable t) { 
       t.printStackTrace(); 
      } 

      @Override 
      public void onSuccess(Object x) { 
       try {Thread.sleep((Integer) x * 10);} catch (Exception e) {} 

       System.out.println(x); 
      } 
     }); 

     /* all Callbacks added in one list (ExecutionList) and executed by order. If not defined 3d argument (Executor) 
      then callbacks executed sequentially at task thread. 
     */ 
     final SettableFuture<Object> lastCalledFuture = SettableFuture.create(); 
     Futures.addCallback(future, new FutureCallback<Object>() { 
      @Override 
      public void onSuccess(Object result) { 
       lastCalledFuture.set(result); 
      } 

      @Override 
      public void onFailure(Throwable t) { 
       lastCalledFuture.setException(t); 
      } 
     }); 
     futures.add(lastCalledFuture); 
    } 

    ListenableFuture<List<Object>> listFuture = Futures 
      .successfulAsList(futures); 
    System.out.println("Waiting..."); 
    System.out.println(listFuture.get()); 
    System.out.println("Done");