2016-08-04 190 views
1

我正在尝试使用信号量的生产者 - 消费者问题。除了一个地方,该程序对我来说看起来很好。如何在使用Semphores的生产者 - 消费者中消费?

public class ProducerConsumerWithSemaphores 
{ 
    private final ArrayList<Integer> list = new ArrayList<>(5); 
    private final Semaphore semaphoreProducer = new Semaphore(1); 
    private final Semaphore semaphoreConsumer = new Semaphore(0); 

    private void produce() throws InterruptedException 
    { 
     for(int i = 0;i< 5;i++) 
     { 
      semaphoreProducer.acquire(); 
      list.add(i); 
      System.out.println("Produced: " + i); 
      semaphoreConsumer.release(); 
     } 
    } 

    private void consumer() throws InterruptedException 
    { 
     while (!list.isEmpty()) /// This line is where I have the doubt 
     { 
      semaphoreConsumer.acquire(); 
      System.out.println("Consumer: " + list.remove(list.size()-1)); 
      semaphoreProducer.release(); 
      Thread.sleep(100); 
     } 
    } 

    public static void main(String[] args) 
    { 
     final ProducerConsumerWithSemaphores obj = new ProducerConsumerWithSemaphores(); 

     new Thread(new Runnable() 
     { 
      @Override 
      public void run() 
      { 
       try 
       { 
        obj.produce(); 
       } catch (InterruptedException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

     new Thread(new Runnable() 
     { 
      @Override 
      public void run() 
      { 
       try 
       { 
        obj.consumer(); 
       } catch (InterruptedException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 
    } 
} 

在获取信号量之前检查列表是否为空是否可以?这会导致多线程环境中的任何问题吗?

回答

1
private void consumer() throws InterruptedException 
{ 
    while (!list.isEmpty()) /// This line is where I have the doubt 

问题是,如果消费者跑得比生产者快,消费者立即退出,那么你就没有消费者!

正确的示例看起来像, Producer–consumer problem#Using semaphores。我相信你的意图不是使用true作为无限循环,因为你希望生产者/消费者在作业完成后退出。如果这是您的意图,您可以1.设置totalCount来结束循环。 2.或者 boolean标志,当生产者放弃最后一个标志时 putItemIntoBuffer将由生产者设置。该旗帜必须受到保护,以及 buffer(更新:如果有多个生产者/消费者这种方法不起作用)3.模拟EOF(从producer - consume; how does the consumer stop?采取的想法)

会不会做多线程环境中的任何问题?

您的关键部分(您的list)未受到保护。通常我们使用3个信号量。第三个用作互斥体来保护缓冲区。

要停止生产者/消费者,
示例代码方法1:

public class Test3 { 

    private Semaphore mutex = new Semaphore(1); 
    private Semaphore fillCount = new Semaphore(0); 
    private Semaphore emptyCount = new Semaphore(3); 

    private final List<Integer> list = new ArrayList<>(); 

    class Producer implements Runnable { 

    private final int totalTasks; 

    Producer(int totalTasks) { 
     this.totalTasks = totalTasks; 
    } 

    @Override 
    public void run() { 
     try { 
     for (int i = 0; i < totalTasks; i++) { 
      emptyCount.acquire(); 
      mutex.acquire(); 
      list.add(i); 
      System.out.println("Produced: " + i); 
      mutex.release(); 
      fillCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    class Consumer implements Runnable { 
    private final int totalTasks; 

    Consumer(int totalTasks) { 
     this.totalTasks = totalTasks; 
    } 

    @Override 
    public void run() { 
     try { 
     for (int i = 0; i < totalTasks; i++) { 
      fillCount.acquire(); 
      mutex.acquire(); 
      int item = list.remove(list.size() - 1); 
      System.out.println("Consumed: " + item); 
      mutex.release(); 
      emptyCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    public void runTest() { 
    int numProducer = 3; 
    int tasksPerProducer = 10; 
    int numConsumer = 6; 
    int tasksPerConsumer = 5; 

    for (int i = 0; i < numProducer; i++) { 
     new Thread(new Producer(tasksPerProducer)).start(); 
    } 
    for (int i = 0; i < numConsumer; i++) { 
     new Thread(new Consumer(tasksPerConsumer)).start(); 
    } 
    } 

    public static void main(String[] args) throws IOException { 
    Test3 t = new Test3(); 
    t.runTest(); 
    } 
} 

示例代码方法3:

public class Test4 { 

    private Semaphore mutex = new Semaphore(1); 
    private Semaphore fillCount = new Semaphore(0); 
    private Semaphore emptyCount = new Semaphore(3); 

    private Integer EOF = Integer.MAX_VALUE; 

    private final Queue<Integer> list = new LinkedList<>(); // need to put/get data in FIFO 

    class Producer implements Runnable { 

    private final int totalTasks; 

    Producer(int totalTasks) { 
     this.totalTasks = totalTasks; 
    } 

    @Override 
    public void run() { 
     try { 
     for (int i = 0; i < totalTasks + 1; i++) { 
      emptyCount.acquire(); 
      mutex.acquire(); 
      if (i == totalTasks) { 
      list.offer(EOF); 
      } else { 
      // add a valid value 
      list.offer(i); 
      System.out.println("Produced: " + i); 
      } 
      mutex.release(); 
      fillCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    class Consumer implements Runnable { 

    @Override 
    public void run() { 
     try { 
     boolean finished = false; 
     while (!finished) { 
      fillCount.acquire(); 
      mutex.acquire(); 
      int item = list.poll(); 
      if (EOF.equals(item)) { 
      // do not consume this item because it means EOF 
      finished = true; 
      } else { 
      // it's a valid value, consume it. 
      System.out.println("Consumed: " + item); 
      } 
      mutex.release(); 
      emptyCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    public void runTest() { 
    int numProducer = 3; 
    int tasksPerProducer = 10; 

    for (int i = 0; i < numProducer; i++) { 
     new Thread(new Producer(tasksPerProducer)).start(); 
    } 

    int numConsumer = numProducer; // producers will put N EOFs to kill N consumers. 
    for (int i = 0; i < numConsumer; i++) { 
     new Thread(new Consumer()).start(); 
    } 
    } 

    public static void main(String[] args) throws IOException { 
    Test4 t = new Test4(); 
    t.runTest(); 
    } 
} 
+0

谢谢@waltersu。那是我正在寻找的答案。你有什么链接可以看到这个实现吗? – Kode

0

而不是使用两个信号灯你为什么不使用单个信号以便在线程之间进行同步link

另外你可以它是线程安全的ArrayBlockingQueue,以正确显示生产者消费者问题。

+0

谢谢Sohil。我知道如何使用ArrayBlockingQueue来解决这个问题,但我正在以更细化的方式尝试它 – Kode