2011-11-27 59 views
1

我需要编写一个类似于生产者 - 消费者的问题,它必须使用信号量。我尝试了几个解决方案,但都没有工作。首先,我在维基百科尝试了一个解决方案,但没有奏效。我当前的代码是类似的东西:如何使用信号量解决生产者 - 消费者?

消费者的方式运行:

public void run() { 
    int i=0; 
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
    String s = new String(); 
    while (1!=2){ 
     Date datainicio = new Date(); 
     String inicio=dateFormat.format(datainicio); 
     try { 
      Thread.sleep(1000);///10000 
     } catch (InterruptedException e) { 
      System.out.println("Excecao InterruptedException lancada."); 
     } 
     //this.encheBuffer.down(); 
     this.mutex.down(); 
     // RC 
     i=0; 
     while (i<buffer.length) { 
      if (buffer[i] == null) { 
       i++; 
      } else { 
       break; 
      } 
     } 
     if (i<buffer.length) { 
      QuantidadeBuffer.quantidade--; 
      Date datafim = new Date(); 
      String fim=dateFormat.format(datafim); 
      int identificador; 
      identificador=buffer[i].getIdentificador()[0]; 
      s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i; 
      //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i); 
      buffer[i]= null; 
     } 
     // RC 
     this.mutex.up(); 
     //this.esvaziaBuffer.up(); 
     System.out.println(s); 
    //   lock.up(); 
    } 
} 

生产者的方法运行:

public void run() { 
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
    int i=0; 
    while (1!=2){ 
     Date datainicio = new Date(); 
     String inicio=dateFormat.format(datainicio); 
     // Produz Item 
     try { 
      Thread.sleep(500);//50000 
     } catch (InterruptedException e) { 
      System.out.println("Excecao InterruptedException lancada."); 
     } 
     //this.esvaziaBuffer.down(); 
     this.mutex.down(); 
     // RC 
     i=0; 
     while (i<buffer.length) { 
      if (buffer[i]!=null) { 
       i++; 
      } else { 
       break; 
      } 
     } 
     if (i<buffer.length) { 
      int identificador[]=new int[Pedido.getTamanho_identificador()]; 
      identificador[0]=i; 
      buffer[i]=new Pedido(); 
      Produtor.buffer[i].setIdentificador(identificador); 
      Produtor.buffer[i].setTexto("pacote de dados"); 
      QuantidadeBuffer.quantidade++; 
      Date datafim = new Date(); 
      String fim=dateFormat.format(datafim); 
      System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i); 
      i++; 
     } 
     // RC 
     this.mutex.up(); 
     //this.encheBuffer.up(); 
    } 
    //this.encheBuffer.up(); 
} 

在上面的代码发生消费者线程读取一个位置,然后,另一个线程读取相同的位置,没有生产者填充该位置,如下所示:

Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1 
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1 
+2

你尝试实施这一使用'wait'和'notify'在Java方法?它会让你的生活更容易 – PhD

+0

是的,我在信号类中做过。 – Victor

+0

@Nupul - 我发现信号量队列更容易理解 - 在锁内没有奇怪的'等待'(是的,我知道它释放了锁但它看起来很奇怪),没有反虚假唤醒循环。制作人推动然后发出信号,消费者等待然后弹出。根本不需要查看队列计数。 –

回答

7

你似乎在使用互斥锁而不是信号量?

在使用互斥锁时,您只有二进制同步 - 锁定和解锁一个资源。 Sempahores有一个价值,你可以发信号或获得。

您试图锁定/解锁整个缓冲区,但这是错误的方法,因为正如您所看到的,无论是生产者还是消费者锁定,以及读者锁定它时,生产者无法填充缓冲区(因为它必须先锁定)。

您应该改为创建一个Sempahore,然后当生产者写入一个数据包或数据块时,它可以发出信号灯信号。然后消费者可以尝试获取信号量,以便他们将等待,直到生产者已经发信号通知了一个数据包已经被写入。在发送一个书面数据包的信号时,其中一个使用者将被唤醒,并且它会知道它可以读取一个数据包。它可以读取数据包,然后返回尝试获取信号量。如果那时生产者已经写了另一个数据包,它就会再次发出信号,然后任何一个使用者都会继续读取另一个数据包。等等......

例如:

(监制) - 编写一个数据包 - Semaphore.release(1)

(消费者XN) - Semaphore.acquire(1) - 读一个包

如果你有多个消费者则消费者(不是生产者)应该锁定读取数据包时的缓冲(但不 acquirin时g信号量)来防止竞争条件。在下面的例子中,生产者也锁定列表,因为所有的东西都在同一个JVM上。

import java.util.LinkedList; 
import java.util.concurrent.Semaphore; 

public class Semaphores { 

    static Object LOCK = new Object(); 

    static LinkedList list = new LinkedList(); 
    static Semaphore sem = new Semaphore(0); 
    static Semaphore mutex = new Semaphore(1); 

    static class Consumer extends Thread { 
     String name; 
     public Consumer(String name) { 
      this.name = name; 
     } 
     public void run() { 
      try { 

       while (true) { 
        sem.acquire(1); 
        mutex.acquire(); 
        System.out.println("Consumer \""+name+"\" read: "+list.removeFirst()); 
        mutex.release(); 
       } 
      } catch (Exception x) { 
       x.printStackTrace(); 
      } 
     } 
    } 

    static class Producer extends Thread { 
     public void run() { 
      try { 

       int N = 0; 

       while (true) { 
        mutex.acquire(); 
        list.add(new Integer(N++)); 
        mutex.release(); 
        sem.release(1); 
        Thread.sleep(500); 
       } 
      } catch (Exception x) { 
       x.printStackTrace(); 
      } 
     } 
    } 

    public static void main(String [] args) { 
     new Producer().start(); 
     new Consumer("Alice").start(); 
     new Consumer("Bob").start(); 
    } 
} 
+0

感谢您的帮助。你能展示一些代码示例吗? – Victor

+1

当然,我的答案添加了示例代码。输出将是消费者将打印出来,当他们得到的数据包(曾经由500毫秒制造者确定 - 请注意,有没有睡在消费者)。 – AntonyM

+1

上面的例子使用链表来让这个例子清楚,但当然也可以使用有限数组,然后使用您在代码中它的替代品使用归零方法。 – AntonyM

0
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.Semaphore; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

/* 
* To change this license header, choose License Headers in Project Properties. 
* To change this template file, choose Tools | Templates 
* and open the template in the editor. 
*/ 
/** 
* 
* @author sakshi 
*/ 
public class SemaphoreDemo { 

    static Semaphore producer = new Semaphore(1); 
    static Semaphore consumer = new Semaphore(0); 
    static List<Integer> list = new ArrayList<Integer>(); 

    static class Producer extends Thread { 

     List<Integer> list; 

     public Producer(List<Integer> list) { 
      this.list = list; 
     } 

     public void run() { 
      for (int i = 0; i < 10; i++) { 
       try { 
        producer.acquire(); 

       } catch (InterruptedException ex) { 
        Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex); 
       } 
       System.out.println("produce=" + i); 

       list.add(i); 
       consumer.release(); 

      } 
     } 
    } 

    static class Consumer extends Thread { 

     List<Integer> list; 

     public Consumer(List<Integer> list) { 
      this.list = list; 
     } 

     public void run() { 
      for (int i = 0; i < 10; i++) { 
       try { 
        consumer.acquire(); 
       } catch (InterruptedException ex) { 
        Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex); 
       } 

       System.out.println("consume=" + list.get(i)); 

       producer.release(); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     Producer produce = new Producer(list); 

     Consumer consume = new Consumer(list); 

     produce.start(); 
     consume.start(); 
    } 
} 


output: 

produce=0 
consume=0 
produce=1 
consume=1 
produce=2 
consume=2 
produce=3 
consume=3 
produce=4 
consume=4 
produce=5 
consume=5 
produce=6 
consume=6 
produce=7 
consume=7 
produce=8 
consume=8 
produce=9 
consume=9 
0
import java.util.concurrent.Semaphore; 


public class ConsumerProducer{ 

    public static void main(String[] args) { 

      Semaphore semaphoreProducer=new Semaphore(1); 
      Semaphore semaphoreConsumer=new Semaphore(0); 
      System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0"); 

      new Producer(semaphoreProducer,semaphoreConsumer).start(); 
      new Consumer(semaphoreConsumer,semaphoreProducer).start(); 

    } 


/** 
* Producer Class. 
*/ 
static class Producer extends Thread{ 

    Semaphore semaphoreProducer; 
    Semaphore semaphoreConsumer; 


    public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) { 
      this.semaphoreProducer=semaphoreProducer; 
      this.semaphoreConsumer=semaphoreConsumer; 
    } 

    public void run() { 
      for(;;){ 
        try { 
         semaphoreProducer.acquire(); 
         System.out.println("Produced : "+Thread.currentThread().getName()); 
         semaphoreConsumer.release(); 

        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
      }   
    } 
} 

/** 
* Consumer Class. 
*/ 
static class Consumer extends Thread{ 

    Semaphore semaphoreConsumer; 
    Semaphore semaphoreProducer; 

    public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) { 
      this.semaphoreConsumer=semaphoreConsumer; 
      this.semaphoreProducer=semaphoreProducer; 
    } 

    public void run() { 

      for(;;){ 
        try { 
         semaphoreConsumer.acquire(); 
         System.out.println("Consumed : "+Thread.currentThread().getName()); 
         semaphoreProducer.release(); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
      } 
    } 

} 
}