2011-12-18 46 views
0

我有2个主题。块和唤醒消费者线程/

他们每个人都从共享缓冲区读取一些数据。

currentDataBuffer.get(thisID); //currentDataBuffer is my shared buffer object 

我要拦截的每个线程每次调用拿到后,并释放它,当所有线程读取缓冲区(一次) 所以我用这个currentDataBuffer对象锁定:

currentDataBuffer.get(thisID); 
synchronized (currentDataBuffer) { 
    currentDataBuffer.wait(); 
} 

问题当所有线程完成从缓冲区读取(每行一行)时,如何释放这些线程?

里面currentDataBuffer我有一个地图,我存储从缓冲区中读取数据的线程的ID。

如何使用this.notifyAll();(来自currentDataBuffer)来唤醒所有锁定的线程?

回答

0

我建议你使用Java BlockingQueue数据结构。调用BlockingQueue.take()块直到有元素可用。 因此,而不是:

currentDataBuffer.get(thisID); 
synchronized (currentDataBuffer) { 
    currentDataBuffer.wait(); 
} 

您将有:

currentDataBuffer.take(); 

挡在了take()调用的线程可以通过添加元素到队列通过调用BlockingQueue.offer(object)方法

+0

我需要一个n线程的缓冲区,只有在所有线程都读取相同的数据后,我需要移动到缓冲区中的下一个项目。 – kenny 2011-12-18 16:36:50

0

的释放答案继续你的代码可能是使用currentDataBuffer.notifyAll()而不是this.notifyAll()(它不太清楚什么this引用你的问题)。但是,那么如何确保在所有线程读取缓冲区并进入等待状态之后调用notifyAll

如果您知道应读取缓冲区的线程数,更好的解决方案是使用两个CountDownLatch。这个班的javadoc有一个很好的例子。

------------更新:

您不需要更改读者线程的代码。您应该将锁存器放在您的currentDataBuffer对象中。在currentDataBuffer中创建一个线程,等待n项目锁存。在get方法中,读数完成后,在该锁存器上调用CountDownLatch.countDown()。哦,我会写它:

class CurrentDataBuffer implements Runnable { 

    private CountDownLatch singleThreadFinishedSignal; 
    private CountDownLatch allThreadsFinishedSignal; 

    public CurrentDataBuffer(int N) { 
     singleThreadFinishedSignal = new CountDownLatch(N); // waiter thread waits on this 
     allThreadsFinishedSignal = new CountDownLatch(1); // reader threads wait on this after finished reading 
    } 

    public void run() { 
     try { 
      singleThreadFinishedSignal.await(); // wait for all reader threads to finish 
      allThreadsFinishedSignal.countDown(); // let all reader threads proceed 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void get() { 
     try { 
      // read buffer item here /////////////////// 
      singleThreadFinishedSignal.countDown(); // mark that a thread has read the item 
      allThreadsFinishedSignal.await(); // wait for other reader threads to finish 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

是的,我知道线程的数量。我用这个。currentDataBuffer对象本身内的notifyAll(),这就是为什么它是“this”。 – kenny 2011-12-18 17:00:58

+0

我无法使用CountDownLatch,因为我无法更改(代码)创建线程 – kenny 2011-12-18 17:02:47

+0

我更新了答案。 – yair 2011-12-18 17:19:33

0

我想的CountDownLatch(http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html)会有所帮助。

I.e.使用初始值为2的CountDownLatch(或任何想要处理的线程数)来扩充currentDataBuffer。处理线程后调用latch.countDown(),然后latch.await()。

为了安全起见,您可能要小心countDowns不会丢失(例如,如果抛出异常)。

+0

我无法使用CountDownLatch,因为我无法更改线程的创建(代码) – kenny 2011-12-18 17:05:32

+0

您是否有权控制currentDataBuffer的类?如果是这样,你可以在其get方法中添加countDown()/ await()。 – 2011-12-18 17:10:47

+0

是的,我有currentDataBuffer控件,但为什么我应该在那里等待?消费者线程是那些从缓冲区读取数据的线程! – kenny 2011-12-18 17:13:45