2

我想通过CompletableFuture.supplyAsync将优先级队列添加到与LinkedBlockingQueue一起使用ThreadPoolExecutor的现有应用程序中。问题是我无法想出一个设计来分配任务优先级,然后我可以在PriorityBlockingQueue的比较器中访问它。这是因为我的任务被CompletableFuture包装到一个称为AsyncSupply的私有内部类的实例中,该私有内部类将原始任务隐藏在私有字段中。比较器随后被调用铸成的Runnable这些AsyncSupply对象,如下所示:如何与PriorityBlockingQueue一起使用CompletableFuture.supplyAsync?

public class PriorityComparator<T extends Runnable> implements Comparator<T> { 

    @Override 
    public int compare(T o1, T o2) { 

     // T is an AsyncSupply object. 
     // BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TOO! 
     return 0; 
    } 
} 

我调查延长CompletableFuture这样我就可以把它包装在不同的对象,但这么多的多CompletableFuture的可能性被封装和uninheritable 。所以扩展它似乎不是一个选项。也不是用适配器封装它,因为它实现了非常宽的接口。

我不知道如何解决这个问题,除了复制整个CompletableFuture,并修改它。有任何想法吗?

+0

我不明白你的问题。你正在调用'asyncSupply()'并传递一个配置了'LinkedBlockingQueue'的Executor?你想用'PriorityBlockingQueue'取代它?也就是说,似乎所有这些都在'supplyAsync()'函数和'CompletableFuture'的外部。你在哪里遇到麻烦?也许你可以显示你想写的代码,中间缺少的位突出显示。 – erickson

+0

谢谢@erickson回复,并提出一个很好的问题。我没有注意到为什么我不能只使用PriorityBlockingQueue的重要一点。所以原因是PriorityBlockingQueue会传递给比较器:compare方法将两个AsyncSupply对象转换为Runnables。这个AsyncSupply是CompletableFuture的一个私有的内部类,所以我不能将它转换回AsyncSupply,更不用说访问它的私有字段,该字段引用我想要分配优先级的原始未解包任务。所以我想这个问题是:我如何或在哪里分配我的优先事项?我会添加一些代码。 – jacob

回答

5

这似乎是API中的一个限制,CompletableFuture没有提供直接使用PriorityBlockingQueue的方法。幸运的是,我们可以在没有太多麻烦的情况下绕过它。在Oracle的1.8 JVM,他们碰巧命名所有的内部类字段fn的,所以提取我们的首要任务感知Runnable S可没有太多的麻烦来完成:

public class CFRunnableComparator implements Comparator<Runnable> { 

    @Override 
    @SuppressWarnings("unchecked") 
    public int compare(Runnable r1, Runnable r2) { 
     // T might be AsyncSupply, UniApply, etc., but we want to 
     // compare our original Runnables. 
     return ((Comparable) unwrap(r1)).compareTo(unwrap(r2)); 
    } 

    private Object unwrap(Runnable r) { 
     try { 
      Field field = r.getClass().getDeclaredField("fn"); 
      field.setAccessible(true); 
      // NB: For performance-intensive contexts, you may want to 
      // cache these in a ConcurrentHashMap<Class<?>, Field>. 
      return field.get(r); 
     } catch (IllegalAccessException | NoSuchFieldException e) { 
      throw new IllegalArgumentException("Couldn't unwrap " + r, e); 
     } 
    } 
} 

这是假设你的Supplier类是Comparable,是这样的:

public interface WithPriority extends Comparable<WithPriority> { 
    int priority(); 
    @Override 
    default int compareTo(WithPriority o) { 
     // Reverse comparison so higher priority comes first. 
     return Integer.compare(o.priority(), priority()); 
    } 
} 

public class PrioritySupplier<T> implements Supplier<T>, WithPriority { 
    private final int priority; 
    private final Supplier<T> supplier; 
    public PrioritySupplier(int priority, Supplier<T> supplier) { 
     this.priority = priority; 
     this.supplier = supplier; 
    } 
    @Override 
    public T get() { 
     return supplier.get(); 
    } 
    @Override 
    public int priority() { 
     return priority; 
    } 
} 

使用方法如下:

PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/, 
     new CFRunnableComparator()); 
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q); 
CompletableFuture.supplyAsync(new PrioritySupplier<>(n,() -> { 
    ... 
}), pool); 

如果你创建类,比如PriorityFunctionPriorityBiConsumer,您可以使用相同的技术来调用方法,如thenApplyAsyncwhenCompleteAsync以及适当的优先级。

+1

抱歉,延迟回复,非常感谢。我最终做了你所说的话,并且为供应商和比较者抽出了我自己的实现。 – jacob

相关问题