2017-02-02 111 views
-1

什么是最好的Queue在C#中使用的数据结构,当队列需要在多个线程上可以执行Enqueue(),但只需要在单个主线程上执行Dequeue()?我的螺纹结构是这样的:队列和线程

  • 主线 - 消费者
  • 子线程1 - 生产者
  • 子线程2 - 生产者
  • 子Thread3 - 生产者

我有一个单独的Queue<T> queue,保存子线程和主线程调用产生的所有项目queue.Dequeue()直到它是空的。为此,我在主线程上调用了以下函数。

public void ConsumeItems() 
{ 
    while (queue.Count > 0) 
    { 
     var item = queue.Dequeue(); 
     ... 
    } 
} 

主线程通过每个线程循环调用此函数一次,我要确保我在一个线程安全的访问庄园,但queue我也想避免锁定queue如果可能的性能的原因。

+2

假设你会想从[system.collections.concurrent(https://msdn.microsoft.com/en-us/library/system.collections.concurrent(V = vs.110)的东西。 ASPX)。 – Equalsk

+1

我会避免明确的队列管理,而不是看看TPL Dataflow库。 –

回答

1

你想要使用的是一个BlockingCollection<T>,默认情况下是由ConcurrentQueue<T>支持。为了让项目从队列中当队列为空的foreach会阻塞线程,一旦一个项目变为可用解锁你会使用.GetConsumingEnumerable()从一个foreach

public BlockingCollection<Item> queue = new BlockingCollection<Item>(); 

public void LoadItems() 
{ 
    var(var item in SomeDataSource()) 
    { 
     queue.Add(item); 
    } 
    queue.CompleteAdding(); 
} 

public void ConsumeItems() 
{ 
    foreach(var item in queue.GetConsumingEnumerable()) 
    { 
     ... 
    } 
} 

内。一旦调用.CompleteAdding(),foreach将完成处理队列中的所有项目,但一旦它为空,它将退出该foreach块。

但是,在你这样做之前,我建议你看看TPL Dataflow,它不需要管理队列或线程了。它可以让你建立逻辑链,并且链中的每个块可以具有单独的并发级别。

public Task ProcessDataAsync(IEnumerable<SomeInput> input) 
{ 
    using(var outfile = new File.OpenWrite("outfile.txt")) 
    { 
     //Create a convert action that uses the number of processors on the machine to create parallel blocks for processing. 
     var convertBlock = new TransformBlock<SomeInput, string>(x => CpuIntensiveConversion(x), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = Enviorment.ProcessorCount}); 

     //Create a single threaded action that writes out to the textwriter. 
     var writeBlock = new ActionBlock<string>(x => outfile.WriteLine(x)) 

     //Link the convert block to the write block. 
     convertBlock.LinkTo(writeBlock, new DataflowLinkOptions{PropagateCompletion = true}); 

     //Add items to the convert block's queue. 
     foreach(var item in input) 
     { 
       await convertBlock.SendAsync(); 
     } 

     //Tell the convert block we are done adding. This will tell the write block it is done processing once all items are processed. 
     convertBlock.Complete(); 

     //Wait for the write to finish writing out to the file; 
     await writeBlock.Completion; 
    } 
} 
+0

哎呀,我向后写了我的例子。一个生产者,许多消费者。解决方案不会改变,只是例子需要。 –