2014-06-18 36 views
0

我有一个多线程的弹簧应用程序,我在这里创建主题交换,声明队列,用路由键绑定它们。同步发送和接收消息。我能够将消息发送到话题交换,并查看使用routingKey将消息发布到队列中。Rabbit MQ同步在多线程应用程序中发送和接收

但是,在收到消息时,我看到消费者在每次迭代中都注册了队列,并且没有取消注册。我正在创建QueueingConsumer接收邮件,可能有另一种方法可以做同样的事情,请让我知道。下面是receiveMessage方法的片段。

public ObjectMessage receiveMessage(final String readQueue, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout, final int readAttempts) 
{ 
    this.configurationLock.lock(); 
    this.transmissionSemaphore.release(1); 
    this.configurationLock.unlock(); 
    try 
    { 
     for (int i = 0; i < readAttempts; i++) 
     { 
      ObjectMessage returnValue = null; 
      try 
      { 
       returnValue = this.receiveMessage(readQueue, correlationId, isBroadcastMessage, readTimeout); 
      } 
      catch (final Exception e) 
      { 
       logger.error(e); 
      } 
      if (returnValue != null) 
      { 
       logger.warn("Message received from queue - " + readQueue); 
       return returnValue; 
      } 
     } 
     if (correlationId != null) 
     { 
      throw new MessageNotFoundException(correlationId); 
     } 
     return null; 
    } 
    finally 
    { 
     try 
     { 
      this.transmissionSemaphore.acquire(1); 
     } 
     catch (final InterruptedException e) 
     { 
      Thread.interrupted(); 
     } 
    } 
} 


private ObjectMessage receiveMessage(final String routingKey, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout) throws Exception 
{ 
    logger.debug("receiveMessage - routingKey:" + routingKey + ",correlationId:" + correlationId + ",isBroadcastMessage:" + isBroadcastMessage + ",readTimeout:" 
      + readTimeout); 
    this.configurationLock.lock(); 
    this.transmissionSemaphore.release(1); 
    this.configurationLock.unlock(); 

    Connection connection = null; 
    Channel channel = null; 
    QueueingConsumer consumer = null; 
    try 
    { 
     // Binding the topic exchange with queue using routing key 
     final String queueName = "clientConfigurationQueue"; 
     final CachingConnectionFactory cachingConnectionFactory = this.getCachingConnectionFactory(routingKey); 
     if (isBroadcastMessage) 
     { 
      this.declareTopicAmqpInfrastructure(cachingConnectionFactory, routingKey, queueName); 
     } 
     QueueingConsumer.Delivery delivery; 

     connection = cachingConnectionFactory.createConnection(); 
     channel = connection.createChannel(false); 

     consumer = new QueueingConsumer(channel); 

     if (correlationId == null) 
     { 
      channel.basicConsume(queueName, true, consumer); 
      delivery = consumer.nextDelivery(readTimeout); 
     } 
     else 
     { 
      channel.basicConsume(queueName, false, consumer); 
      while (true) 
      { 
       delivery = consumer.nextDelivery(readTimeout); 
       if (delivery != null) 
       { 
        final String correlationId = delivery.getProperties().getCorrelationId(); 

        if (correlationId.equals(correlationId)) 
        { 
         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
         break; 
        } 
        else 
        { 
         channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 
        } 
       } 
       else 
       { 
        break; 
       } 
      } 
     } 

     ObjectMessage objectMessage = null; 
     if (delivery != null) 
     { 
      logger.debug("Message received with correlationId - " + delivery.getProperties().getCorrelationId() + " for queue - " + queueName); 
      logger.debug("Message received with Body - " + SerializationUtils.deserialize(delivery.getBody())); 
      objectMessage = new ObjectMessage(); 
      objectMessage.setCorrelationId(delivery.getProperties().getCorrelationId()); 
      objectMessage.setMessage(delivery.getBody()); 
     } 
     else 
     { 
      logger.debug("Message not received from queueName - " + queueName); 
     } 

     return objectMessage; 
    } 
    catch (final IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e) 
    { 
     logger.error("Unable to receive message - " + e); 
     throw new Exception(e); 
    } 
    finally 
    { 
     try 
     { 
      this.transmissionSemaphore.acquire(1); 
     } 
     catch (final InterruptedException e) 
     { 
      Thread.interrupted(); 
     } 

     try 
     { 
      if (connection != null) 
      { 
       connection.close(); 
      } 

      if (channel != null) 
      { 
       channel.close(); 
      } 
     } 
     catch (final Exception ignore) 
     { 

     } 
    } 
} 

private void declareTopicAmqpInfrastructure(final CachingConnectionFactory cachingConnectionFactory, final String routingKey, String queueName) 
{ 
    final Connection connection = cachingConnectionFactory.createConnection(); 
    final Channel channel = connection.createChannel(false); 
    try 
    { 
     channel.exchangeDeclare("topicExchange", ExchangeTypes.TOPIC, true, false, null); 
     channel.queueDeclare(queueName, true, false, false, null); 
     channel.queueBind(queueName, "topicExchange", routingKey); 
    } 
    catch (final IOException e) 
    { 
     logger.error("Unable to declare rabbit queue, exchange and binding - " + e); 
    } 
    finally 
    { 
     connection.close(); 
     try 
     { 
      channel.close(); 
     } 
     catch (final IOException ignore) 
     { 

     } 
    } 
} 
+1

你所描述的对我来说毫无意义;请提供更多详细信息(堆栈跟踪等)。如果您可以发布重现该应用的示例应用(例如,在Gist上),那会更好。 –

+0

提供有关示例代码的更多详细信息 – GRaj

回答

0

通过您的编辑,您完全改变了您的问题;你原来的问题说你挂在createConnection()。如果你使用Spring AMQP,你为什么不使用它的更高级别的抽象?您永远不会取消您的消费者 - 您需要跟踪basicConsume返回的consumerTag,并在完成后使用basicCancel取消。

+0

对于完全改变问题感到抱歉,因为我几乎无法理解与主题和扇出交换相关的AMQP概念,因此产生了一些混乱。 通过更高级别的抽象,你的意思是使用RabbitTemplate发送和接收方法? – GRaj

+0

是的,和异步接收的消息侦听器容器。 –

+0

消费者名单立即下降。非常感谢Gary。你一直是一位伟大的导师。 – GRaj

相关问题