2015-11-04 29 views
0

我用下面的Java应用程序把消息放在JMS队列中的消息消费上的ActiveMQ复古积极

import org.springframework.context.ApplicationContext; 
import org.springframework.context.support.ClassPathXmlApplicationContext; 


public class Main { 
public static void main(String[] args) { 
    ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml"); 
    JmsMessageSender jmsMessageSender = (JmsMessageSender)ctx.getBean("jmsMessageSender"); 

    java.lang.String text = "{\"name\": \"Bob\"}"; 
    jmsMessageSender.send(text); 
    ((ClassPathXmlApplicationContext)ctx).close(); 
} 

} 

消息发送者看起来是这样的:

import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.Session; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.jms.core.JmsTemplate; 
import org.springframework.jms.core.MessageCreator; 
import org.springframework.stereotype.Service; 

@Service 
public class JmsMessageSender { 
@Autowired 
private JmsTemplate jmsTemplate; 
public void send(final String text) { 
    this.jmsTemplate.send(new MessageCreator() { 
     @Override 
     public Message createMessage(Session session) throws JMSException { 
      Message message = session.createTextMessage(text); 
      return message; 
     } 
    }); 
} 


} 

和Spring配置如下:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:beans="http://www.springframework.org/schema/beans" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation=" 
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd 
http://www.springframework.org/schema/context 
http://www.springframework.org/schema/context/spring-context.xsd"> 


<context:component-scan base-package="com.intonilof" /> 
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <constructor-arg index="0" value="tcp://127.0.0.1:61616" /> 
</bean> 

<bean id="connectionFactory" 
     class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <constructor-arg ref="amqConnectionFactory" /> 
</bean> 

<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQTopic"> 
    <constructor-arg index="0" value="emailsToSend" /> 
</bean> 

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="connectionFactory" /> 
    <property name="defaultDestination" ref="defaultDestination" /> 
</bean> 

如果我使用此代码将消息放在如何创建消费者的主题,该主题将在稍后消耗jms消息?

我曾尝试以下,但它并未占用消息:

import org.apache.activemq.command.ActiveMQTextMessage; 
import org.apache.log4j.Logger; 
import org.json.JSONObject; 

import java.util.Date; 

public class App { 
final static Logger logger = Logger.getLogger(App.class); 
public static void main(String[] args) throws Exception { 
    thread(new HelloWorldConsumer(), false); 
} 

public static void thread(Runnable runnable, boolean daemon) { 
    Thread brokerThread = new Thread(runnable); 
    brokerThread.setDaemon(daemon); 
    brokerThread.start(); 
} 

public static class HelloWorldConsumer implements Runnable, ExceptionListener { 

    public void run() { 
     try { 
      logger.trace("Running at: " + new Date().toString()); 
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 
      Connection connection = connectionFactory.createConnection(); 
      connection.start(); 
      connection.setExceptionListener(this); 
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      Queue queue = session.createQueue("emailsToSend?consumer.retroactive=true"); 
      MessageConsumer consumer = session.createConsumer(topic); 
      Message message = consumer.receive(1000); 

      if (message instanceof TextMessage) { 
        System.out.println("Found message"); 

      } else if (message == null){ 
       logger.trace("No messages"); 
      } 
      consumer.close(); 
      session.close(); 
      connection.close(); 
     } catch (Exception e) { 
      logger.info("Caught: " + e); 
      e.printStackTrace(); 
     } 
    } 

    public synchronized void onException(JMSException ex) { 
     logger.trace("JMS Exception occured. Shutting down client."); 
    } 
} 
} 

回答

1

您需要先创建耐用主题订阅。然后,JMS将在用户断开连接时保留所有发布的消息,并在重新连接时交付它们。