2011-10-27 127 views
1

我有一个情况,我写了一个简单的生产者消费者模型,用于从蓝牙读取数据块,然后每写入10k字节。我使用了一个标准的P-C模型,使用Vector作为我的消息持有者。那么,如何改变这种情况,以便多个Thread消费者可以阅读相同的消息,我认为这个术语会是Multicaster?我实际上是在Android手机上使用它,所以JMS可能不是一种选择。单生产者单消费者现在我需要多个消费者

static final int MAXQUEUE = 50000; 
private Vector<byte[]> messages = new Vector<byte[]>(); 

/** 
* Put the message in the queue for the Consumer Thread 
*/ 
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while (messages.size() == MAXQUEUE) 
     wait(); 
    messages.addElement(send); 
    notify(); 
} 


/** 
* This method is called by the consumer to see if any messages in the queue 
*/ 
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while (messages.size() == 0 && !Thread.interrupted()) { 
     wait(1); 
    } 
    byte[] message = messages.firstElement(); 
    messages.removeElement(message); 
    return message; 
} 

我从奥赖利书引用代码Message Parser部分

回答

0

这就是我在挖掘代码和修改现有示例时作为示例想到的。

package test.messaging; 

import java.util.ArrayList; 
import java.util.concurrent.LinkedBlockingQueue; 

public class TestProducerConsumers { 

    static Broker broker; 

    public TestProducerConsumers(int maxSize) { 
     broker = new Broker(maxSize); 
     Producer p = new Producer(); 
     Consumer c1 = new Consumer("One"); 
     broker.consumers.add(c1); 
     c1.start(); 

     Consumer c2 = new Consumer("Two"); 
     broker.consumers.add(c2); 
     c2.start(); 

     p.start(); 
    } 

    // Test Producer, use your own message producer on a thread to call up 
    // broker.insert() possibly passing it the message instead. 
    class Producer extends Thread { 

     @Override 
     public void run() { 
      while (true) { 
       try { 
        broker.insert(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

    class Consumer extends Thread { 
     String myName; 
     LinkedBlockingQueue<String> queue; 

     Consumer(String m) { 
      this.myName = m; 
      queue = new LinkedBlockingQueue<String>(); 
     } 

     @Override 
     public void run() { 
      while(!Thread.interrupted()) { 
       try { 
        while (queue.size() == 0 && !Thread.interrupted()) { 
         ; 
        } 
        while (queue.peek() == null && !Thread.interrupted()) { 
         ; 
        } 
        System.out.println("" + myName + " Consumer: " + queue.poll()); 
       } catch (Exception e) { } 
      } 
     } 
    } 

    class Broker { 
     public ArrayList<Consumer> consumers = new ArrayList<Consumer>(); 

     int n; 
     int maxSize; 

     public Broker(int maxSize) { 
      n = 0; 
      this.maxSize = maxSize; 
     } 

     synchronized void insert() throws InterruptedException { 
        // only here for testing don't want it to runaway and 
        //memory leak, only testing first 100 samples. 
      if (n == maxSize) 
       wait(); 
      System.out.println("Producer: " + n++); 
      for (Consumer c : consumers) { 
       c.queue.add("Message " + n); 
      } 
     } 

    } 

    public static void main(String[] args) { 
     TestProducerConsumers pc = new TestProducerConsumers(100); 

    } 
} 
+0

不要使用这些* while(...)*循环在消费者忙等待!这是非常糟糕的编码。一个*阻塞队列*将为你处理所有的事情,包括* insert()*中的* wait()*。 (无论如何,相应的* notify()*调用在哪里?) – JimmyB

+0

因为消费者没有使用wait(),所以您不需要通知插入。'while'在那里用于测试。我投入了一些睡眠而不是等待消费者,因为我的消息不需要实时。 – JPM

+0

那么,那是什么“if(n == maxSize)wait();”? – JimmyB

0

你绝对应该使用queue代替Vector
为每个线程分配自己的队列,并在接收到新消息时向每个线程的队列发送新消息add()。为了灵活性,听众模式也可能有用。

编辑:

好吧,我觉得我应该补充一个例子,也:
(古典观察者模式)

这是接口,所有的消费者必须实现:

public interface MessageListener { 
    public void newMessage(byte[] message); 
} 

生产者可能是这样的:

public class Producer { 
    Collection<MessageListener> listeners = new ArrayList<MessageListener>(); 


    // Allow interested parties to register for new messages 
    public void addListener(MessageListener listener) { 
    this.listeners.add(listener); 
    } 

    public void removeListener(Object listener) { 
    this.listeners.remove(listener); 
    } 

    protected void produceMessages() { 
    byte[] msg = new byte[10]; 

    // Create message and put into msg 

    // Tell all registered listeners about the new message: 
    for (MessageListener l : this.listeners) { 
     l.newMessage(msg); 
    } 

    } 
} 

而消费类可能是(使用阻塞队列它确实是wait() ING和notify()荷兰国际集团都对我们):

public class Consumer implements MessageListener { 

    BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(); 

    // This implements the MessageListener interface: 
    @Override 
    public void newMessage(byte[] message) { 
    try { 
     queue.put(message); 
    } catch (InterruptedException e) { 
     // won't happen. 
    } 
    } 

    // Execute in another thread:  
    protected void handleMessages() throws InterruptedException { 
     while (true) { 
      byte[] newMessage = queue.take(); 

      // handle the new message. 
     } 
    } 


} 
+0

矢量和队列都只能容纳对象,而不是子女。使用阻塞队列和上面的整个代码变得像queue.put(message)和queue.take()一样简单。 – JimmyB

+0

我更倾向于您的示例,因为我无法使SettableFuture工作。 – JPM

+0

我总是处理类似这样的事情,Listener甚至编写了一个完整的GUI状态监听器系统,用于从JMS系统获取实时数据并使用这些状态更改更新自定义组件。 – JPM

1

发布 - 订阅机制绝对是达到你想要的方式。我不确定为什么为Android开发将限制您使用JMS,这与JMS一样简单。检查出SO上的 this thread

+0

是的,但主要是谈论使用JMS,这是有点过分的,我需要在线程应用程序之间发送消息。 – JPM

+0

在这种情况下,将简单的Vector(或Hanno Binder建议的队列)包装成Observer/Observable模式吗? – mazaneicha