2012-08-14 64 views
16

我正在使用BlockingCollection来实现生产者/消费者模式。我有一个异步循环,用要处理的数据填充集合,然后在稍后时间由客户端访问。数据包到达稀疏,我希望轮询在不使用阻塞呼叫的情况下完成。异步取消阻止收集

实质上,我正在寻找类似BeginTakeEndTake的东西,它不存在于阻塞集合中,以便我可以在回调中使用内部线程池。无论如何,它不一定是BlockingCollection。任何我需要的东西都会很棒。

这就是我现在得到的。 _bufferedPacketsBlockingCollection<byte[]>

public byte[] Read(int timeout) 
{ 
    byte[] result; 
    if (_bufferedPackets.IsCompleted) 
    { 
     throw new Exception("Out of packets"); 
    } 
    _bufferedPackets.TryTake(out result, timeout);  
    return result; 
} 

我想这是这样的,在伪代码:

public void Read(int timeout) 
{ 
    _bufferedPackets.BeginTake(result => 
     { 
      var bytes = _bufferedPackets.EndTake(result); 
      // Process the bytes, or the resuting timeout 
     }, timeout, _bufferedPackets); 
} 

什么是我对这个选择?我不希望任何线程处于等待状态,因为有很多其他IO东西要处理,并且我会很快耗尽线程。

更新:我已经重写有问题的代码,以不同的方式使用异步过程,实质上交换基础上的回调,如果有超时限制内等待请求。这工作正常,但如果有一种方法可以做到这一点,而不依赖于定时器并交换周围可能导致竞争条件且很难写入(并理解)的lambda的方法,它仍然会很棒。我已经用自己的异步队列实现解决了这个问题,但如果有更标准和经过充分测试的选项,它仍然非常棒。

+0

目前,我认为没有任何TPL集合提供除了ObservableCollection之外的异步方法。你怎么看 ? – 2012-08-14 10:12:21

+0

你可以把它包装在一个任务中,任务= Task.Factory.StartNew (()=> {//你的代码返回字节[]});'但是这不是智慧,并且必须有更好的方法。 – MoonKnight 2012-08-14 10:16:32

+0

包装在任务中将消耗一个将被锁定在等待句柄中的任务。由于有很多任务正在进行,这将永远占用一项任务,这将使我不幸地在游泳池中耗尽任务。 – Dervall 2012-08-14 11:01:37

回答

0

所以看起来并不是一个内置的选择,我出去努力做我想做的实验。事实证明,为了使这项工作与旧的异步模式的其他用户大致相同,需要做大量的工作。

public class AsyncQueue<T> 
{ 
    private readonly ConcurrentQueue<T> queue; 
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult 
    { 
     public bool IsCompleted { get; set; } 
     public WaitHandle AsyncWaitHandle { get; set; } 
     public object AsyncState { get; set; } 
     public bool CompletedSynchronously { get; set; } 
     public T Result { get; set; } 

     public AsyncCallback Callback { get; set; } 
    } 

    public AsyncQueue() 
    { 
     dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>(); 
     queue = new ConcurrentQueue<T>(); 
    } 

    public void Enqueue(T item) 
    { 
     DequeueAsyncResult asyncResult; 
     while (dequeueQueue.TryDequeue(out asyncResult)) 
     { 
      if (!asyncResult.IsCompleted) 
      { 
       asyncResult.IsCompleted = true; 
       asyncResult.Result = item; 

       ThreadPool.QueueUserWorkItem(state => 
       { 
        if (asyncResult.Callback != null) 
        { 
         asyncResult.Callback(asyncResult); 
        } 
        else 
        { 
         ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set(); 
        } 
       }); 
       return; 
      } 
     } 
     queue.Enqueue(item); 
    } 

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state) 
    { 
     T result; 
     if (queue.TryDequeue(out result)) 
     { 
      var dequeueAsyncResult = new DequeueAsyncResult 
      { 
       IsCompleted = true, 
       AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
       AsyncState = state, 
       CompletedSynchronously = true, 
       Result = result 
      }; 
      if (null != callback) 
      { 
       callback(dequeueAsyncResult); 
      } 
      return dequeueAsyncResult; 
     } 

     var pendingResult = new DequeueAsyncResult 
     { 
      AsyncState = state, 
      IsCompleted = false, 
      AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
      CompletedSynchronously = false, 
      Callback = callback 
     }; 
     dequeueQueue.Enqueue(pendingResult); 
     Timer t = null; 
     t = new Timer(_ => 
     { 
      if (!pendingResult.IsCompleted) 
      { 
       pendingResult.IsCompleted = true; 
       if (null != callback) 
       { 
        callback(pendingResult); 
       } 
       else 
       { 
        ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set(); 
       } 
      } 
      t.Dispose(); 
     }, new object(), timeout, Timeout.Infinite); 

     return pendingResult; 
    } 

    public T EndDequeue(IAsyncResult result) 
    { 
     var dequeueResult = (DequeueAsyncResult) result; 
     return dequeueResult.Result; 
    } 
} 

我不是太肯定的IsComplete属性的同步,而我不是如何dequeueQueue只拿到后续Enqueue调用清理太热。我不确定什么时候是正确的时间来等待手柄,但这是迄今为止我所得到的最佳解决方案。

请不要以任何方式考虑此生产质量代码。我只是想展示一下我如何保持所有线程不用等待锁定而旋转的一般要点。我确信这充满了各种各样的边缘情况和错误,但它满足了要求,我想回馈给那些遇到问题的人。

+0

我不完全理解你的线程模型。无论回调是否已完成或超时,您都可以在EndDequeue结果中进行访问。如果你遍历了IsCompleted属性,直到你得到一个答案,你仍然阻塞一个线程。 – 2012-08-29 22:40:30

+0

它不会因为它在一个定时器上运行而被阻塞,该定时器不会启动线程,而只是在该池上排队任务。 Enddequeue也会在超时时被调用,结果将在这些情况下被默认。 – Dervall 2012-08-30 05:11:43

+0

我的意思是谁叫EndDequeue?开始/结束模式确保您可以产生工作,但在某些时候需要调用相应的End方法。你目前所做的就是在Enqueue方法中注册回调并调用它们,并在此期间通过定时器稍微等待一些数据的到来。你可以在没有定时器的情况下通过一个观察者队列来完成它,在这个队列中Enqueue存储它们的入队时间。当数据到达时,您将所有过时的观察者出列并跳过它们并调用尚未到期的回调。 – 2012-08-30 06:32:45

0

我可能会误解你的情况,但是你不能使用非阻塞集合吗?

我创造了这个例子来说明:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace AsyncTakeFromBlockingCollection 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var queue = new ConcurrentQueue<string>(); 

      var producer1 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("======="); 
        Thread.Sleep(10); 
       } 
      }); 

      var producer2 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("*******"); 
        Thread.Sleep(3); 
       } 
      }); 

      CreateConsumerTask("One ", 3, queue); 
      CreateConsumerTask("Two ", 4, queue); 
      CreateConsumerTask("Three", 7, queue); 

      producer1.Wait(); 
      producer2.Wait(); 
      Console.WriteLine(" Producers Finished"); 
      Console.ReadLine(); 
     } 

     static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue) 
     { 
      Task.Factory.StartNew(() => 
      { 
       while (true) 
       { 
        string result; 
        if (queue.TryDequeue(out result)) 
        { 
         Console.WriteLine(" {0} consumed {1}", taskName, result); 
        } 
        Thread.Sleep(sleepTime); 
       } 
      }); 
     } 
    } 
} 

这里是程序的输出

enter image description here

我相信BlockingCollection旨在包裹并发收集和提供一种机制,允许多个消费者阻止;等待生产者。这种用法似乎与您的要求相悖。

我发现这article about the BlockingCollection class是有帮助的。

+0

不幸的是我不能这么做。 IO完成时会有很多单独的队列被稀疏地填满。可能有成千上万种。这些仅在发生另一个IO事件时消耗,该事件在IO完成回调中运行,因此无法阻止。实质上,很少有生产者和消费者,都在IO完成上运行。消费者需要知道该集合是否在给定的超时内添加了项目,而不将自己置于阻塞调用中。 – Dervall 2012-08-24 06:22:43

+0

消费者如何映射到队列?每个队列有一名消费者?消费者是否遍历队列寻找产品? – rtev 2012-08-24 12:10:15

+0

消费者异步到达IO回调,并且需要将事情从集合中取出,或者等待一段时间,然后放弃在此期间集合没有充满数据的情况。没有迭代,它基本上是一个队列 – Dervall 2012-08-24 12:14:27

0

我很确定BlockingCollection<T>不能做到这一点,你不得不推出自己的。我想出了这个:

class NotifyingCollection<T> 
{ 
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>(); 
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>(); 

    private object _lock = new object(); 

    public void Add(T item) 
    { 
     _overflow.Enqueue(item); 
     Dispatch(); 
    } 

    private void Dispatch() 
    { 
     // this lock is needed since we need to atomically dequeue from both queues... 
     lock (_lock) 
     { 
      while (_overflow.Count > 0 && _subscribers.Count > 0) 
      { 
       Action<T> callback; 
       T item; 

       var r1 = _overflow.TryDequeue(out item); 
       var r2 = _subscribers.TryDequeue(out callback); 

       Debug.Assert(r1 && r2); 
       callback(item); 
       // or, optionally so that the caller thread's doesn't take too long ... 
       Task.Factory.StartNew(() => callback(item)); 
       // but you'll have to consider how exceptions will be handled. 
      } 
     } 
    } 

    public void TakeAsync(Action<T> callback) 
    { 
     _subscribers.Enqueue(callback); 
     Dispatch(); 
    } 
} 

我用它调用TakeAsync()Add()充当回调线程的线程。当您拨打Add()TakeAsync()时,它会尝试将所有排队的项目分配到排队的回叫。这样就没有创建线程,只是坐在那里睡觉,等待被发信号。

该锁定有点丑陋,但您可以在不锁定的情况下排队并在多个线程上订阅。我找不到一种方法来执行相当于的操作,只有在其他队列上没有使用该锁的情况下才可以退出。

注意:我只用最少的几个线程测试了它。

+0

嗯,是的。唯一的问题是,你将使用你的方法创建一个锁定的线程,考虑到非常高的负载,这将是危险的。 TryTake呼叫块。在您有成千上万个这样的调用的情况下,您将耗尽任务线程池中的线程并锁定您的应用程序。 – Dervall 2012-08-29 19:32:59

+0

啊!你完全不需要阻塞线程;我以为你只是想确保调用线程不会被阻塞。我现在明白了。 – atanamir 2012-08-29 20:29:04

+0

好的,修改我的答案,我认为会做你正在寻找的... – atanamir 2012-08-29 21:38:21