2012-12-12 99 views
0

我正在研究Java中生产者消费者问题的变体。我有一个生成器线程创建对象,将其放入优先级阻塞队列中,然后传递到主容器,控制器,这是一个有界缓冲区。生产者消费者变种java BlockingQueues

该队列的原因是,当主容器有一定百分比的对象A时,它只接受类型B的对象,以及我们被要求查看的其他一些场景。 我无法弄清楚代码出了什么问题,调试器只是从InQueue中的in.offer跳转到Producer中的in.push。任何方向或建议,将不胜感激。因为走错了路你使用泛型

import java.util.concurrent.PriorityBlockingQueue; 

     public class InQueue implements Runnable { 

     Controller c; 
     private PriorityBlockingQueue in; 

     public InQueue(Controller c) { 
      this.c = c; 
      in = new PriorityBlockingQueue(); 
     } 

     public void push(C c) { 

      in.offer(c); 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     public void run() { 
      while (true) { 
       try { 
        C temp = (C) in.take(); //will block if empty 
        c.arrive(temp); 
       } catch (InterruptedException e) {} // TODO 
      } 
     } 
    } 

public class Controller { 

    private BoundedBuffer buffer; 
    private int used; 


    Controller(int capacity) { 
     this.buffer = new BoundedBuffer(capacity); 
     used = 0; 
    } 


    public void arrive(C c) { 
     try { 
      buffer.put(c); 
      used++; 
     } catch (InterruptedException e) { } //TODO 
    } 

    public C depart() { 
     C temp = null; //BAD IDEA? 
     try { 
      temp = (C)buffer.take(); 
      used--; 
     } catch (InterruptedException e) { } //TODO 
     return temp; //could be null 
    } 
} 

回答

0

你的代码不编译。另一件事是没有BoundedBuffer的默认实现。下面我通过阻塞队列为生产者 - 消费者问题做了一个工作实现。看看并纠正你的错误。

package concurrency; 

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class Producer<T> { 
    private final BlockingQueue<T> queue; 
    private final Consumer consumer; 
    private static volatile boolean isShutdown; 
    private final static Object lock = new Object(); 

    public Producer() { 
     this.queue = new LinkedBlockingQueue<T>(); 
     this.consumer = new Consumer(); 
    } 

    public void start() { 
     consumer.start(); 
    } 

    public void stop() { 
     synchronized (lock) { 
      isShutdown = true; 
     } 
     consumer.interrupt(); 
    } 

    public void put(T obj) throws InterruptedException { 
     synchronized (lock) { 
      if (isShutdown) 
       throw new IllegalStateException("Consumer Thread is not active"); 
     } 
     queue.put(obj); 
    } 

    private class Consumer extends Thread { 

     public void run() { 
      while (true) { 
       synchronized (lock) { 
        if (isShutdown) 
         break; 
       } 

       T t = takeItem(); 
       // do something with 't' 
       if(t!=null) 
       printItem(t); 
      } 
     } 

     private void printItem(T t) { 
      System.out.println(t); 
     } 

     private T takeItem() { 
      try { 
       return queue.take(); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
      return null; 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 

     Producer<Integer> producer = new Producer<Integer>(); 
     producer.start(); 
     for (int i = 0; i <20; i++) { 
      producer.put(i); 
      if (i >= 7) 
       Thread.sleep(500); 
     } 
     producer.stop(); 
    } 
} 
+0

没有伴侣,我可以那样做,你列出了什么。泛型需要整理我会承认,但我不认为这是问题所在。另外,我自己编写了有界的缓冲区。 设置是生产者 - > BlockingQueue - > BoundedByffer - > OutBlockingQueue - >消费者。 – Saf