2010-02-25 68 views
46

我想创建某种Producer/Consumer线程应用程序。但我不确定在两者之间实现队列的最佳方式。使用队列的生产者/消费者线程

所以我有两个想法(这可能是完全错误的)。我想知道哪个更好,如果他们都吸了,那么实施队列的最好方法是什么。这主要是我在这些例子中执行的队列,我很关心。我正在扩展一个Queue类,它是一个内部类,并且是线程安全的。下面是两个例子,每个例子有4个类。

主要类 -

public class SomeApp 
{ 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     consumer = new Consumer(); 
     producer = new Producer(); 
    } 
} 

消费者接收机类

public class Consumer implements Runnable 
{ 
    public Consumer() 
    { 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = QueueHandler.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

生产者接收机类

public class Producer implements Runnable 
{ 
    public Producer() 
    { 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //add to the queue some sort of unique object 
      QueueHandler.enqueue(new Object()); 
     } 
    } 
} 

队列接收机类

public class QueueHandler 
{ 
    //This Queue class is a thread safe (written in house) class 
    public static Queue<Object> readQ = new Queue<Object>(100); 

    public static void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(object); 
    } 

    public static Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

OR

主要类 -

public class SomeApp 
{ 
    Queue<Object> readQ; 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     readQ = new Queue<Object>(100); 
     consumer = new Consumer(readQ); 
     producer = new Producer(readQ); 
    } 
} 

消费者接收机类

public class Consumer implements Runnable 
{ 
    Queue<Object> queue; 

    public Consumer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = queue.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

生产者接收机类

public class Producer implements Runnable 
{ 
    Queue<Object> queue; 

    public Producer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 

     while(true) 
     { 
      //add to the queue some sort of unique object 
      queue.enqueue(new Object()); 
     } 
    } 
} 

队列接收机类

//the extended Queue class is a thread safe (written in house) class 
public class QueueHandler extends Queue<Object> 
{  
    public QueueHandler(int size) 
    { 
     super(size); //All I'm thinking about now is McDonalds. 
    } 

    public void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(); 
    } 

    public Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

去吧!

+0

生产者enqueue和消费者出队,顺便说一句。反之亦然.. – 2010-02-25 08:22:25

+1

哦,不要从构造函数中启动线程!该线程可以观察处于不一致状态的对象。有关详细信息,请参阅“Java并发实践”。 – 2010-02-25 08:24:12

+0

谢谢Zwei,入队的事情是我没有专注。从构造函数的东西开始线程我应该运行一个intiliazation方法,并在那里启动它,或者它应该从主方法类启动它? – Gareth 2010-02-25 09:05:29

回答

69

Java 5+拥有您需要的所有工具。你会想要:

  1. 把你所有的生产者在一个ExecutorService;
  2. 把所有的消费者放在另一个ExecutorService;
  3. 如有必要,使用BlockingQueue在两者之间进行通信。

我说“如果有必要”(3),因为从我的经验来看,这是一个不必要的步骤。你所做的就是向消费者执行者服务提交新任务。所以:

final ExecutorService producers = Executors.newFixedThreadPool(100); 
final ExecutorService consumers = Executors.newFixedThreadPool(100); 
while (/* has more work */) { 
    producers.submit(...); 
} 
producers.shutdown(); 
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
consumers.shutdown(); 
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 

所以producers直接提交consumers

+2

Cletus对钱的更多信息,以帮助澄清“从哪里开始” http://java.sun.com/docs/books/tutorial/essential/concurrency/ – edwardsmatt 2010-02-25 09:58:25

+0

“所以生产者直接提交给消费者” - 是否可以平行调用consumers.submit(...)是否安全,还是应该同步呢? – Gevorg 2013-09-23 21:27:21

+0

如果您共享BlockingQueue,您是否可以为生产者和消费者使用1个执行程序? – devo 2013-10-31 20:33:09

9

您正在重新发明轮子。

如果您需要持久性和其他企业功能使用JMS(我建议ActiveMq)。

如果您需要快速内存队列,请使用java的Queue的其中一个阻碍。

如果您需要支持java 1.4或更早版本,请使用Doug Lea的优秀concurrent包。

+4

你仍然可以被要求在求职面试时实施生产者消费者:) – 2012-11-06 16:43:21

+0

我的确发现了java.util.concurrent中的实用程序有用,但我觉得很难称它为“优秀”,但它仍然迫使我通过两个参数只是为了指定超时。它是否会杀死Doug制作一个名为Duration的课程? – Trejkaz 2014-03-03 22:24:03

17

OK,正如其他人注意,做的最好的事情就是用java.util.concurrent包。我强烈推荐“实践中的Java并发”。这是一本很好的书,涵盖了你需要知道的几乎所有内容。

至于你的具体实现,正如我在评论中指出,不要从构造器线程 - 它可以是不安全的。

留下之外,第二实施似乎更好。你不想把队列放在静态字段中。你可能只是失去灵活性而已。

如果你想与自己的实现先走(用于学习目的,我猜?),至少提供一个start()方法。你应该构造对象(你可以实例化Thread对象),然后调用start()来启动线程。

编辑:ExecutorService有自己的队列,这可能会造成混淆。这里的东西,让你开始。

public class Main { 
    public static void main(String[] args) { 
     //The numbers are just silly tune parameters. Refer to the API. 
     //The important thing is, we are passing a bounded queue. 
     ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100)); 

     //No need to bound the queue for this executor. 
     //Use utility method instead of the complicated Constructor. 
     ExecutorService producer = Executors.newSingleThreadExecutor(); 

     Runnable produce = new Produce(consumer); 
     producer.submit(produce); 
    } 
} 

class Produce implements Runnable { 
    private final ExecutorService consumer; 

    public Produce(ExecutorService consumer) { 
     this.consumer = consumer; 
    } 

    @Override 
    public void run() { 
     Pancake cake = Pan.cook(); 
     Runnable consume = new Consume(cake); 
     consumer.submit(consume); 
    } 
} 

class Consume implements Runnable { 
    private final Pancake cake; 

    public Consume(Pancake cake){ 
     this.cake = cake; 
    } 

    @Override 
    public void run() { 
     cake.eat(); 
    } 
} 

进一步编辑: 对于生产者,而不是while(true),你可以这样做:

@Override 
public void run(){ 
    while(!Thread.currentThread().isInterrupted()){ 
     //do stuff 
    } 
} 

这种方式可以通过调用.shutdownNow()关闭执行人。如果你使用while(true),它不会关闭。

还要注意的是Producer仍易受RuntimeExceptions(即一个RuntimeException将暂停处理)

+0

所以我应该给消费者和生产者添加一个start()方法?你是说我应该在我的主要方法中加入类似的东西吗? consumer = new Consumer(); consumer.start(readQ); 还是这个? consumer = new Comsumer(readQ); consumer.start(); – Gareth 2010-02-25 09:35:21

+1

你通常会做新的Comsumer(readQ); consumer.start();.在你的情况下,最好声明队列私有final,如果你这样做,你需要在构造函数中设置队列。如果这是生产代码,我强烈建议您使用cletus的答案。如果您绝对需要使用您的内部队列,那么您应该使用ExecutorService executor = Executors.newSingleThreadExecutor()而不是原始线程。除此之外,这将保护您免受RuntimeException停止系统。 – 2010-02-25 18:41:24

+0

谢谢。很有帮助。我已经和BlockingQueue一样,像在内部队列中建议的cletus一样。仍然试图让我的头在ExecutorService类中,但是当我这样做时,我一定会使用它。谢谢你的帮助。 – Gareth 2010-02-26 08:32:23

1
  1. 已同步put和get方法的Java代码“的BlockingQueue”。
  2. Java代码“Producer”,生成器线程来生成数据。
  3. Java代码“消费者”,消费者线程消费生成的数据。
  4. Java代码“ProducerConsumer_Main”,主要功能是启动生产者和消费者线程。

BlockingQueue.java

public class BlockingQueue 
{ 
    int item; 
    boolean available = false; 

    public synchronized void put(int value) 
    { 
     while (available == true) 
     { 
      try 
      { 
       wait(); 
      } catch (InterruptedException e) { 
      } 
     } 

     item = value; 
     available = true; 
     notifyAll(); 
    } 

    public synchronized int get() 
    { 
     while(available == false) 
     { 
      try 
      { 
       wait(); 
      } 
      catch(InterruptedException e){ 
      } 
     } 

     available = false; 
     notifyAll(); 
     return item; 
    } 
} 

Consumer.java

package com.sukanya.producer_Consumer; 

public class Consumer extends Thread 
{ 
    blockingQueue queue; 
    private int number; 
    Consumer(BlockingQueue queue,int number) 
    { 
     this.queue = queue; 
     this.number = number; 
    } 

    public void run() 
    { 
     int value = 0; 

     for (int i = 0; i < 10; i++) 
     { 
      value = queue.get(); 
      System.out.println("Consumer #" + this.number+ " got: " + value); 
     } 
    } 
} 

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer; 

public class ProducerConsumer_Main 
{ 
    public static void main(String args[]) 
    { 
     BlockingQueue queue = new BlockingQueue(); 
     Producer producer1 = new Producer(queue,1); 
     Consumer consumer1 = new Consumer(queue,1); 
     producer1.start(); 
     consumer1.start(); 
    } 
} 
+3

没有解释的代码转储很少有帮助。 – Chris 2014-10-15 17:49:40

6

我有扩展Cletus提出了对工作代码示例的回答。

  1. 其中一个ExecutorService(pes)接受Producer任务。
  2. 一个ExecutorService(ces)接受Consumer任务。
  3. 均为ProducerConsumerBlockingQueue
  4. 多个Producer任务会生成不同的数字。
  5. 任何的Consumer任务可以消耗由Producer

代码生成的数字:

import java.util.concurrent.*; 

public class ProducerConsumerWithES { 
    public static void main(String args[]){ 
     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>(); 

     ExecutorService pes = Executors.newFixedThreadPool(2); 
     ExecutorService ces = Executors.newFixedThreadPool(2); 

     pes.submit(new Producer(sharedQueue,1)); 
     pes.submit(new Producer(sharedQueue,2)); 
     ces.submit(new Consumer(sharedQueue,1)); 
     ces.submit(new Consumer(sharedQueue,2)); 
     // shutdown should happen somewhere along with awaitTermination 
     /* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */ 
     pes.shutdown(); 
     ces.shutdown(); 
    } 
} 
class Producer implements Runnable { 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.threadNo = threadNo; 
     this.sharedQueue = sharedQueue; 
    } 
    @Override 
    public void run() { 
     for(int i=1; i<= 5; i++){ 
      try { 
       int number = i+(10*threadNo); 
       System.out.println("Produced:" + number + ":by thread:"+ threadNo); 
       sharedQueue.put(number); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

class Consumer implements Runnable{ 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.sharedQueue = sharedQueue; 
     this.threadNo = threadNo; 
    } 
    @Override 
    public void run() { 
     while(true){ 
      try { 
       int num = sharedQueue.take(); 
       System.out.println("Consumed: "+ num + ":by thread:"+threadNo); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

输出:

Produced:11:by thread:1 
Produced:21:by thread:2 
Produced:22:by thread:2 
Consumed: 11:by thread:1 
Produced:12:by thread:1 
Consumed: 22:by thread:1 
Consumed: 21:by thread:2 
Produced:23:by thread:2 
Consumed: 12:by thread:1 
Produced:13:by thread:1 
Consumed: 23:by thread:2 
Produced:24:by thread:2 
Consumed: 13:by thread:1 
Produced:14:by thread:1 
Consumed: 24:by thread:2 
Produced:25:by thread:2 
Consumed: 14:by thread:1 
Produced:15:by thread:1 
Consumed: 25:by thread:2 
Consumed: 15:by thread:1 

注。如果您不需要多个生产者和消费者,请保留单个生产者和消费者。我已经添加了多个生产者和消费者,以在多个生产者和消费者中展示BlockingQueue的功能。

+0

当多个生产者和消费者在场时,这并不关心竞争状况。每个看到容量为0并尝试添加。使用单生产者和单消费者无需在BlockingQueue上同步,如果它不止一个,仍需要同步。 – Cleonjoys 2017-09-16 17:30:41

+0

你可以做一件事,注释掉消费者,然后为BlockingQueue设置固定尺寸,你会看到你自己。我用新的LinkedBlockingQueue (2)尝试了您的代码; 然后被输出为如下: 生产:11:通过螺纹:1 生产:21:通过螺纹:2 生产:22:通过螺纹:2 生产:12:通过螺纹:1 如何更当Queue的设置容量为2时插入值 – Cleonjoys 2017-09-17 18:01:28

+0

这就是BlockingQueue的性质。除非有容量可用,否则将被阻止。我正在使用无界阻塞队列,以上情况不会出现。即使它是由于BlockingQueue有界而产生的,它也是Java实现它的方式。请查看https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#put-E-。我的帖子中的代码段没有任何问题。 – 2017-09-17 18:19:48

1

这是一个非常简单的代码。

import java.util.*; 

// @author : rootTraveller, June 2017 

class ProducerConsumer { 
    public static void main(String[] args) throws Exception { 
     Queue<Integer> queue = new LinkedList<>(); 
     Integer buffer = new Integer(10); //Important buffer or queue size, change as per need. 

     Producer producerThread = new Producer(queue, buffer, "PRODUCER"); 
     Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER"); 

     producerThread.start(); 
     consumerThread.start(); 
    } 
} 

class Producer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize ; 

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super(ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.size() == queueSize){ 
        System.out.println(Thread.currentThread().getName() + " FULL   : waiting...\n"); 
        try{ 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then produce one, add and notify 
       int randomInt = new Random().nextInt(); 
       System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
       queue.add(randomInt); 
       queue.notifyAll(); //Important 
      } //synchronized ends here : NOTE 
     } 
    } 
} 

class Consumer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize; 

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super (ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.isEmpty()){ 
        System.out.println(Thread.currentThread().getName() + " Empty  : waiting...\n"); 
        try { 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then consume one and notify 
       System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove()); 
       queue.notifyAll(); 
      } //synchronized ends here : NOTE 
     } 
    } 
}