2016-01-27 42 views
1

我刚接触 JMS,现在有一个需求:在 AMQ(ActiveMQ)和 WMQ(WebSphere MQ)之间搭建桥接。我在 Camel 文档中看到,他们建议使用 Camel,而不是 JMS 到 JMS 桥。(标题:用 Camel 实现 JMS 到 JMS 桥接 - 应用没有显示为 AMQ 的消费者)

首先,我想让我的应用程序把消息从 AMQ 中取出,并简单地记录一条日志表明消息已被取出。但是每当我在 Jetty 中启动应用程序时,它都不会在 apiToTopsQueue 上显示为消费者,因此也不会把消息从队列中取出。

我的应用程序的 context.xml(它负责加载 Camel 上下文):

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:context="http://www.springframework.org/schema/context" 
     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> 

    <!-- Root application context: resolves ${...} placeholders from tops-bridge.properties on the classpath. -->
    <context:property-placeholder location="classpath:tops-bridge.properties" /> 

    <!-- Pulls in the bean definitions declared in camel-context.xml (also looked up on the classpath). -->
    <import resource="classpath:camel-context.xml" /> 

    <!-- Calls Log4jConfigurer.initLogging with the two arguments listed below:
         the Log4j config location and the refresh interval in milliseconds. -->
    <bean id="log4jInitialization" 
      class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> 
     <property name="targetClass" value="org.springframework.util.Log4jConfigurer" /> 
     <property name="targetMethod" value="initLogging" /> 
     <property name="arguments"> 
      <list> 
       <value>classpath:log4j.xml</value> 
       <value>60000</value> <!-- Refresh Log4j config every 60 seconds --> 
      </list> 
     </property> 
    </bean> 
</beans> 

下面是 camel-context.xml(我一直在参照教程把各种代码注释掉又取消注释,所以它看起来可能有点奇怪):

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:camel="http://camel.apache.org/schema/spring" 
     xmlns:broker="http://activemq.apache.org/schema/core" 
     xsi:schemaLocation="http://www.springframework.org/schema/beans 
          http://www.springframework.org/schema/beans/spring-beans.xsd 
          http://camel.apache.org/schema/spring 
          http://camel.apache.org/schema/spring/camel-spring.xsd 
          http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

    <!-- Camel context whose routes are supplied by the bridgeRouteConfig bean defined below. -->
    <camel:camelContext id="defaultCamelContext"> 
     <camel:routeBuilder ref="bridgeRouteConfig"/> 
     <!--<camel:jmxAgent id="agent" createConnector="true"/>--> 
    </camel:camelContext> 

    <!-- Route builder; its four constructor arguments are resolved from property placeholders. -->
    <bean id="bridgeRouteConfig" class="com.caci.asg.rail.tops.bridge.TopsBridgeRouteBuilder"> 
     <constructor-arg name="amqToBridgeQueue" value="${topsBridgeRouteBuilder.route.amqToBridgeRoute}"/> 
     <constructor-arg name="bridgeToWmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToWmqRoute}"/> 
     <constructor-arg name="wmqToBridgeQueue" value="${topsBridgeRouteBuilder.route.wmqToBridgeRoute}"/> 
     <constructor-arg name="bridgeToAmqQueue" value="${topsBridgeRouteBuilder.route.bridgeToAmqRoute}"/> 
    </bean> 

    <!-- ActiveMQ connection factory pointing at the URL given by application.activemq.url. -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
     <property name="brokerURL" value="${application.activemq.url}"/> 
     <property name="useAsyncSend" value="true"/> 
    </bean> 

    <!-- Pooled wrapper around jmsFactory, started and stopped together with the Spring context. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" 
      init-method="start" destroy-method="stop"> 
     <property name="maxConnections" value="8"/> 
     <property name="connectionFactory" ref="pooledConnectionFactory_placeholder_do_not_use" /> 
    </bean> 

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
     <property name="connectionFactory" ref="pooledConnectionFactory"/> 
     <property name="concurrentConsumers" value="10"/> 
    </bean> 


    <!-- lets configure the ActiveMQ JMS broker server --> 
    <broker:broker useJmx="true" persistent="false" brokerName="myBroker"> 
     <broker:transportConnectors> 
      <!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side --> 
      <broker:transportConnector name="vm" uri="vm://myBroker"/> 
      <!-- expose a TCP transport for clients to use --> 
      <broker:transportConnector name="tcp" uri="${application.activemq.url}"/> 
     </broker:transportConnectors> 
    </broker:broker> 

    <!-- lets configure the Camel ActiveMQ to use the embedded ActiveMQ broker declared above --> 
    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
     <property name="brokerURL" value="vm://myBroker"/> 
    </bean> 

</beans> 

camel-context.xml 用到了一些属性,如下所示(在 AMQ 部分调通之前,其中涉及 WMQ 的那些目前只是占位符)。

# Broker URL used for the ActiveMQ connection factory (and the embedded broker's TCP connector).
application.activemq.url=tcp://localhost:61616 

# Camel endpoint URIs passed to the TopsBridgeRouteBuilder constructor.
# NOTE(review): the "mq:" scheme has no matching component bean in the camel-context.xml shown; presumably a placeholder until the WMQ side is configured.
topsBridgeRouteBuilder.route.amqToBridgeRoute=jms:apiToTopsQueue 
topsBridgeRouteBuilder.route.bridgeToWmqRoute=mq:toWmq 
topsBridgeRouteBuilder.route.wmqToBridgeRoute=mq:fromWmq 
topsBridgeRouteBuilder.route.bridgeToAmqRoute=jms:topsToApiQueue 

路由构建器(RouteBuilder)用 Java 编写,如下所示:

import org.apache.camel.LoggingLevel; 
import org.apache.camel.builder.RouteBuilder; 

/**
 * Camel route builder for the bridge between the ActiveMQ side and the WMQ side.
 *
 * <p>All four endpoint URIs are supplied through the constructor. At present only a
 * diagnostic route is active: it consumes from {@code amqToBridgeQueue} and logs a line
 * for every message. The two real bridging routes are kept below as comments until the
 * WMQ endpoints are available.
 */
public class TopsBridgeRouteBuilder extends RouteBuilder {

    /** Endpoint URI the bridge consumes from on the ActiveMQ side. */
    private final String amqToBridgeQueue;
    /** Endpoint URI the bridge is meant to publish to on the WMQ side (route currently disabled). */
    private final String bridgeToWmqQueue;
    /** Endpoint URI the bridge is meant to consume from on the WMQ side (route currently disabled). */
    private final String wmqToBridgeQueue;
    /** Endpoint URI the bridge is meant to publish to on the ActiveMQ side (route currently disabled). */
    private final String bridgeToAmqQueue;

    /**
     * Creates the route builder.
     *
     * @param amqToBridgeQueue endpoint URI to consume from on the ActiveMQ side
     * @param bridgeToWmqQueue endpoint URI to publish to on the WMQ side
     * @param wmqToBridgeQueue endpoint URI to consume from on the WMQ side
     * @param bridgeToAmqQueue endpoint URI to publish to on the ActiveMQ side
     */
    public TopsBridgeRouteBuilder(String amqToBridgeQueue, String bridgeToWmqQueue, String wmqToBridgeQueue, String bridgeToAmqQueue) {
        this.amqToBridgeQueue = amqToBridgeQueue;
        this.bridgeToWmqQueue = bridgeToWmqQueue;
        this.wmqToBridgeQueue = wmqToBridgeQueue;
        this.bridgeToAmqQueue = bridgeToAmqQueue;
    }

    /**
     * Registers the routes of the bridge.
     *
     * @throws Exception if a route cannot be defined
     */
    @Override
    public void configure() throws Exception {
        // Intended bridging routes, disabled until the WMQ endpoints are configured:
        // from(amqToBridgeQueue).to(bridgeToWmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToWmqQueue);
        // from(wmqToBridgeQueue).to(bridgeToAmqQueue).log(LoggingLevel.INFO, "Message moving to " + bridgeToAmqQueue);

        // Diagnostic route: consume from the ActiveMQ side and log each message.
        // The trailing space keeps the endpoint URI from running into the word "from".
        from(amqToBridgeQueue).log(LoggingLevel.WARN, "Consuming message from " + amqToBridgeQueue);
    }
}

所以,我真的不明白为什么在启动 Jetty 之后,它没有被列为 apiToTopsQueue 的消费者。我的 pom 中包含了 AMQ/WMQ/Camel 库的依赖,依赖如下(版本和 scope 继承自父 POM):

<!-- Apache Camel: core, Spring integration and the JMS component (no version or scope declared here). -->
<dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-core</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-spring</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-jms</artifactId> 
     </dependency> 
     <!-- IBM MQ client libraries (com.ibm.mq group). -->
     <dependency> 
      <groupId>com.ibm.mq</groupId> 
      <artifactId>com.ibm.mq.jmqi</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>com.ibm.mq</groupId> 
      <artifactId>com.ibm.mqjms</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>com.ibm.mq</groupId> 
      <artifactId>connector</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>com.ibm.mq</groupId> 
      <artifactId>dhbcore</artifactId> 
     </dependency> 

     <!-- ActiveMQ: the all-in-one artifact plus the connection pool. -->
     <dependency> 
      <groupId>org.apache.activemq</groupId> 
      <artifactId>activemq-all</artifactId> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.activemq</groupId> 
      <artifactId>activemq-pool</artifactId> 
     </dependency> 

我写了下面这个测试,验证我能否向桥接所用的 AMQ 放入消息并再把它们取出来。这是可行的:在 AMQ 管理页面上,我可以看到入队/出队消息计数在递增。

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.junit.Test; 

import static org.hamcrest.core.Is.is; 
import static org.junit.Assert.assertThat; 

import javax.jms.*; 

public class TopsBridgeRouteBuilderTest { 

    @Test 
    public void testAMessageAddedToAmqCanBeRetrieved() throws JMSException { 

     String brokerURL = "tcp://localhost:61616"; 
     String amqQueue = "apiToTopsQueue"; 
     String messageToSend = "Test message"; 

     // Put a message on the AMQ 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Destination destination = session.createQueue(amqQueue); 
     MessageProducer producer = session.createProducer(destination); 
     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
     TextMessage tm = session.createTextMessage(messageToSend); 
     producer.send(tm); 

     // Create read only consumer to take the message off the queue 
     ActiveMQConnectionFactory connectionFactoryReadOnly = new ActiveMQConnectionFactory(brokerURL); 
     Connection connectionReadOnly = connectionFactoryReadOnly.createConnection(); 
     connectionReadOnly.start(); 
     Session sessionReadOnly = connectionReadOnly.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     MessageConsumer consumer = sessionReadOnly.createConsumer(destination); 
     final TextMessage message = (TextMessage) consumer.receive(); 

     System.out.println("Message retrieved = " + message.getText()); 
     assertThat(message.getText(), is(messageToSend)); 
    } 
} 

我的配置中是否有什么错误,导致启动 Jetty 时 Camel 路由没有运行,或者没有去监听 AMQ?

谢谢。

回答

0

原来是我的 web.xml 中缺少了下面这一段。吸取教训,以后注意!

<!-- Spring's ContextLoaderListener; per the answer above, the routes did not start while this
     listener was missing from web.xml.
     NOTE(review): the listener presumably needs to be pointed at the application's context.xml
     (e.g. via a contextConfigLocation context-param); that part of web.xml is not shown here. -->
<listener> 
    <listener-class> 
     org.springframework.web.context.ContextLoaderListener 
    </listener-class> 
</listener>