我试图实现一个相当简单的生产者/消费者风格的应用程序与多个生产者和一个消费者。生产者/消费者模式与批生产者
研究使我到BlockingCollection<T>
这是有用的,让我实现如下一个长期运行的消费者任务:
var c1 = Task.Factory.StartNew(() =>
{
var buffer = new List<int>(BATCH_BUFFER_SIZE);
foreach (var value in blockingCollection.GetConsumingEnumerable())
{
buffer.Add(value);
if (buffer.Count == BATCH_BUFFER_SIZE)
{
ProcessItems(buffer);
buffer.Clear();
}
}
});
的ProcessItems
函数提交缓冲到数据库和它在批量工作。然而,这个解决方案是次优的。在男爵制作期间,可能需要一段时间才能填充缓冲区,这意味着数据库已过期。
更理想的解决方案是在30秒计时器上运行该任务或用超时短接foreach
。
我跑了定时器的想法,并与此想出了:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);
private static void TimerElapsed(object state)
{
var buffer = new List<int>();
var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();
foreach (var value in collection)
{
buffer.Add(value);
}
ProcessItems(buffer);
buffer.Clear();
}
这具有foreach
将被阻止,直到最后,击败定时器的目的明确的问题。
任何人都可以提供一个方向吗?我基本上需要定期对BlockingCollection
进行快照,并处理内容。也许BlockingCollection
是错误的类型?
我这个实现这完美的作品去了。它每隔x秒运行一次,只推送可用的(不等待)。谢谢你的帮助。 https://gist.github.com/AntSwift/d2faa4f43d1a6d594172 –
一个完整的工作版本现在可在https://github.com/AntSwift/AS.BlockingCollectionDemo –
我相信你的代码示例会受到竞争条件的影响,因为定时器机制。如果您的消费者无法在计时器期限内完成任务,则计时器将触发而不会导致产生另一个消费者。 –