2011-10-23 42 views
23

我想知道是否存在ConcurrentQueue的实现/包装,类似于BlockingCollection,其中从集合中取出不会阻塞,而是取而代之的是异步,并会导致异步等待,直到项目被放置在队列。awaitable基于任务的队列

我已经想出了我自己的实现,但它似乎没有按预期执行。我想知道如果我正在重塑已经存在的东西。

这里是我的实现:

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 

    object queueSyncLock = new object(); 

    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> Dequeue() 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs=null; 
     T firstItem=default(T); 
     while (true) 
     { 
      bool ok; 
      lock (queueSyncLock) 
      { 
       ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); 
       if (ok) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        queue.TryDequeue(out firstItem); 
       } 
      } 
      if (!ok) break; 
      tcs.SetResult(firstItem); 
     } 
    } 
} 
+0

哦呸.... ...... –

+21

@AdamSack:确实,但你的评论并没有帮助我。 – spender

回答

36

我不知道自由的手段的无锁解决方案,但您可以查看新的Dataflow library,这是Async CTP的一部分。一个简单的BufferBlock<T>应该足够了,例如:

BufferBlock<int> buffer = new BufferBlock<int>(); 

生产和消费是最容易通过的数据流块类型的扩展方法来完成。

生产是简单的:

buffer.Post(13); 

和消费是异步就绪:

int item = await buffer.ReceiveAsync(); 

我建议,如果可能,您使用的数据流;使这样一个缓冲区既有效又正确比第一次出现更困难。

+0

这看起来非常有希望...明天会看看。谢谢。它看起来非常像CCR端口。 – spender

+2

而不是在睡前偷看!看起来Dataflow非常适合我的需求。它似乎弥合了TPL提供的和CCR提供的(我曾经取得巨大成功的)之间的差距。这让我感到乐观的是,CCR的出色工作并没有被浪费掉。这是正确的答案(还有一些闪亮的新东西让我的牙齿进入!)谢谢@StephenCleary。 – spender

1

这可能是矫枉过正为您的使用情况下(给出的学习曲线),但Reactive Extentions提供了所有你所能想异步组成的粘合剂。

您基本上订阅了更改,它们会在您可用时推送给您,并且您可以让系统在单独的线程上推送更改。

+0

我至少对部分内容非常熟悉,但在生产中使用它有点难以理解,因为其他人可能不得不维护代码。我真的在挖掘异步/等待所带来的简单性,并将其引入到一个以前非常复杂的服务器产品中,我试图将所有的异步技术都保存在一项技术之下。 – spender

-1

你可以只使用一个BlockingCollection(使用默认ConcurrentQueue),敷在调用TakeTask这样你就可以await它:

var bc = new BlockingCollection<T>(); 

T element = await Task.Run(() => bc.Take()); 
+4

不错的主意,但我不满意阻止。我将有几千个客户,每个客户都有自己的消息队列。任何阻挡都会使船舶沉没,因为它会使线程无所事事。我想要一个可等待的,非阻塞任务的原因是,我可以在线程池中保留所有操作而不会导致线程池饿死。 – spender

0

这里是我目前使用的实施。

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 
    object queueSyncLock = new object(); 
    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> DequeueAsync(CancellationToken ct) 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     ct.Register(() => 
     { 
      lock (queueSyncLock) 
      { 
       tcs.TrySetCanceled(); 
      } 
     }); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs = null; 
     T firstItem = default(T); 
     lock (queueSyncLock) 
     { 
      while (true) 
      { 
       if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        if (tcs.Task.IsCanceled) 
        { 
         continue; 
        } 
        queue.TryDequeue(out firstItem); 
       } 
       else 
       { 
        break; 
       } 
       tcs.SetResult(firstItem); 
      } 
     } 
    } 
} 

它的工作原理不够好,但有相当多的争论对queueSyncLock,因为我做了很多使用CancellationToken的取消一些等待任务。当然,这导致相当少堵我将与BlockingCollection但看到...

我想知道如果有一个更流畅,锁定殊途同归,最终的

2

我atempt(它有创建一个“承诺”时引发的事件,它可以通过一个外部生产者可以用来知道什么时候才能产生更多的项目):

public class AsyncQueue<T> 
{ 
    private ConcurrentQueue<T> _bufferQueue; 
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; 
    private object _syncRoot = new object(); 

    public AsyncQueue() 
    { 
     _bufferQueue = new ConcurrentQueue<T>(); 
     _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); 
    } 

    /// <summary> 
    /// Enqueues the specified item. 
    /// </summary> 
    /// <param name="item">The item.</param> 
    public void Enqueue(T item) 
    { 
     TaskCompletionSource<T> promise; 
     do 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return;          
      } 
     } 
     while (promise != null); 

     lock (_syncRoot) 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return; 
      } 

      _bufferQueue.Enqueue(item); 
     }    
    } 

    /// <summary> 
    /// Dequeues the asynchronous. 
    /// </summary> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    /// <returns></returns> 
    public Task<T> DequeueAsync(CancellationToken cancellationToken) 
    { 
     T item; 

     if (!_bufferQueue.TryDequeue(out item)) 
     { 
      lock (_syncRoot) 
      { 
       if (!_bufferQueue.TryDequeue(out item)) 
       { 
        var promise = new TaskCompletionSource<T>(); 
        cancellationToken.Register(() => promise.TrySetCanceled()); 

        _promisesQueue.Enqueue(promise); 
        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); 

        return promise.Task; 
       } 
      } 
     } 

     return Task.FromResult(item); 
    } 

    /// <summary> 
    /// Gets a value indicating whether this instance has promises. 
    /// </summary> 
    /// <value> 
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. 
    /// </value> 
    public bool HasPromises 
    { 
     get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } 
    } 

    /// <summary> 
    /// Occurs when a new promise 
    /// is generated by the queue 
    /// </summary> 
    public event EventHandler PromiseAdded; 
} 
+0

我认为这是最好的解决方案。我已经实现了这一点并进行了广泛的测试。一些注意事项:对!promise.Task.IsCanceled的调用是不必要的。我添加了一个ManualResetEventSlim来跟踪bufferQueue何时为空,以便调用者可以阻止等待队列清空。 –

+0

你[应该处置](http://stackoverflow.com/a/21653382/298609)'CancellationTokenRegistration'你从'cancellationToken.Register'调用中获得。 – Paya