2012-06-07 81 views
5

我有一个多线程应用程序,它有一个生产者线程和多个消费者线程。 数据存储在共享线程安全集合中,并在缓冲区中有足够数据时刷新到数据库。多线程Java应用程序中的数据缓冲

从javadocs中 -

BlockingQueue<E> 

队列支持两个附加等待存储元素时该队列,以获取元素时变为非空,并等待空间在队列中变得可用的操作。

take() 

获取并移除此队列的头部,如有必要则等待直到一个可用元素。

我的问题 -

  1. 是否有具有E []取(INT n)的方法的另一个集合?即阻塞队列等待一个元素可用。我想要的是 ,它应该等到100或200个元素可用。
  2. 另外,有没有另一种方法可以用来解决这个问题而无需轮询?
+0

应该将元素平均分配给每个消费者,还是第一个消费者对take方法获得第一个'n'元素,第二个消费者获得下一个'n'元素等? – SimonC

+0

这真的是你想要做的吗?如果生产速度超出最终调整的速度,则可能会在数据生成和刷新到数据库之间产生几乎任意大的延迟。如果你真的需要做到这一点缓冲的所有逻辑也许应该更喜欢“等我有N个元素或者X毫秒过去了” – DRMacIver

+0

你为什么要等待?为什么不使用'drain()'?我会将所有可用的数据写入最大值,我宁愿不丢失数据。 –

回答

2

我认为唯一的办法就是要么扩展一些BlockingQueue的实现或者使用take创建某种实用方法:

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
     throws InterruptedException { 

    for (int i = 0; i < max; i++) 
     to.add(queue.take()); 
} 
+0

实际上,假设只有一个消费者,你的方法比我的要好得多。 – Zarkonnen

+1

这种方法根本不能很好地处理InterruptedException,因为如果中断,则会丢失任何元素。如果它不打算处理中断本身,或者为了捕获并返回到目前为止已耗尽的元素,它确实需要将元素添加到传入的集合中。 – DRMacIver

+0

哦,评论点+1。更新! – dacwe

1

我不知道是否有在具有take(int n)型方法标准库相似的类,但你应该能够包住默认BlockingQueue添加该功能没有太多的麻烦,你不认为?

另一种情况是触发一个操作,在该操作中,您将元素置于集合中,其中由您设置的阈值将触发冲洗。

2

drainTo方法不是你正在寻找的东西,而是它会满足你的目的吗?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection,INT)

编辑

你可以实现一个稍微更高性能的批量阻挡takemin使用takedrainTo组合:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException 
{ 
    int drained = 0; 
    do 
    { 
    if (queue.size() > 0) 
     drained += queue.drainTo(list, min - drained); 
    else 
    { 
     list.add(queue.take()); 
     drained++; 
    } 
    } 
    while (drained < min); 
} 
+1

这是他/她想要做的相当有什么固定利率。 – posdef

+0

我已经更新了答案,表明它不能解决确切的问题。有时,OP不知道其他解决方案,所以总是值得提问。 – SimonC

+0

广东话反驳这:) – posdef

1

所以这应该是一个线程安全的队列,可以阻止任意数量的元素。验证线程代码是否正确的更多目光是值得欢迎的。

package mybq; 

import java.util.ArrayList; 
import java.util.LinkedList; 
import java.util.List; 

public class ChunkyBlockingQueue<T> { 
    protected final LinkedList<T> q = new LinkedList<T>(); 
    protected final Object lock = new Object(); 

    public void add(T t) { 
     synchronized (lock) { 
      q.add(t); 
      lock.notifyAll(); 
     } 
    } 

    public List<T> take(int numElements) { 
     synchronized (lock) { 
      while (q.size() < numElements) { 
       try { 
        lock.wait(); 
       } catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
      } 
      ArrayList<T> l = new ArrayList<T>(numElements); 
      l.addAll(q.subList(0, numElements)); 
      q.subList(0, numElements).clear(); 
      return l; 
     } 
    } 
} 
+1

'add'中的'notifyAll'在这里有点浪费。它应该是一个单独的'notify',然后'take'可以在完成后还有更多的元素时再次调用'notify'。 – SimonC