2017-02-21 16 views
4

下面的示例代码我正在注入一个biconsumer睡眠100毫米作为一套完整的未来的完成行动。我已经使用whenCompleteAsync方法,通过单独使用executorServiceexecutorServiceThreadPoolExecutor与芯池大小5,最大尺寸和5 1.如何捕获CompletableFuture的whenCompleteAsync调用中抛出的RejectedExecutionException?

public class CompleteTest { 
    public static void main(String[] args) { 
     ExecutorService executorService = new ThreadPoolExecutor(5, 5, 10, 
       TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); 

     ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 

     for (int i = 0; i <100; i++) { 
      CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
      stringCompletableFuture.whenCompleteAsync((e, a) -> { 
       System.out.println("Complete " + e); 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e1) {e1.printStackTrace();} 
      }, executorService); 

      list.add(stringCompletableFuture); 
     } 

     for (int i = 0; i < list.size(); i++) { 
      list.get(i).complete(i + ""); 
     } 
    } 
} 

当我跑的代码的队列长度,即使我完成100个期货仅6输出得到打印。这是5个核心线程和1个排队的线程。剩下的事发生了什么?如果由于队列已满而导致其他可运行列表无法提交给执行程序服务,则不应该出现异常。

输出

Complete 0 
Complete 1 
Complete 2 
Complete 3 
Complete 4 
Complete 5 

回答

5

抛出一个异常,并且CompletableFuture分外完成,只是那些不是任何你追踪。

您正在使用构造函数实例化并初始化ThreadPoolExecutor,该构造函数使用仅引发异常的默认RejectedExecutionHandler。我们知道如果ExecutorService无法接受任务,则会抛出RejectedExecutionException。那么添加的任务在哪里以及抛出的异常在哪里?

就目前而言,所有的链接发生在whenCompleteAsync之内。当你打电话给你的时候,你在接收者CompletableFuturestringCompletableFuture上添加一个依赖关系。当stringCompletableFuture完成(在这种情况下成功)时,它将创建一个新的CompletableFuture(它返回)并尝试安排给定的ExecutorService上给定的BiConsumer

由于ExecutorService的队列没有空间,它会调用RejectedExecutionHandler,这将抛出RejectedExecutionException。该例外情况在当时被捕获,并用于将被退回。

换句话说,在您的for循环中,捕获由whenCompleteAsync返回的CompletableFuture,将其存储并打印出其状态。

ArrayList<CompletableFuture<String>> list = new ArrayList<>(); 
ArrayList<CompletableFuture<?>> dependents = new ArrayList<>(); 
for (int i = 0; i <100; i++) { 
    CompletableFuture<String> stringCompletableFuture = new CompletableFuture<>(); 
    CompletableFuture<?> thisWillHaveException = stringCompletableFuture.whenCompleteAsync((e, a) -> { 
     System.out.println("Complete " + e); 
     try { 
      Thread.sleep(100); 
     } catch (InterruptedException e1) {e1.printStackTrace();} 
    }, executorService); 
    dependents.add(thisWillHaveException); 
    list.add(stringCompletableFuture); 
} 

for (int i = 0; i < list.size(); i++) { 
    list.get(i).complete(i + ""); 
} 
Thread.sleep(2000); 
dependents.forEach(cf -> { 
    cf.whenComplete((r, e) -> { 
     if (e != null) 
      System.out.println(cf + " " + e.getMessage()); 
    }); 
}); 

你会发现,他们都是(除了已成功先前打印的6)用RejectedExecutionException格外完成。

... 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
[email protected][Completed exceptionally] java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] 
相关问题