2013-10-24 35 views
1

我有一个MessageReceiver这是从队列抽消息:您如何排除Azure MessageReceiver消息泵?

var factory = MessagingFactory.CreateFromConnectionString(connectionString); 

var receiver = factory.CreateMessageReceiver(queuePath); 

receiver.OnMessageAsync(HandleBrokeredMessageAsync); 

HandleBrokeredMessageAsync是我的代表,其接收器泵的消息之中。

当我在接收器上呼叫Close()时,它将停止从队列中抽取更多消息。为了避免潜在的竞争条件,我想确保在返回控制之前所有待处理的处理都已完成。

我已经考虑跟踪HandleBrokeredMessageAsync的每个呼叫到ConcurrentBag<T>,当它们完成时将它们从包中取出。我会用一个BlockingCollection<T>来阻止这个过程,直到排水完成,但是不清楚什么时候打电话给CompleteAdding():我会在打电话给Close()之后调用它,但是在呼叫Close()和随后发送的消息之间可能会有差距处理程序?

receiver.Close(); 
pendingMessages.CompleteAdding(); 
// Can additional messages be pumped after this? 

回答

0

看一看这里的样本,因为它使用一个ManualResetEvent的,以统筹的run()方法关闭和关闭操作。如果您在消息处理中检查并在那里停止(不接受下一条消息进行处理),然后在所有这些并行处理器完成时调用close,那么类似的方法可能适用?

https://stackoverflow.com/a/16739691/1078351