2016-11-10 118 views
1

任何人都可以解释下面的consumer-2是如何消费'null'的吗?我的代码应该防止这种情况。单个生产者多个消费者 - 队列包含null

public class Test { 

public static void main(String args[]) throws InterruptedException { 

    BoundedQueue<Integer> sharedQueue = new BoundedQueue<>(10); 

    Callable<Integer> producer1 = new Producer(sharedQueue, "producer-1"); 
    //Callable<Integer> producer2 = new Producer(sharedQueue, "producer-2"); 
    Callable<Integer> consumer1 = new Consumer(sharedQueue, "consumer-1"); 
    Callable<Integer> consumer2 = new Consumer(sharedQueue, "consumer-2"); 

    Collection<Callable<Integer>> callables = new HashSet<>(); 
    callables.add(producer1); 
    //callables.add(producer2); 
    callables.add(consumer1); 
    callables.add(consumer2); 

    ExecutorService executorService = Executors.newFixedThreadPool(10); 
    executorService.invokeAll(callables); 
} 
} 

public class BoundedQueue<T> { 

private int capacity; 
private int head; 
private int tail; 
private int currentSizeOfBuffer; 
private T[] buffer; 

private final ReentrantLock lock = new ReentrantLock(); 
private final Condition notFull = lock.newCondition(); 
private final Condition notEmpty = lock.newCondition(); 

public BoundedQueue(int capacity) { 
    this.capacity = capacity; 
    this.buffer = (T[]) new Object[capacity]; 
} 

public void put(T element) throws InterruptedException { 

    final ReentrantLock lock = this.lock; 
    lock.lock(); 

    if(isBufferFull()) { 
     waitOnAvailableSlot(); 
    } 

    try { 
     buffer[tail] = element; 
     tail = getNextAvailableSlot(tail); 
     currentSizeOfBuffer++; 

     informConsumerQueueHasElement(); 

    } finally { 
     lock.unlock(); 
    } 
} 

private boolean isBufferFull() { 
    return capacity == currentSizeOfBuffer; 
} 

private void waitOnAvailableSlot() throws InterruptedException { 
    notFull.await(); 
} 

private void informConsumerQueueHasElement() { 
    notEmpty.signal(); 
} 

public T take() throws InterruptedException { 

    final ReentrantLock lock = this.lock; 
    lock.lock(); 

    if(isBufferEmpty()) { 
     waitOnAvailableElement(); 
    } 

    try { 
     T element = buffer[head]; 
     head = getNextAvailableSlot(head); 
     currentSizeOfBuffer--; 

     informProducerQueueHasSpaceAvailable(); 

     return element; 
    } finally { 
     lock.unlock(); 
    } 
} 

private boolean isBufferEmpty() { 
    return 0 == currentSizeOfBuffer; 
} 

private void waitOnAvailableElement() throws InterruptedException { 
    notEmpty.await(); 
} 

private void informProducerQueueHasSpaceAvailable() { 
    notFull.signal(); 
} 

private final int getNextAvailableSlot(int currentSlotPosition) { 
    int nextAvailableSlot = ++currentSlotPosition; 
    return (nextAvailableSlot == capacity) ? 0 : nextAvailableSlot; 
} 
} 


public class Producer implements Callable<Integer> { 

private final BoundedQueue sharedQueue; 
private String name; 

@Override 
public Integer call() throws Exception { 

    for(int i=0; i<10; i++){ 
     try { 
      sharedQueue.put(i); 
      System.out.println(name + " produced: " + i); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
    return null; 
} 

public Producer(BoundedQueue sharedQueue, String name) { 
    this.sharedQueue = sharedQueue; 
    this.name = name; 
} 
} 

public class Consumer implements Callable<Integer> { 

private final BoundedQueue sharedQueue; 
private String name; 

@Override 
public Integer call() throws Exception { 

    while(true){ //what is happening here? 
     try { 
      Integer element = (Integer) sharedQueue.take(); 
      System.out.println(name + " consumed: "+ element); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
} 

public Consumer(BoundedQueue sharedQueue, String name) { 
    this.sharedQueue = sharedQueue; 
    this.name = name; 
} 
} 

输出:

  • 生产者-2中制备:0
  • 消费者-2消耗:空
  • 消费者-1消耗:0
  • 生产者-2中制备:1
  • 生产者-2生产者:2
  • 消费者-2消费者:2
  • 消费者-1消耗:0
  • 生产者-1中制备:0
  • 消费者-2消耗:3

另一个运行:

  • 生产者-2中制备: 0
  • 消费者-1消费:0
  • 消费者-2消费:
  • 生产者-1中制备:0
  • roducer-2中制备:1
  • 生产者-1中制备:1
  • 消费者-2消耗:0
  • 消费者-1消耗:
  • 消费消耗2:2

回答

1

您需要使用while(isBufferEmpty())而不仅仅是if(并且完全相同)。由于所有消费者(和生产者)同时得到信号,因此您必须重新检查以确保其他消费者尚未处理添加在队列中的元素。

+0

当然..明确指出。 – TheCoder

相关问题