2014-10-03 42 views

RabbitMQ subscription

I use RabbitMQ to connect the components of my program. The RMQ version is 3.3.5, and I use it with the Java client from the repo.

// Connection part 
@Inject 
public AMQService(RabbitMQConfig mqConfig) throws IOException { 
    this.mqConfig = mqConfig; 
    connectionFactory.setHost(mqConfig.getRABBIT_HOST()); 
    connectionFactory.setUsername(mqConfig.getRABBIT_USERNAME()); 
    connectionFactory.setPassword(mqConfig.getRABBIT_PASSWORD()); 
    connectionFactory.setAutomaticRecoveryEnabled(true); 
    connectionFactory.setPort(mqConfig.getRABBIT_PORT()); 
    connectionFactory.setVirtualHost(mqConfig.getRABBIT_VHOST()); 
    Connection connection = connectionFactory.newConnection(); 
    channel = connection.createChannel(); 
    channel.basicQos(1); 
} 

// Consume part
private static void consumeResultQueue() {
    final QueueingConsumer consumer = new QueueingConsumer(channel);
    Future resultQueue = EXECUTOR_SERVICE.submit((Callable<Object>) () -> {
        channel.basicConsume("resultQueue", true, consumer);
        while (true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                resultListener.onMessage(message);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

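I want to get away from the infinite loop. Can RMQ notify the client when a message is available to read from the queue, without the client having to check?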

Answers


You can create a class that extends DefaultConsumer and overrides handleDelivery.

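import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    // Called by the client library each time a message is delivered.
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        // do your computation
    }
}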

Then register that consumer with channel.basicConsume(queueName, myConsumerInstance);

Note that when you do this, handleDelivery runs inside the RabbitMQ client's thread pool, so you should avoid any long computation inside this function.
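If the per-message work is slow, one option is to hand each body to your own worker pool so that handleDelivery returns quickly. The following is only a minimal sketch of that hand-off, using nothing but the JDK. The class name DeliveryOffloader and its methods are hypothetical and not part of the RabbitMQ client; the listener stands in for something like resultListener.onMessage from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper (not a RabbitMQ class): moves message processing
// off the calling thread and onto a private worker pool.
class DeliveryOffloader {
    private final ExecutorService workers;
    private final Consumer<String> listener;

    DeliveryOffloader(int threads, Consumer<String> listener) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.listener = listener;
    }

    // Intended to be called from handleDelivery(...) with the raw body.
    // Only decodes and enqueues, so the caller is not blocked by the listener.
    void onDelivery(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        workers.submit(() -> listener.accept(message));
    }

    // Stops accepting new messages and waits for queued ones to finish.
    // Returns false if the timeout elapsed or the wait was interrupted.
    boolean shutdownAndWait(long timeoutMillis) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside handleDelivery you would then call something like offloader.onDelivery(body) and return. Also keep in mind that the two-argument channel.basicConsume(queueName, consumer) overload uses explicit acknowledgements. With the basicQos(1) setting from the question, each message would need a channel.basicAck(...) before the next one is delivered, unless you pass autoAck = true as the question's code does.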


Hmm, interesting. I'll report back once I've tested it. Thanks – 2014-10-03 12:25:42