你想要使用的是一个BlockingCollection<T>
,默认情况下是由ConcurrentQueue<T>
支持。为了让项目从队列中当队列为空的foreach会阻塞线程,一旦一个项目变为可用解锁你会使用.GetConsumingEnumerable()
从一个foreach
public BlockingCollection<Item> queue = new BlockingCollection<Item>();
public void LoadItems()
{
var(var item in SomeDataSource())
{
queue.Add(item);
}
queue.CompleteAdding();
}
public void ConsumeItems()
{
foreach(var item in queue.GetConsumingEnumerable())
{
...
}
}
内。一旦调用.CompleteAdding()
,foreach将完成处理队列中的所有项目,但一旦它为空,它将退出该foreach块。
但是,在你这样做之前,我建议你看看TPL Dataflow,它不需要管理队列或线程了。它可以让你建立逻辑链,并且链中的每个块可以具有单独的并发级别。
public Task ProcessDataAsync(IEnumerable<SomeInput> input)
{
using(var outfile = new File.OpenWrite("outfile.txt"))
{
//Create a convert action that uses the number of processors on the machine to create parallel blocks for processing.
var convertBlock = new TransformBlock<SomeInput, string>(x => CpuIntensiveConversion(x), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = Enviorment.ProcessorCount});
//Create a single threaded action that writes out to the textwriter.
var writeBlock = new ActionBlock<string>(x => outfile.WriteLine(x))
//Link the convert block to the write block.
convertBlock.LinkTo(writeBlock, new DataflowLinkOptions{PropagateCompletion = true});
//Add items to the convert block's queue.
foreach(var item in input)
{
await convertBlock.SendAsync();
}
//Tell the convert block we are done adding. This will tell the write block it is done processing once all items are processed.
convertBlock.Complete();
//Wait for the write to finish writing out to the file;
await writeBlock.Completion;
}
}
假设你会想从[system.collections.concurrent(https://msdn.microsoft.com/en-us/library/system.collections.concurrent(V = vs.110)的东西。 ASPX)。 – Equalsk
我会避免明确的队列管理,而不是看看TPL Dataflow库。 –