2013-03-29 16 views
0

我想有一种队列,其中一个源输入数据,而另一边会有消费者等待,当他们检测到队列不为空时开始执行数据直到它们停止。但是重要的是,如果队列被清空,他们仍然会继续观看队列,以便如果有更多数据弹出,他们将能够使用它。我发现的multiple consumer and multiple producers作为消费者嵌套在生产者,在我的情况下,我不能这样做,因为我将有一个单一的来源和消费者承诺排队,直到我阻止他们。因此不是串联的,而是消费者和生产者并行执行。单源生产者并行工作的多个并发者

将xecutig消费者和

Parallel.Invoke(() => producer(),() => consumers()); 

的问题,因此是我怎么会执行队列这有时并行

+3

有你看着'BlockingCollection '?你可以创建一个正确的类型,并将它传递给'producer()'和'consumers()'。 –

+0

@MthetheWWatson将会做 –

+0

您需要阅读文档,但是您将从每个消费者线程调用'GetConsumingEnumerable()',并将其与'foreach'一起使用。生产者线程会将东西添加到集合中,并在完成生产时调用'CompleteAdding()'。这将自动使所有消费者线程退出他们的foreach循环。 –

回答

0

空的内容并行生产者可以解决这个问题相对容易使用BlockingCollection<T>

您可以使用一个作为队列,并将其引用传递给producer()和每个consumers()

您将从每个消费者线程调用GetConsumingEnumerable(),并使用它与foreach

生产者线程会将项目添加到集合中,并在完成生产任务时调用CompleteAdding()。这将自动使所有消费者线程退出他们的foreach循环。

这里是一个基本的例子(没有错误处理)。对Thread.Sleep()的调用是模拟负载,不应以实际代码使用。

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    internal class Program 
    { 
     private static void Main(string[] args) 
     { 
      ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code. 
      var plant = new ProcessingPlant(); 
      plant.Process(); 
      Console.WriteLine("Work complete."); 
     } 
    } 

    public sealed class ProcessingPlant 
    { 
     private readonly BlockingCollection<string> _queue = new BlockingCollection<string>(); 

     public void Process() 
     { 
      Parallel.Invoke(producer, consumers); 
     } 

     private void producer() 
     { 
      for (int i = 0; i < 100; ++i) 
      { 
       string item = i.ToString(); 
       Console.WriteLine("Producer is queueing {0}", item); 
       _queue.Add(item); // <- Here's where we add an item to the queue. 
       Thread.Sleep(0); 
      } 

      _queue.CompleteAdding(); // <- Here's where we make all the consumers 
     }       // exit their foreach loops. 

     private void consumers() 
     { 
      Parallel.Invoke(
       () => consumer(1), 
       () => consumer(2), 
       () => consumer(3), 
       () => consumer(4), 
       () => consumer(5) 
      ); 
     } 

     private void consumer(int id) 
     { 
      Console.WriteLine("Consumer {0} is starting.", id); 

      foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items. 
      { 
       Console.WriteLine("Consumer {0} read {1}", id, item); 
       Thread.Sleep(0); 
      } 

      Console.WriteLine("Consumer {0} is stopping.", id); 
     } 
    } 
} 

(我知道这是使用额外的线程刚开始消费者,但我就是这么做的,以避免混淆真正的问题 - 这是演示如何使用BlockingCollection的)

相关问题