2015-02-06 26 views
0

我使用Java的DelayQueue在第二次延迟后分派事件。但问题是,在负载很重的情况下,我的消费者在DelayQueue上阻塞,直到来自其他线程的相当多的offer()操作消失。Nonblocking DelayQueue,Java

有谁知道Java中的非阻塞延迟队列实现吗?

+0

当你说“散装”时,你的意思是你正在做很多要约电话,我想。多少? – rghome 2015-02-06 14:55:11

回答

0

不幸的是,DelayQueue阻塞队列,并且如果由于它使用锁而被强烈写入,它不会立即返回。

public E poll() { 
    final ReentrantLock lock = this.lock; 
    lock.lock(); 
    try { 
     E first = q.peek(); 
     if (first == null || first.getDelay(NANOSECONDS) > 0) 
      return null; 
     else 
      return q.poll(); 
    } finally { 
     lock.unlock(); 
    } 
} 

因此,如果许多线程写它,因为斯蒂芬说,没有什么可以做一下。

我使用ConcurrentSkipListSet和DelayedElement解决了这个问题。

public class DelayedElement implements Comparable<DelayedElement> { 

private final Long initTime; 
private final String msgId; 

public DelayedElement(Long initTime, String msgId) { 
    this.initTime = initTime; 
    this.msgId = msgId; 
}    

@Override 
public int hashCode() { 
    int hash = 5; 
    hash = 29 * hash + Objects.hashCode(this.initTime); 
    hash = 29 * hash + Objects.hashCode(this.msgId); 
    return hash; 
} 

@Override 
public boolean equals(Object obj) { 
    if (obj == null) { 
     return false; 
    } 
    if (getClass() != obj.getClass()) { 
     return false; 
    } 
    final DelayedElement other = (DelayedElement) obj; 
    if (!Objects.equals(this.initTime, other.initTime)) { 
     return false; 
    } 
    if (!Objects.equals(this.msgId, other.msgId)) { 
     return false; 
    } 
    return true; 
} 
@Override 
public int compareTo(DelayedElement o) { 
    return -o.initTime.compareTo(initTime); 
    } 
} 

在我的生产者的线程中,我添加每个元素的第二个延迟。 在我的消费者线程,我只是看它有像第二层的延迟元素:

long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L); 
NavigableSet<DelayedElement> set = queue.headSet(
new DelayedElement(diff, "", null)); 
//further processing goes here  

这样,我实现无阻塞性质和可以安全地写入,并在全油门从Collection读取。

1

我想你是误解了DelayQueue API或线程调度的工作方式。

如果你想要真正的非阻塞队列操作,那么DelayQueue已经提供了它们。例如,poll()将立即返回队列条目或null。它不会阻塞调用线程。 offer(...)方法是无阻塞插入的等效方法。另一方面,如果你实际上在说某些线程正在“饿死”,那么你可以做的事情就不多了。 Java的线程调度是不是“公平”:

  • 如果你有很多是可运行的线程的,没有试图给每一个运行时间类似的量。

  • 如果您有多个线程正在等待原始锁定或通知,那么调度程序将不会尝试选择一个“相当”的线程。

如果你有更多的线程比你有核心运行它们更多的线程可能会更容易。

最好的解决方案是设计你的算法,以至于如果线程被不公平地调度并不重要。它应该没关系;见Is a DelayQueue without fairness problematic?


为了记录在案,我不知道一个DelayQueue替代品,通告公平调度的。

+0

感谢您的回复。我看了一下DelayQueue的实现,结果发现它使用ReentrantLock来poll()和add()元素。这就是为什么当负载很大时我无法同时写和读的原因。 – 2015-02-09 11:19:38

+0

啊......我明白你的意思了。你在谈论争用而不是无限期阻止。 – 2015-02-09 11:36:43

+0

是的,确切地说。我应该使用正确的术语) – 2015-02-09 11:45:49