2013-01-11 43 views
1

我对RabbitMQ和sprimg-amqp有个令人沮丧的问题。我需要从外部进程向一个队列发送一条消息(一个JUnit类只是为了测试路由是否工作正常)。这是我的camel-context.xml文件:RabbitMQ和Camel:由于“消息在恢复时丢失”导致路由中断

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
    <route> 
     <from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&amp;autodelete=true&amp;durable=true" /> 
     <log message="Message received!!! "/> 
     <to uri="spring-amqp:KipcastDirect2:TestQueue:KipcastRouting2?type=direct&amp;autodelete=false&amp;durable=true" /> 
    </route> 
</camelContext> 

<rabbit:connection-factory id="amqpConnectionFactory" /> 
<rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" /> 
<rabbit:admin connection-factory="amqpConnectionFactory"/> 

<bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> 
    <property name="host" value="10.211.55.20"/> 
    <property name="port" value="5672"/> 
    <property name="username" value="guest"/> 
    <property name="password" value="guest"/> 
    <property name="virtualHost" value="/"/> 
</bean> 

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/> 

当我开始使用maven camel:run时,它工作正常。交易所可用,也可以在RabbitMQ管理中排队。在问题发生时,我尝试将消息发送到交易所:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setHost("10.211.55.20"); 
factory.setPort(5672); 
factory.setVirtualHost("/"); 
factory.setUsername("guest"); 
factory.setPassword("guest"); 
Connection connection = factory.newConnection(); 
Channel channel = connection.createChannel(); 

channel.exchangeDeclare("KipcastDirect", "direct", 
     true, /* durable */ 
     true, /* autodelete */ 
     null); /* */ 

byte[] messageBodyBytes = "Hello, world!".getBytes(); 

AMQP.BasicProperties.Builder bob = new AMQP.BasicProperties.Builder(); 
AMQP.BasicProperties minBasic = bob.build(); 
minBasic = bob.priority(0).messageId("Test").build(); 
minBasic = bob.priority(0).deliveryMode(1).build(); 

while (true) { 

    channel.basicPublish("KipcastDirect", "KipcastRouting", minBasic, messageBodyBytes); 
    System.out.println(" [x] Sent "); 

} 

消息被正确发送到队列(我可以看到他们在日志),但将引发异常和路由停止:

[  SimpleAsyncTaskExecutor-1] SpringAMQPConsumer    WARN Caused by: [org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException - Listener threw exception] 
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37] 
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37] 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at $Proxy46.invokeListener(Unknown Source)[:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_37] 
[  SimpleAsyncTaskExecutor-1] erationsInterceptorFactoryBean WARN Message dropped on recovery: (Body:'Hello, world!'; ID:Test; Content:text/plain; Headers:{}; Exchange:KipcastDirect; RoutingKey:KipcastRouting; Reply:null; DeliveryMode:NON_PERSISTENT; DeliveryTag:2) 
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_37] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_37] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37] 
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37] 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at $Proxy46.invokeListener(Unknown Source)[:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:] 
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_37] 

这是怎么回事?什么是我必须生成消息ID的原因?

回答

0

问题涉及的MessageConverter豆:

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/> 

已被

<bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/> 
<bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/> 
<bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory"> 
    <property name="converters"> 
     <map> 
      <entry key="application/json" value-ref="jsonMessageConverter"/> 
      <entry key="application/xml" value-ref="textMessageConverter"/> 
     </map> 
    </property> 
    <property name="fallbackConverter" ref="textMessageConverter"/> 
</bean> 

更换此解决这个问题,该消息被正确地路由。

相关问题