2011-11-18 26 views
3

使用RabbitMQ有一种使用方法,类似于MSSMQ,可以从队列中弹出1000条消息,然后将其插入数据库并从那里继续。RabbitMQ和C#

我似乎无法做到这一点与一个渠道的订阅,然后对订阅中的BasicDeliveryEventArgs做一个foreach,用这个if语句在给定的时间处理我想要处理的最大消息数。

预先感谢 然而,这仍然需要从队列

using (IConnection connection = factory.CreateConnection()) 
{ 
    using (IModel channel = connection.CreateModel()) 
    { 
     channel.QueueDeclare("****", true, false, false, null); 

     var subscription = new Subscription(channel, "****", false); 
     int maxMessages = 5; 
     int i = 0; 
     foreach (BasicDeliverEventArgs eventArgs in subscription) 
     { 
      if (++i == maxMessages) 
      { 
       Console.WriteLine("Took 5 messages"); 
       subscription.Ack(eventArgs); 
       break; 
      } 
     } 
    } 
} 
+1

我不明白这个问题,特别是“与那做一个if语句”部分。你能澄清一下吗? –

+0

更新了帖子 – user1053237

+0

对我仍然没有意义。你想达到什么目的? –

回答

9

我假设你想通过他们的群体配料成较大的交易,而优化消息加载到数据库中的所有22K的消息而不是每个消息的交易费用。随着强制性的警告,这样做意味着信息的大型团体可以一起失败,即使只有其中一个会导致一个问题,这里是你怎么会去一下......

QOS设置在通道上:

channel.BasicQos(0, 1000, false); 

这将预取1000条消息并阻止进一步的流量,直到您确认某事。请注意,它不会以1000个块为单位获取。相反,它确保最多可以在任何时候预取最多1000个UNACK'ed消息。模拟块传输与首先处理1000条消息一样简单,然后一次确认全部消息。

请参阅herehere以获取比我更权威的解释。

还有一点:即使您还没有创建1000条消息的配额,只要有消息可用,您可能需要刷新队列。您应该可以通过在foreach循环内调用queue.BasicGet()直到它干涸,然后将所有内容(包括从subscription中提取的消息)发送到数据库来完成此操作。警告:我自己没有尝试过,所以我可能会说垃圾,但我认为它会起作用。这种方法的优点在于它可以立即将消息推入数据库,而无需等待一整批1000条消息。如果数据库落后于处理太多小事务,预取积压将在每个周期之间更多地填满。