1
任何人都可以解释下面的consumer-2是如何消费'null'的吗?我的代码应该防止这种情况。单个生产者多个消费者 - 队列包含null
public class Test {
public static void main(String args[]) throws InterruptedException {
BoundedQueue<Integer> sharedQueue = new BoundedQueue<>(10);
Callable<Integer> producer1 = new Producer(sharedQueue, "producer-1");
//Callable<Integer> producer2 = new Producer(sharedQueue, "producer-2");
Callable<Integer> consumer1 = new Consumer(sharedQueue, "consumer-1");
Callable<Integer> consumer2 = new Consumer(sharedQueue, "consumer-2");
Collection<Callable<Integer>> callables = new HashSet<>();
callables.add(producer1);
//callables.add(producer2);
callables.add(consumer1);
callables.add(consumer2);
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.invokeAll(callables);
}
}
public class BoundedQueue<T> {
private int capacity;
private int head;
private int tail;
private int currentSizeOfBuffer;
private T[] buffer;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedQueue(int capacity) {
this.capacity = capacity;
this.buffer = (T[]) new Object[capacity];
}
public void put(T element) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
if(isBufferFull()) {
waitOnAvailableSlot();
}
try {
buffer[tail] = element;
tail = getNextAvailableSlot(tail);
currentSizeOfBuffer++;
informConsumerQueueHasElement();
} finally {
lock.unlock();
}
}
private boolean isBufferFull() {
return capacity == currentSizeOfBuffer;
}
private void waitOnAvailableSlot() throws InterruptedException {
notFull.await();
}
private void informConsumerQueueHasElement() {
notEmpty.signal();
}
public T take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
if(isBufferEmpty()) {
waitOnAvailableElement();
}
try {
T element = buffer[head];
head = getNextAvailableSlot(head);
currentSizeOfBuffer--;
informProducerQueueHasSpaceAvailable();
return element;
} finally {
lock.unlock();
}
}
private boolean isBufferEmpty() {
return 0 == currentSizeOfBuffer;
}
private void waitOnAvailableElement() throws InterruptedException {
notEmpty.await();
}
private void informProducerQueueHasSpaceAvailable() {
notFull.signal();
}
private final int getNextAvailableSlot(int currentSlotPosition) {
int nextAvailableSlot = ++currentSlotPosition;
return (nextAvailableSlot == capacity) ? 0 : nextAvailableSlot;
}
}
public class Producer implements Callable<Integer> {
private final BoundedQueue sharedQueue;
private String name;
@Override
public Integer call() throws Exception {
for(int i=0; i<10; i++){
try {
sharedQueue.put(i);
System.out.println(name + " produced: " + i);
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
return null;
}
public Producer(BoundedQueue sharedQueue, String name) {
this.sharedQueue = sharedQueue;
this.name = name;
}
}
public class Consumer implements Callable<Integer> {
private final BoundedQueue sharedQueue;
private String name;
@Override
public Integer call() throws Exception {
while(true){ //what is happening here?
try {
Integer element = (Integer) sharedQueue.take();
System.out.println(name + " consumed: "+ element);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
public Consumer(BoundedQueue sharedQueue, String name) {
this.sharedQueue = sharedQueue;
this.name = name;
}
}
输出:
- 生产者-2中制备:0
- 消费者-2消耗:空
- 消费者-1消耗:0
- 生产者-2中制备:1
- 生产者-2生产者:2
- 消费者-2消费者:2
- 消费者-1消耗:0
- 生产者-1中制备:0
- 消费者-2消耗:3
- 等
另一个运行:
- 生产者-2中制备: 0
- 消费者-1消费:0
- 消费者-2消费:空
- 生产者-1中制备:0
- roducer-2中制备:1
- 生产者-1中制备:1
- 消费者-2消耗:0
- 消费者-1消耗:空
- 消费消耗2:2
- 等
当然..明确指出。 – TheCoder