2010-10-14 38 views

回答

5

如果您使用的是.NET 4,那么很多这是免费的。

如果您已经拥有所有项目,可以使用Parallel.ForEach。如果您需要生产者/消费者队列,则可以使用BlockingCollection<T>来包装其中一个并发集合(例如ConcurrentQueue<T>ConcurrentStack<T>)。你如何使用它取决于你;有一个blog post here进入一个详细的例子,并且可能还有其他类似的帖子。 (您可能需要查看Parallel Team Blog获取更多资料。)

+0

+1获取到4.0 :),我只是写这个也。 – TalentTuner 2010-10-14 06:25:29

+0

'Parallel.ForEach'阻塞调用线程,直到它完成所有的线程,所以这不会真的帮助'异步处理队列项目'现在呢?还是问题真的意味着“平行”而不是“异步”? – 2010-10-14 06:34:28

+0

这很好,但不幸的是,我使用.Net 3.5 – Sumee 2010-10-14 07:36:19

2

使用.NET 4任务。

var t = Task<int>.Factory.StartNew(() => ProcessItem()); 

使用ConcurrencyOptions设置该处理的最大并行度。

如果您想自己推出,请使用BlockingCollection<T>,它为线程安全集合提供了阻塞和边界功能,并为消费者实现了一个单独的线程(或多个线程)。

3

如果你不幸你可以看看生产者/消费者模式不被使用.NET 4

这里是我的代码,我拆开,我对混乱道歉,但你应该能够通过添加到项目并重新编译来使用它,然后使用生成的dll创建您的进程。

枚举为ChannelState:

public enum ChannelState 
{ 
    WaitingForSend, 
    WaitingForReceive, 
    Open 
} 

接口:

public interface IChannel<TMessage> 
{ 
    // Methods 
    TMessage Receive(); 
    void Send(TMessage message); 

    // Properties 
    bool CanReceive { get; } 
    bool CanSend { get; } 
    ChannelState State { get; } 
} 

using System; 
public interface IReceiver<TMessage> 
{ 
    // Events 
    event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived; 

    // Methods 
    void Activate(); 
    void Deactivate(); 

    // Properties 
    bool IsActive { get; } 
} 

具体类:

using System.Collections.Generic; 
using System.Threading; 
using System; 
public class BufferedChannel<TMessage> : IChannel<TMessage> 
{ 
    // Fields 
    private int _blockedReceivers; 
    private int _blockedSenders; 
    private Queue<TMessage> _buffer; 
    private int _capacity; 
    private EventWaitHandle _capacityAvailableEvent; 
    private EventWaitHandle _messagesAvailableEvent; 

    // Methods 
    public BufferedChannel() 
    { 
     this._buffer = new Queue<TMessage>(); 
     this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset); 
     this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset); 
     this._capacity = 50; 
    } 

    public BufferedChannel(int bufferSize) 
    { 
     this._buffer = new Queue<TMessage>(); 
     this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset); 
     this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset); 
     this._capacity = 50; 
     if (bufferSize <= 0) 
     { 
      throw new ArgumentOutOfRangeException("bufferSize", bufferSize, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero); 
     } 
     this._capacity = bufferSize; 
    } 

    public TMessage Receive() 
    { 
     Interlocked.Increment(ref this._blockedReceivers); 
     try 
     { 
      this._messagesAvailableEvent.WaitOne(); 
     } 
     catch 
     { 
      lock (this._buffer) 
      { 
       Interlocked.Decrement(ref this._blockedReceivers); 
      } 
      throw; 
     } 
     lock (this._buffer) 
     { 
      Interlocked.Decrement(ref this._blockedReceivers); 
      this._capacityAvailableEvent.Set(); 
      if ((this._buffer.Count - 1) > this._blockedReceivers) 
      { 
       this._messagesAvailableEvent.Set(); 
      } 
      return this._buffer.Dequeue(); 
     } 
    } 

    public void Send(TMessage message) 
    { 
     Interlocked.Increment(ref this._blockedSenders); 
     try 
     { 
      this._capacityAvailableEvent.WaitOne(); 
     } 
     catch 
     { 
      lock (this._buffer) 
      { 
       Interlocked.Decrement(ref this._blockedSenders); 
      } 
      throw; 
     } 
     lock (this._buffer) 
     { 
      Interlocked.Decrement(ref this._blockedSenders); 
      this._buffer.Enqueue(message); 
      if (this._buffer.Count < this.BufferSize) 
      { 
       this._capacityAvailableEvent.Set(); 
      } 
      this._messagesAvailableEvent.Set(); 
     } 
    } 

    // Properties 
    public int BufferCount 
    { 
     get 
     { 
      lock (this._buffer) 
      { 
       return this._buffer.Count; 
      } 
     } 
    } 

    public int BufferSize 
    { 
     get 
     { 
      lock (this._buffer) 
      { 
       return this._capacity; 
      } 
     } 
     set 
     { 
      lock (this._buffer) 
      { 
       if (value <= 0) 
       { 
        throw new ArgumentOutOfRangeException("BufferSize", value, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero); 
       } 
       this._capacity = value; 
       if ((this._blockedSenders > 0) && (this._capacity > this._buffer.Count)) 
       { 
        this._capacityAvailableEvent.Set(); 
       } 
      } 
     } 
    } 

    public bool CanReceive 
    { 
     get 
     { 
      return true; 
     } 
    } 

    public bool CanSend 
    { 
     get 
     { 
      return true; 
     } 
    } 

    public ChannelState State 
    { 
     get 
     { 
      if (this._blockedSenders > 0) 
      { 
       return ChannelState.WaitingForReceive; 
      } 
      if (this._blockedReceivers > 0) 
      { 
       return ChannelState.WaitingForSend; 
      } 
      return ChannelState.Open; 
     } 
    } 
} 


using System; 
using System.Collections.Generic; 
using System.Threading; 
using System.ComponentModel; 
using System.Runtime.CompilerServices; 

public sealed class Receiver<TMessage> : Component, IReceiver<TMessage> 
{ 
    // Fields 
    private volatile bool _continue; 
    private object _controlLock; 
    private volatile bool _disposed; 
    private Thread _receiverThread; 
    private bool _receiving; 
    private object _receivingLock; 
    private object _threadLock; 
    [CompilerGenerated] 
    private IChannel<TMessage> channel; 

    // Events 
    public event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived; 

    // Methods 
    public Receiver(IChannel<TMessage> channel) 
    { 
     this._controlLock = new object(); 
     this._threadLock = new object(); 
     this._receivingLock = new object(); 
     if (channel == null) 
     { 
      throw new ArgumentNullException("channel"); 
     } 
     this.Channel = channel; 
    } 

    public void Activate() 
    { 
     this.CheckDisposed(); 
     lock (this._controlLock) 
     { 
      if (this._receiverThread != null) 
      { 
       throw new InvalidOperationException(); 
      } 
      this._continue = true; 
      this._receiverThread = new Thread(new ThreadStart(this.RunAsync)); 
      this._receiverThread.IsBackground = true; 
      this._receiverThread.Start(); 
     } 
    } 

    private void CheckDisposed() 
    { 
     if (this._disposed) 
     { 
      throw new ObjectDisposedException(base.GetType().Name); 
     } 
    } 

    public void Deactivate() 
    { 
     lock (this._controlLock) 
     { 
      if (this._continue) 
      { 
       this._continue = false; 
       lock (this._threadLock) 
       { 
        if (this._receiverThread != null) 
        { 
         this.SafeInterrupt(); 
         this._receiverThread.Join(); 
         this._receiverThread = null; 
        } 
       } 
      } 
     } 
    } 

    protected override void Dispose(bool disposing) 
    { 
     base.Dispose(disposing); 
     if (disposing) 
     { 
      this.Deactivate(); 
      this._disposed = true; 
     } 
    } 

    private void OnMessageReceived(TMessage message) 
    { 
     EventHandler<MessageReceivedEventArgs<TMessage>> messageReceived = this.MessageReceived; 
     if (messageReceived != null) 
     { 
      messageReceived(this, new MessageReceivedEventArgs<TMessage>(message)); 
     } 
    } 

    private void RunAsync() 
    { 
     while (this._continue) 
     { 
      TMessage message = default(TMessage); 
      bool flag = false; 
      try 
      { 
       lock (this._receivingLock) 
       { 
        this._receiving = true; 
       } 
       message = this.Channel.Receive(); 
       flag = true; 
       lock (this._receivingLock) 
       { 
        this._receiving = false; 
       } 
       Thread.Sleep(0); 
      } 
      catch (ThreadInterruptedException) 
      { 
      } 
      if (!this._continue) 
      { 
       if (flag) 
       { 
        this.Channel.Send(message); 
        return; 
       } 
       break; 
      } 
      this.OnMessageReceived(message); 
     } 
    } 

    private void SafeInterrupt() 
    { 
     lock (this._receivingLock) 
     { 
      lock (this._threadLock) 
      { 
       if (this._receiving && (this._receiverThread != null)) 
       { 
        this._receiverThread.Interrupt(); 
       } 
      } 
     } 
    } 

    // Properties 
    protected override bool CanRaiseEvents 
    { 
     get 
     { 
      return true; 
     } 
    } 

    public IChannel<TMessage> Channel 
    { 
     [CompilerGenerated] 
     get 
     { 
      return this.channel; 
     } 
     [CompilerGenerated] 
     private set 
     { 
      this.channel = value; 
     } 
    } 

    public bool IsActive 
    { 
     get 
     { 
      lock (this._controlLock) 
      { 
       return (this._receiverThread != null); 
      } 
     } 
    } 
} 

using System; 
using System.Runtime.CompilerServices; 
public class MessageReceivedEventArgs<TMessage> : EventArgs 
{ 
    // Fields 
    [CompilerGenerated] 
    private TMessage message; 

    // Methods 
    public MessageReceivedEventArgs(TMessage message) 
    { 
     this.Message = message; 
    } 

    // Properties 
    public TMessage Message 
    { 
     [CompilerGenerated] 
     get 
     { 
      return this.message; 
     } 
     [CompilerGenerated] 
     private set 
     { 
      this.message = value; 
     } 
    } 
} 

using System.Threading; 
public class BlockingChannel<TMessage> : IChannel<TMessage> 
{ 
    // Fields 
    private TMessage _message; 
    private EventWaitHandle _messageReceiveEvent; 
    private EventWaitHandle _messageReceiveyEvent; 
    private object _sendLock; 
    private ChannelState _state; 
    private object _stateLock; 

    // Methods 
    public BlockingChannel() 
    { 
     this._state = ChannelState.Open; 
     this._stateLock = new object(); 
     this._messageReceiveyEvent = new EventWaitHandle(false, EventResetMode.AutoReset); 
     this._messageReceiveEvent = new EventWaitHandle(false, EventResetMode.AutoReset); 
     this._sendLock = new object(); 
    } 

    public TMessage Receive() 
    { 
     this.State = ChannelState.WaitingForSend; 
     this._messageReceiveyEvent.WaitOne(); 
     this._messageReceiveEvent.Set(); 
     this.State = ChannelState.Open; 
     return this._message; 
    } 

    public void Send(TMessage message) 
    { 
     lock (this._sendLock) 
     { 
      this._message = message; 
      this.State = ChannelState.WaitingForReceive; 
      this._messageReceiveyEvent.Set(); 
      this._messageReceiveEvent.WaitOne(); 
     } 
    } 

    // Properties 
    public bool CanReceive 
    { 
     get 
     { 
      return true; 
     } 
    } 

    public bool CanSend 
    { 
     get 
     { 
      return true; 
     } 
    } 

    public ChannelState State 
    { 
     get 
     { 
      lock (this._stateLock) 
      { 
       return this._state; 
      } 
     } 
     private set 
     { 
      lock (this._stateLock) 
      { 
       this._state = value; 
      } 
     } 
    } 
}