2017-09-03 69 views
0

我正在为一个生产者和多个消费者运行代码。我想优先执行消费者线程。即如果我有consThread1,consThread2,consThread3。我的问题是如何限制consThread3 consThread1和consThread2如何维护消费者线程的执行顺序

Producer.java

import java.util.concurrent.BlockingQueue; 
import org.json.simple.JSONObject; 

public class Producer implements Runnable { 
    private final BlockingQueue<Message> sharedQueue; 

    @SuppressWarnings("unchecked") 
    public Producer(BlockingQueue<Message> sharedQueue){ 
     this.sharedQueue=sharedQueue; 
    } 

    @Override 
    public void run() { 
     try{ 
      for(int i=0;i<4;i++) { 
       Message msg=new Message(""+i); 
       System.out.println("Producer Produced: " +msg.getMessage()); 
       sharedQueue.put(msg); 
       Thread.sleep(400); 
      } 
      sharedQueue.put(new Message("exit")); // end of producing 
      System.out.println("-------Producer STOPPED------"); 
     } 
     catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Consumer.java之前消耗

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.TimeUnit; 
import org.json.simple.JSONObject; 

public class Consumer implements Runnable{ 

    private final BlockingQueue<Message> sharedQueue; 
    private String threadId; 

    public Consumer(BlockingQueue<Message> sharedQueue) {   
     this.sharedQueue=sharedQueue;   
    } 

    @SuppressWarnings("unchecked") 
    @Override 
    public void run() { 
     threadId = "Consumer-" + Thread.currentThread().getName(); 
     try { 
      Message msg; 
      while (true){ 
       msg=sharedQueue.poll(5,TimeUnit.SECONDS); 
       if(msg.getMessage()=="exit" || msg.getMessage()==null){ 
        sharedQueue.put(new Message("exit")); 
        break; 
       } 
       System.out.println(threadId + ": Consuming Message " + msg.getMessage()); 
       Thread.sleep(1000); 
      } 
      System.out.println(threadId + " STOPPED Consuming "); 
     } 
     catch (InterruptedException ie) { 
      ie.printStackTrace(); 
     } 
    } 
} 

测试程序ProducerConsumer.java

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 
import org.json.simple.JSONObject; 

public class ProducerConsumer { 

    public static void main(String[] args) throws InterruptedException { 
     BlockingQueue<Message> sharedQueue = new LinkedBlockingQueue<>(10); 

     //Creating Producer and Consumer Thread 
     Thread prodThread = new Thread(new Producer(sharedQueue)); 
     Thread consThread1 = new Thread(new Consumer(sharedQueue)); 
     Thread consThread2 = new Thread(new Consumer(sharedQueue)); 
     Thread consThread3 = new Thread(new Consumer(sharedQueue)); 
     //Starting producer and Consumer thread 
     System.out.println("Producer and consumer threads started \n\n\n---------------------------------------"); 

     prodThread.start(); 
     consThread1.start(); 
     consThread2.start(); 
     consThread1.join(); 
     consThread2.join(); 
     consThread3.start(); 
    } 
} 
+1

你为什么要这样? – Kayaman

+7

因此,您要创建三个消费者线程,以便能够同时使用3个项目,但实际上您希望它们按顺序使用,而不是同时使用?为什么要启动3个线程呢?只需使用单个消费者线程,并且消费将是连续的。 –

+0

http://docs.oracle.com/javase/6/docs/api/java/lang/Thread.html#setPriority(int)但要注意,因为JB指出,为什么使用三个线程呢? – nullpointer

回答

-1

如果你想一个接一个地执行,为什么你使用多线程呢?你应该重构一个线程。

但是,如果你想跳过重构,你可以把消耗线程放到一个固定的线程池中。在线程池中,可以设置活动线程的最大数量,因此您可以将最大值设置为1,并且线程池将逐个执行线程。

另一种选择是创建一个循环障碍,其中障碍动作是您的第三个线程(它将在其他线程之后被调用)。您可以通过循环障碍执行前两个线程。屏障可以统计整理线程,并在达到阈值时执行第三个线程。这应该符合你希望第三个消费者线程等待事件消耗的目标。