我使用Java的DelayQueue在第二次延迟后分派事件。但问题是,在负载很重的情况下,我的消费者在DelayQueue上阻塞,直到来自其他线程的相当多的offer()操作消失。Nonblocking DelayQueue,Java
有谁知道Java中的非阻塞延迟队列实现吗?
我使用Java的DelayQueue在第二次延迟后分派事件。但问题是,在负载很重的情况下,我的消费者在DelayQueue上阻塞,直到来自其他线程的相当多的offer()操作消失。Nonblocking DelayQueue,Java
有谁知道Java中的非阻塞延迟队列实现吗?
不幸的是,DelayQueue阻塞队列,并且如果由于它使用锁而被强烈写入,它不会立即返回。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
因此,如果许多线程写它,因为斯蒂芬说,没有什么可以做一下。
我使用ConcurrentSkipListSet和DelayedElement解决了这个问题。
public class DelayedElement implements Comparable<DelayedElement> {
private final Long initTime;
private final String msgId;
public DelayedElement(Long initTime, String msgId) {
this.initTime = initTime;
this.msgId = msgId;
}
@Override
public int hashCode() {
int hash = 5;
hash = 29 * hash + Objects.hashCode(this.initTime);
hash = 29 * hash + Objects.hashCode(this.msgId);
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final DelayedElement other = (DelayedElement) obj;
if (!Objects.equals(this.initTime, other.initTime)) {
return false;
}
if (!Objects.equals(this.msgId, other.msgId)) {
return false;
}
return true;
}
@Override
public int compareTo(DelayedElement o) {
return -o.initTime.compareTo(initTime);
}
}
在我的生产者的线程中,我添加每个元素的第二个延迟。 在我的消费者线程,我只是看它有像第二层的延迟元素:
long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L);
NavigableSet<DelayedElement> set = queue.headSet(
new DelayedElement(diff, "", null));
//further processing goes here
这样,我实现无阻塞性质和可以安全地写入,并在全油门从Collection读取。
我想你是误解了DelayQueue
API或线程调度的工作方式。
如果你想要真正的非阻塞队列操作,那么DelayQueue
已经提供了它们。例如,poll()
将立即返回队列条目或null
。它不会阻塞调用线程。 offer(...)
方法是无阻塞插入的等效方法。另一方面,如果你实际上在说某些线程正在“饿死”,那么你可以做的事情就不多了。 Java的线程调度是不是“公平”:
如果你有很多是可运行的线程的,没有试图给每一个运行时间类似的量。
如果您有多个线程正在等待原始锁定或通知,那么调度程序将不会尝试选择一个“相当”的线程。
如果你有更多的线程比你有核心运行它们更多的线程可能会更容易。
最好的解决方案是设计你的算法,以至于如果线程被不公平地调度并不重要。它应该没关系;见Is a DelayQueue without fairness problematic?。
为了记录在案,我不知道一个DelayQueue
替代品,通告公平调度的。
感谢您的回复。我看了一下DelayQueue的实现,结果发现它使用ReentrantLock来poll()和add()元素。这就是为什么当负载很大时我无法同时写和读的原因。 – 2015-02-09 11:19:38
啊......我明白你的意思了。你在谈论争用而不是无限期阻止。 – 2015-02-09 11:36:43
是的,确切地说。我应该使用正确的术语) – 2015-02-09 11:45:49
当你说“散装”时,你的意思是你正在做很多要约电话,我想。多少? – rghome 2015-02-06 14:55:11