1

我有以下代码:如何防止在上下文线程CompletableFuture#whenComplete执行

ConcurrentHashMap taskMap= new ConcurrentHashMap(); 
.... 
taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     future.whenComplete((r, e) -> taskMap.remove(key, future));    
     return future; 
    }); 

这个代码在future已经完成whenComplete函数参数的情况下调用在同一线程compute所调用的问题。在这个方法的主体中,我们从地图中删除条目。但计算方法文档禁止此应用程序冻结。

我该如何解决这个问题?

回答

0

最明显的解决方案是使用whenCompleteAsync而不是whenComplete,因为前者保证使用提供的Executor执行操作而不是调用线程。可与

Executor ex = r -> { System.out.println("job scheduled"); new Thread(r).start(); }; 
for(int run = 0; run<2; run++) { 
    boolean completed = run==0; 
    System.out.println("*** "+(completed? "with already completed": "with async")); 
    CompletableFuture<String> source = completed? 
     CompletableFuture.completedFuture("created in "+Thread.currentThread()): 
     CompletableFuture.supplyAsync(() -> { 
      LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); 
      return "created in "+Thread.currentThread(); 
     }, ex); 

    source.thenApplyAsync(s -> s+"\nprocessed in "+Thread.currentThread(), ex) 
      .whenCompleteAsync((s,t) -> { 
       if(t!=null) t.printStackTrace(); else System.out.println(s); 
       System.out.println("consumed in "+Thread.currentThread()); 
      }, ex) 
      .join(); 
} 

得到证实,这将打印出类似这样

*** with already completed 
job scheduled 
job scheduled 
created in Thread[main,5,main] 
processed in Thread[Thread-0,5,main] 
consumed in Thread[Thread-1,5,main] 
*** with async 
job scheduled 
job scheduled 
job scheduled 
created in Thread[Thread-2,5,main] 
processed in Thread[Thread-3,5,main] 
consumed in Thread[Thread-4,5,main] 

所以你可以只使用

taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor); 
     return future; 
    }); 

如果提前完成有显著的可能性,你可以使用减少开销

taskMap.compute(key, (k, queue) -> { 
     CompletableFuture<Void> future = (queue == null) 
       ? CompletableFuture.runAsync(myTask, poolExecutor) 
       : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor); 
     //to prevent OutOfMemoryError in case if we will have too much keys 
     if(future.isDone()) future = null; 
     else future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor); 
     return future; 
    }); 

也许,你没有找到这个明显的解决方案,因为你不喜欢依赖操作总是作为一个新任务被调度到池中,即使完成发生在不同的任务中。你可以用一个专门的执行,将重新安排仅在必要时任务解决这个问题:

Executor inPlace = Runnable::run; 
Thread forbidden = Thread.currentThread(); 
Executor forceBackground 
     = r -> (Thread.currentThread()==forbidden? poolExecutor: inPlace).execute(r); 

… 

future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), forceBackground); 

但是你可能会重新考虑这个复杂的每个映射清理逻辑是否真的需要的话。它不仅复杂,而且可能会产生显着的开销,可能会安排大量的清理操作,而这些清理操作在执行时已经过时并不是真正需要的。

这可能是更简单,更有效地从时间执行

taskMap.values().removeIf(CompletableFuture::isDone); 

时间清理整个地图。

相关问题