2011-03-31 216 views
1

我正在为Web服务编写一个相对简单的“代理”应用程序。总体思路是,TCP服务器(带有异步连接)将从客户端读取(字符串)数据,并将该数据(作为读取回调函数的一部分)放入两个队列(Q1和Q2)之一。另一个线程将读取这些队列中的数据并将其传递给Web服务。 Q1中的数据应优先于Q2中的任何数据。.NET生产者 - 消费者问题

我一直在阅读有关生产者/消费者模式,这似乎是我想要实现的关于队列。由于我的入队和出队操作将在不同的线程上发生,似乎很明显,我的队列需要线程安全并支持某种锁定机制?这是一个.NET 4.0应用程序,我在新的BlockingCollection和ConcurrentQueue类中看到了文档,但我不确定在这种情况下究竟有什么区别或者如何实现它们。任何人都可以对此有所了解吗?谢谢!

回答

2

我会做像下面的类。当您生成一个项目以将其添加到其中一个队列时,您可以拨打Enqueue()。这种方法总是立即返回(几乎)。在另一个线程中,当您准备好使用某个物品时,请致电Dequeue()。它试图首先从高优先级队列中取出。如果在任何队列中没有可用项目,则呼叫阻止。当你完成生产时,你可以拨打Complete()。在该呼叫已经完成并且两个队列都是空的情况下,下一个呼叫(或当前被阻止的呼叫)到Dequeue()将抛出InvalidOperationException

如果您的制片人长时间比您的消费者更快,您应该排队(new BlockingCollection<T>(capacity))。但在这种情况下,如果只有一个线程同时产生低优先级和高优先级项目,那么高优先级项目可能需要等待低优先级项目。你可以通过一个线程来产生高优先级的项目和一个低优先级的项目来解决这个问题。或者你只能绑定高优先级的队列,并希望你不会一次获得一百万个低优先级的项目。

class Worker<T> 
{ 
    BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>(); 
    BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>(); 

    public void Enqueue(T item, bool highPriority) 
    { 
     BlockingCollection<T> queue; 
     if (highPriority) 
      queue = m_highPriorityQueue; 
     else 
      queue = m_lowPriorityQueue; 

     queue.Add(item); 
    } 

    public T Dequeue() 
    { 
     T result; 

     if (!m_highPriorityQueue.IsCompleted) 
     { 
      if (m_highPriorityQueue.TryTake(out result)) 
       return result; 
     } 

     if (!m_lowPriorityQueue.IsCompleted) 
     { 
      if (m_lowPriorityQueue.TryTake(out result)) 
       return result; 
     } 

     if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted) 
      throw new InvalidOperationException("All work is done."); 
     else 
     { 
      try 
      { 
       BlockingCollection<T>.TakeFromAny(
        new[] { m_highPriorityQueue, m_lowPriorityQueue }, 
        out result); 
      } 
      catch (ArgumentException ex) 
      { 
       throw new InvalidOperationException("All work is done.", ex); 
      } 

      return result; 
     } 
    } 

    public void Complete() 
    { 
     m_highPriorityQueue.CompleteAdding(); 
     m_lowPriorityQueue.CompleteAdding(); 
    } 
} 
+0

非常感谢!我非常欣赏这些信息和建议! – user685869 2011-04-01 14:00:03

0

BlockingCollection默认使用ConcurrentQueue。应该适合你的应用。如果您将F#与邮箱和异步块一起使用,可能会更容易。我早些时候做了一个普通实现的示例文章。

Map/reduce with F# Agent or MailboxProcessor