2013-10-26 20 views
-1

我们有一个应用程序定期接收多媒体消息,并应回复它们。同时通过多个线程处理数据

我们目前使用单线程,首先接收消息,然后逐个处理它们。这可以工作,但速度很慢。

所以我们现在正在考虑同时进行多个线程的同一个过程。

任何简单的方法来允许并行处理传入的记录,但避免错误地处理两个线程相同的记录?

+1

简洁大方 - >将引发讨论... – igrimpe

+0

你如何接收记录?如果使用TCP/IP,您可以有一个监听线程,这将为每个传入记录创建一个线程。 –

+0

很难给出具体的答案。您可以从.NET中的Parallel.For方法到具有多个可伸缩辅助角色的Azure服务总线。如果你可以指定更多的信息,甚至一些示例代码,我们应该能够提供更具体的建议。 –

回答

1

任何简单的方法来允许并行处理传入记录,但避免错误地处理两个线程相同的记录?

是的,这实际上并不太难,你想要做的就是所谓的“生产者 - 消费者模式”。

如果你的消息接收器一次只能在一个线程上运行,但你的消息“处理器”可以同时处理多条消息,那么你只需要使用一个 BlockingCollection 来存储那些需要处理的消息。

/// <summary>
/// Receives messages on a single thread and processes them concurrently in the
/// background using the producer-consumer pattern: <see cref="ReceiveMessage"/>
/// enqueues into a <see cref="BlockingCollection{T}"/> and a dedicated thread
/// drains it with <c>Parallel.ForEach</c>.
/// </summary>
public sealed class MessageProcessor : IDisposable 
{ 
    /// <summary>
    /// Creates a processor with an unlimited degree of parallelism
    /// (-1 means "no limit" for <see cref="ParallelOptions.MaxDegreeOfParallelism"/>).
    /// </summary>
    public MessageProcessor() 
        : this(-1) 
    { 
    } 

    /// <summary>
    /// Creates a processor that uses at most <paramref name="maxThreadsForProcessing"/>
    /// threads to process queued messages concurrently.
    /// </summary>
    /// <param name="maxThreadsForProcessing">Upper bound on concurrent message processing; -1 for no limit.</param>
    public MessageProcessor(int maxThreadsForProcessing) 
    { 
        _maxThreadsForProcessing = maxThreadsForProcessing; 
        _messages = new BlockingCollection<Message>(); 
        _cts = new CancellationTokenSource(); 

        // Background so the consumer thread never keeps the process alive on its own.
        _messageProcessorThread = new Thread(ProcessMessages); 
        _messageProcessorThread.IsBackground = true; 
        _messageProcessorThread.Name = "Message Processor Thread"; 
        _messageProcessorThread.Start(); 
    } 

    /// <summary>Maximum number of threads used to process messages (-1 = unlimited).</summary>
    public int MaxThreadsForProcessing 
    { 
        get { return _maxThreadsForProcessing; } 
    } 

    private readonly BlockingCollection<Message> _messages; 
    private readonly CancellationTokenSource _cts; 
    private readonly Thread _messageProcessorThread; 
    private bool _disposed = false; 
    private readonly int _maxThreadsForProcessing; 


    /// <summary> 
    /// Add a new message to be queued up and processed in the background. 
    /// </summary> 
    /// <param name="message">The message to enqueue.</param>
    /// <exception cref="InvalidOperationException">Thrown if called after <see cref="Stop"/>.</exception>
    public void ReceiveMessage(Message message) 
    { 
        _messages.Add(message); 
    } 

    /// <summary> 
    /// Signals the system to stop processing messages and blocks until the
    /// processing thread has finished.
    /// </summary> 
    /// <param name="finishQueue">Should the queue of messages waiting to be processed be allowed to finish</param> 
    public void Stop(bool finishQueue) 
    { 
        // No more messages may be added; the consuming enumerable will end
        // once the queue drains.
        _messages.CompleteAdding(); 
        if (!finishQueue) 
            _cts.Cancel(); 

        // Wait for the message processor thread to finish its work.
        _messageProcessorThread.Join(); 
    } 

    /// <summary> 
    /// The background thread that processes messages in the system. Consumes
    /// the blocking collection in parallel until adding is complete or
    /// cancellation is requested.
    /// </summary> 
    private void ProcessMessages() 
    { 
        try 
        { 
            Parallel.ForEach(_messages.GetConsumingEnumerable(), 
                new ParallelOptions() 
                { 
                    CancellationToken = _cts.Token, 
                    MaxDegreeOfParallelism = MaxThreadsForProcessing 
                }, 
                ProcessMessage); 
        } 
        catch (OperationCanceledException) 
        { 
            // Expected on Stop(false); swallow so it doesn't bubble up as an
            // unhandled exception on the background thread.
        } 
    } 

    private void ProcessMessage(Message message, ParallelLoopState loopState) 
    { 
        // Here be dragons! (or your code to process a message, your choice :-))

        // Use if(_cts.Token.IsCancellationRequested || loopState.ShouldExitCurrentIteration) to test if
        // we should quit out of the function early for a graceful shutdown.
    } 

    /// <summary>
    /// Drains the queue (blocking until all queued messages are processed) and
    /// releases the owned disposables. Call <c>Stop(false)</c> first if you
    /// want a faster, cancelling shutdown.
    /// </summary>
    public void Dispose() 
    { 
        if (_disposed) 
            return; 

        // Fields are readonly and assigned in the constructor, so they are
        // never null here; the old null checks (and the empty finalizer that
        // motivated them) were removed — an empty finalizer only adds
        // finalization-queue overhead (CA1821).
        Stop(true); 

        _cts.Dispose(); 
        _messages.Dispose(); 

        _disposed = true; 
    } 

} 

我强烈建议你阅读免费书籍 Patterns for Parallel Programming,它详细介绍了同时处理多条消息这方面的一些内容,其中有一整章详细解释了生产者 - 消费者模型。


UPDATE:GetConsumingEnumerable() 与 Parallel.ForEach 一起使用时存在一些性能问题,请改用 ParallelExtensionsExtras 库及其新的扩展方法 GetConsumingPartitioner()。

/// <summary>
/// Wraps a <see cref="BlockingCollection{T}"/> in a <see cref="Partitioner{T}"/>
/// whose partitions consume from the collection, avoiding the chunk-buffering
/// Parallel.ForEach applies to a plain consuming enumerable.
/// </summary>
/// <param name="collection">The blocking collection to consume from.</param>
/// <returns>A partitioner that hands out items as they are consumed.</returns>
public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection) 
{ 
    var consumingPartitioner = new BlockingCollectionPartitioner<T>(collection); 
    return consumingPartitioner; 
} 

/// <summary>
/// A <see cref="Partitioner{T}"/> over a <see cref="BlockingCollection{T}"/>
/// whose dynamic partitions all share the collection's consuming enumerable,
/// so items are handed to worker threads one at a time as they arrive.
/// </summary>
private class BlockingCollectionPartitioner<T> : Partitioner<T> 
{ 
    // Assigned once in the constructor and never reassigned, so readonly.
    private readonly BlockingCollection<T> _collection; 

    /// <summary>Wraps <paramref name="collection"/>; does not take ownership of it.</summary>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="collection"/> is null.</exception>
    internal BlockingCollectionPartitioner( 
        BlockingCollection<T> collection) 
    { 
        if (collection == null) 
            throw new ArgumentNullException("collection"); 
        _collection = collection; 
    } 

    /// <summary>Dynamic partitioning is the whole point of this partitioner.</summary>
    public override bool SupportsDynamicPartitions { 
        get { return true; } 
    } 

    /// <summary>
    /// Returns <paramref name="partitionCount"/> enumerators that all draw from
    /// the same underlying consuming enumerable.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="partitionCount"/> is less than 1.</exception>
    public override IList<IEnumerator<T>> GetPartitions( 
        int partitionCount) 
    { 
        if (partitionCount < 1) 
            throw new ArgumentOutOfRangeException("partitionCount"); 
        var dynamicPartitioner = GetDynamicPartitions(); 
        return Enumerable.Range(0, partitionCount).Select(_ => 
            dynamicPartitioner.GetEnumerator()).ToArray(); 
    } 

    /// <summary>
    /// Each enumeration of the returned sequence consumes (removes) items from
    /// the collection, blocking until items are available or adding completes.
    /// </summary>
    public override IEnumerable<T> GetDynamicPartitions() 
    { 
        return _collection.GetConsumingEnumerable(); 
    } 
}