2015-09-10 38 views
0

什么是从RabbitMQ并行和按顺序消费“主题”消息批次的最佳方式是什么?消费RabbitMQ消息与订单和上下文

我们有一个服务器为许多客户处理数据。每次处理客户的数据时,都会将一堆消息发送给RMQ。另一方面,我们有一个消耗数据并将其存储在数据库中的进程。

消费过程缓慢,我们想要并行化并使其可扩展。问题是单个客户的数据不能同时由两个消费者处理。 生产者现在可以随时运行,并且可以将消息添加到队列中,即使对于队列中已有消息的客户也是如此。

其中一个建议是创造,如果它正在处理数据,将每个客户指定的新的数据库表。消费者只会向未被其他消费者处理的消费者索要消息,并且会在该消费者的数据库中注册自己。

我不愿意使用的解决方案,因为它需要连接到数据库,并在数据库中保存运行时的状态。

我希望能找到,可能我们的消费者/生产商代码和RMQ范围内处理的解决方案。

有人建议,有书面下客户RMQ“主题”,并有消费者的短信读取一个“主题”。消息将被添加到每个批次或客户消息的单独队列(或“主题”)。消费者将消费“顾客”消息并使用其数据从主队列中选择“主题”。

问题是,当制片人希望将新数据添加到主队列为已经在当前正在处理的主队列数据的客户,会发生什么。

我们如何同步RMQ的产量?

+0

您是否尝试过使用交易?我对他们没有第一手经验,但看起来你在寻找什么(只要这些消息来源于同一个制作人会话) – SJuan76

回答

0

我想你可以看看rabbitmq教程可用。

http://www.rabbitmq.com/tutorials/tutorial-five-java.html

示例代码:

import com.rabbitmq.client.*; 

import java.io.IOException; 

public class ReceiveLogsTopic { 
    private static final String EXCHANGE_NAME = "topic_logs"; 

    public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
    String queueName = channel.queueDeclare().getQueue(); 

    if (argv.length < 1) { 
     System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); 
     System.exit(1); 
    } 

    for (String bindingKey : argv) { 
     channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 
    } 

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

    Consumer consumer = new DefaultConsumer(channel) { 
     @Override 
     public void handleDelivery(String consumerTag, Envelope envelope, 
           AMQP.BasicProperties properties, byte[] body) throws IOException { 
     String message = new String(body, "UTF-8"); 
     System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); 
     } 
    }; 
    channel.basicConsume(queueName, true, consumer); 
    } 
} 

方法来执行代码,并保存在一个文件中:

java -cp $CP ReceiveLogsTopic "#" > logfile.log 

我希望它可以帮助和给你一个想法。

其实最好的方法是使用数据库,但如果你没有确定它意味着你可以通过在一个文件中的消息给一个尝试,跟踪和重用。

也就是说,您可以在执行时将详细信息保存在文件中,并且可以在运行时根据需要对其进行跟踪。

注意:我已附上教程中给出的示例代码,因为即使链接将来发生更改,任何人都可以跟踪详细信息。