2014-06-27 194 views
3

我试图实现一个相当简单的生产者/消费者风格的应用程序与多个生产者和一个消费者。生产者/消费者模式与批生产者

研究使我到BlockingCollection<T>这是有用的,让我实现如下一个长期运行的消费者任务:

var c1 = Task.Factory.StartNew(() => 
{ 
    var buffer = new List<int>(BATCH_BUFFER_SIZE); 

    foreach (var value in blockingCollection.GetConsumingEnumerable()) 
    { 
     buffer.Add(value); 
     if (buffer.Count == BATCH_BUFFER_SIZE) 
     { 
      ProcessItems(buffer); 
      buffer.Clear(); 
     } 
    } 
}); 

ProcessItems函数提交缓冲到数据库和它在批量工作。然而,这个解决方案是次优的。在男爵制作期间,可能需要一段时间才能填充缓冲区,这意味着数据库已过期。

更理想的解决方案是在30秒计时器上运行该任务或用超时短接foreach

我跑了定时器的想法,并与此想出了:

syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000); 

private static void TimerElapsed(object state) 
{ 
    var buffer = new List<int>(); 
    var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable(); 

    foreach (var value in collection) 
    { 
     buffer.Add(value); 
    } 

    ProcessItems(buffer); 
    buffer.Clear(); 
} 

这具有foreach将被阻止,直到最后,击败定时器的目的明确的问题。

任何人都可以提供一个方向吗?我基本上需要定期对BlockingCollection进行快照,并处理内容。也许BlockingCollection是错误的类型?

回答

5

而不是在定时器回调中使用GetConsumingEnumerable,而是使用其中的一种方法,将结果添加到列表中,直到它返回false或者您已达到令人满意的批处理大小。

BlockingCollection.TryTake Method (T) - 可能你需要什么,你根本不想进行进一步的等待。

BlockingCollection.TryTake Method (T, Int32)

BlockingCollection.TryTake Method (T, TimeSpan)

您可以轻松地提取出到扩展(未经测试):

public static IList<T> Flush<T> 
(this BlockingCollection<T> collection, int maxSize = int.MaxValue) 
{ 
    // Argument checking. 

    T next; 
    var result = new List<T>(); 

    while(result.Count < maxSize && collection.TryTake(out next)) 
    { 
     result.Add(next); 
    } 

    return result; 
} 
+0

我这个实现这完美的作品去了。它每隔x秒运行一次,只推送可用的(不等待)。谢谢你的帮助。 https://gist.github.com/AntSwift/d2faa4f43d1a6d594172 –

+0

一个完整的工作版本现在可在https://github.com/AntSwift/AS.BlockingCollectionDemo –

+0

我相信你的代码示例会受到竞争条件的影响,因为定时器机制。如果您的消费者无法在计时器期限内完成任务,则计时器将触发而不会导致产生另一个消费者。 –