2014-08-29 133 views
1

我是ActiveMQ的新手。我试图在activemq中实现生产者 - 消费者(发送者 - 接收者)。在我的代码中,我很容易通过ActiveMQ发送从单个生产者到单个消费者的消息&。但问题是,我无法将信息发送给同一制作人的多个消费者。向ActiveMQ中的多个消费者发送消息

这里是我的制片 & 消费者类。

MsgProducer.java

package jms_service; 

import javax.jms.JMSException; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class MsgProducer { 

     private static String url = "failover://tcp://localhost:61616"; 
     public static javax.jms.ConnectionFactory connFactory; 
     public static javax.jms.Connection connection; 
     public static javax.jms.Session mqSession; 
     public static javax.jms.Topic topic; 
     public static javax.jms.MessageProducer producer; 

     public static void main(String[] args) throws JMSException { 

      connFactory = new ActiveMQConnectionFactory(url); 
      connection = connFactory.createConnection("system","manager"); 
      connection.start(); 
      mqSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 

      topic = mqSession.createTopic("RealTimeData"); 
      producer = mqSession.createProducer(topic);     
      producer.setTimeToLive(30000); 

      TextMessage message = mqSession.createTextMessage();  

      int seq_id =1; 

      while(true) 
      {    
       message.setText("Hello world | " +"seq_id #"+seq_id);    
       producer.send(message); 
       seq_id++; 

       System.out.println("sent_msg =>> "+ message.getText()); 
       // if(seq_id>100000) break; 

        try { 
         Thread.sleep(1000); 
         } 
        catch (InterruptedException e) { e.printStackTrace();}   
       }  

    } 

} 

MsgConsumer.java

package jms_service; 

import java.text.SimpleDateFormat; 
import java.util.Calendar; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class MsgConsumer { 

      private static String url = "failover://tcp://localhost:61616";  
      public static javax.jms.ConnectionFactory connFactory; 
      public static javax.jms.Connection connection; 
      public static javax.jms.Session mqSession; 
      public static javax.jms.Topic topic; 
      public static javax.jms.MessageConsumer consumer; 

     public static void main(String[] args) throws JMSException, InterruptedException { 

      connFactory = new ActiveMQConnectionFactory(url); 
      connection = connFactory.createConnection("system", "manager"); 
      connection.setClientID("0002"); 
      //connection.start();    
      mqSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
      topic = mqSession.createTopic("RealTimeData"); 
      consumer = mqSession.createDurableSubscriber(topic, "SUBS01"); 
      connection.start(); 

      MessageListener listner = new MessageListener() { 
       public void onMessage(Message message) { 
        try { 
         if (message instanceof TextMessage) { 
          TextMessage txtmsg = (TextMessage) message; 
          Calendar cal = Calendar.getInstance(); 
          //cal.getTime(); 
          SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); 
          String time = sdf.format(cal.getTime()); 

          String msg="received_message =>> "+ txtmsg.getText() + " | received_at :: "+time; 
          System.out.println(msg); 

          //consumer.sendData(msg); 
         } 

         } catch (JMSException e) { 
          System.out.println("Caught:" + e); 
          e.printStackTrace(); 
          } 
        } 
      }; 

      consumer.setMessageListener(listner); 

     } 


} 

谁能帮助找出发送消息给多个消费者的方式。 在此先感谢。

+0

究竟是什么问题? – 2014-08-29 08:07:49

+0

你有一个硬编码的客户端ID ...我承认我不知道ActiveMQ,但我可以想象这是一个原因。 – Fildor 2014-08-29 08:27:37

+0

“如果在调用此方法时具有相同clientID的另一个连接已在运行,则JMS提供程序应检测到重复ID并引发InvalidClientIDException。” – Fildor 2014-08-29 08:30:26

回答

1

假设你的问题是

谁能帮助来发送消息给多个消费者

,并找出路,而无需通过您的完整代码阅读,这种做法可能是把你的客户在一个集合

static Vector<consumer> vecConsumer; 

在哪里你把每一个新的客户端,并给所有现有的客户参考。 广播就像发送至单个客户端,封装在,对于一个示例,循环foreach

for(consumer cons : vecConsumer) 
{ 
     //send stuff or put in sending queue 
} 
+0

我承认,我不知道ActiveMQ,但我希望有一个消息队列来处理这件事。 – Fildor 2014-08-29 08:26:03

+0

@Fildor由于JMS消息队列对于点对点连接(或多或少)的事实,我想到了一个更通用的方法;) – 2014-08-29 09:47:15

2

队列语义跨所有消费者递送消息一次一次和仅-。这是根据JMS规范(了解基本知识的一个很好的阅读)。

主题语义向每个消费者传递消息。所以,一个主题可能是您的需求的答案。

相关问题