2017-10-11 50 views
0

我有以下工作代码:加入多个回调执行在CompletableFuture

DiscoveryCallback callback = new DiscoveryCallback(); 
Manager.discover(someparam, callback); 

我想这个调用包装成一个CompletableFuture有RX-ISH API与其他异步操作组成。

Manager.discover()是一个第三方库,实际上是一个用于本机功能结合的方法,并且它多次执行回调,在不同的线程。

我DiscoveryCallback实现以下接口:

interface onFoundListerner { 
    onFound(List<Result> results) 
    onError(Throwable error) 
} 

我试图注入的CompletableFuture<List<Result>>一个实例为DiscoveryCallback,然后调用完成方法。对于一个回调执行它可以正常工作,而其他回调则被忽略。

我怎样才能加入这个多次执行的结果,使我的包装返回单个CompletableFuture?

+0

也许一个'Iterator'或'Stream'将比'CompletableFuture's更合适。 – acelent

+0

这样我的包装将被阻止。我需要客户端上的反应式API,但我不能依赖RxJava。顺便说一句,Observables一切都按预期工作。 – thiagogcm

回答

0

什么异步队列?

public class AsyncQueue<T> { 
    private final Object lock = new Object(); 
    private final Queue<T> queue = new ArrayDeque<T>(); 
    private CompletableFuture<Void> removeCf = new CompletableFuture<>(); 

    public void add(T item) { 
     synchronized (lock) { 
      queue.add(item); 
      removeCf.complete(null); 
     } 
    } 

    public CompletableFuture<T> removeAsync() { 
     CompletableFuture<Void> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     return currentCf 
      .thenCompose(v -> removeAsync()); 
    } 
} 

在Java 9,你可以使用.completeOnTimeout(null, timeout, unit)removeAsync返回的CompletableFuture有超时机制。

的Java 9之前,你需要安排自己的超时。下面是使用嵌入式超时调度版本:

public class AsyncQueue<T> { 
    static final ScheduledExecutorService scheduledExecutorService; 

    static { 
     ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, new ScheduledThreadFactory()); 
     scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); 
     scheduledExecutorService = Executors.unconfigurableScheduledExecutorService(scheduledThreadPoolExecutor); 
    } 

    static final class ScheduledThreadFactory implements ThreadFactory { 
     static AtomicInteger scheduledExecutorThreadId = new AtomicInteger(0); 

     static final synchronized int nextScheduledExecutorThreadId() { 
      return scheduledExecutorThreadId.incrementAndGet(); 
     } 

     @Override 
     public Thread newThread(Runnable runnable) { 
      Thread thread = new Thread(runnable, "AsynchronousSemaphoreScheduler-" + nextScheduledExecutorThreadId()); 
      thread.setDaemon(true); 
      return thread; 
     } 
    } 

    private final Object lock = new Object(); 
    private final Queue<T> queue = new ArrayDeque<T>(); 
    private CompletableFuture<Long> removeCf = new CompletableFuture<>(); 

    public void add(T item) { 
     synchronized (lock) { 
      queue.add(item); 
      removeCf.complete(System.nanoTime()); 
     } 
    } 

    public CompletableFuture<T> removeAsync(long timeout, TimeUnit unit) { 
     if (unit == null) throw new NullPointerException("unit"); 

     CompletableFuture<Long> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else if (timeout <= 0L) { 
       return CompletableFuture.completedFuture(null); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     long startTime = System.nanoTime(); 
     long nanosTimeout = unit.toNanos(timeout); 
     CompletableFuture<T> itemCf = currentCf 
      .thenCompose(endTime -> { 
       long leftNanosTimeout = nanosTimeout - (endTime - startTime); 
       return removeAsync(leftNanosTimeout, TimeUnit.NANOSECONDS); 
      }); 
     ScheduledFuture<?> scheduledFuture = scheduledExecutorService 
      .schedule(() -> itemCf.complete(null), timeout, unit); 
     itemCf 
      .thenRun(() -> scheduledFuture.cancel(true)); 
     return itemCf; 
    } 

    public CompletableFuture<T> removeAsync() { 
     CompletableFuture<Long> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     return currentCf 
      .thenCompose(endTime -> removeAsync()); 
    } 
} 

您可以重构调度出这个类与其他类共享,也许成采用工厂在.properties文件建立并诉诸于一个单该示例中的默认值如果未配置。

您可以使用ReentrantLock而不是​​语句来获得这一点性能。它应该只在重大的争论中才重要,但AsyncQueue<T>可以用于这种目的。

相关问题