2017-08-29 55 views
5

我有两个BlockingCollection<T>对象,collection1collection2。我想要消耗这些集合中的物品,并优先处理collection1中的物品。也就是说,如果两个藏品都有物品,我想首先从collection1获取物品。如果他们都没有物品,我想等待物品可用。如何从任何两个BlockingCollections中获取优先于第一个集合的项目?

我有以下代码:

public static T Take<T>(
    BlockingCollection<T> collection1, 
    BlockingCollection<T> collection2) where T:class 
{ 
    if (collection1.TryTake(out var item1)) 
    { 
     return item1; 
    } 

    T item2; 

    try 
    { 
     BlockingCollection<T>.TakeFromAny(
      new[] { collection1, collection2 }, 
      out item2); 
    } 
    catch (ArgumentException) 
    { 
     return null; 
    } 

    return item2; 
} 

此代码有望重返nullCompleteAdding叫上两个集合,它们都是空的。

我用这个代码的主要问题是,对于TakeFromAny方法的文档指定TakeFromAny将引发ArgumentException如果CompleteAdding被称为的“集合”:

的ArgumentException

馆藏的说法是一个0长度的数组或包含一个null元素或CompleteAdding()已被调用集合。

如果在任何集合上调用CompleteAdding,它会抛出吗?或两个集合?

如果CompleteAdding被调用并且收集仍然有一些项目,它会抛出?

我需要一个可靠的方法来做到这一点。

在此代码中,我试图从collection1开始,因为TakeFromAny的文档没有对如果这两个集合具有项目的项目采集顺序提供任何保证。

这也意味着如果两个收藏都是空的,然后他们在同一时间收到物品,那么我可能会首先从collection2获得物品,这很好。

编辑:

我之所以将项目添加到两个集合(而不是简单的单一集合)是第一个集合不具有上限,并且第二收集一样。

更多细节为那些有兴趣谁,为什么我需要这样的:

我在叫ProceduralDataflow一个开源项目使用此。在这里看到更多的细节https://github.com/ymassad/ProceduralDataflow

数据流系统中的每个处理节点有两个集合,一个集合将包含第一次进来的项目(所以我需要放慢生产者,如果需要),另一个集合将包含项目进入第二次(或第三次,......)次(由于数据流中的循环)。

一个集合没有上限的原因是我不想因数据流中的循环造成死锁。

回答

3

首先,简单回答您的具体问题。

如果在任何集合上调用CompleteAdding,它会抛出吗?或两个集合?

Both(all) - 但仅限于任何集合中没有可用元素时。

如果CompleteAdding被调用且收集仍然有一些项目,它会抛出?

不。如果集合中有可用元素,它将从集合中删除并返回给调用者。

结论

显然,文档还不清楚。该部分

CompleteAdding()已呼吁收集

应该已经制定不同的 - 像

或存在的任何没有可用的元素收藏和CompleteAdding()已拨打全部收藏

理由

嗯,我知道依靠实施不是一个好的做法,但是当文档是不清楚的,实行的是唯一可靠的官方消息,我能想到的。所以采取reference sourceTakeFromAnyTryTakeFromAny调用私人方法TryTakeFromAnyCore。它开始于以下情况:

ValidateCollectionsArray(collections, false); 

false这里是一个bool参数调用isAddOperation和使用ValidateCollectionsArray里面如下:

if (isAddOperation && collections[i].IsAddingCompleted) 
{ 
    throw new ArgumentException(
     SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections"); 
} 

这是可能的地方扔ArgumentException用于收藏与CompleteAdding()之一被称为。正如我们所看到的,情况并非如此(问题1)。

然后执行继续下面的“快速通道”:

//try the fast path first 
for (int i = 0; i < collections.Length; i++) 
{ 
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count 
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item)) 
     return i; 
} 

这证明了问题的答案#2。

最后,如果没有可用的元素的所有藏品,实施以“慢速路”通过调用另一个私有方法TryTakeFromAnyCoreSlow,用下面的评论是在实施行为的基本解释:

//Loop until one of these conditions is met: 
// 1- The operation is succeeded 
// 2- The timeout expired for try* versions 
// 3- The external token is cancelled, throw 
// 4- The operation is TryTake and all collections are marked as completed, return false 
// 5- The operation is Take and all collection are marked as completed, throw 

对于我们的问题的答案是#1和情况#5(注意单词全部)。顺便说一句,它也显示了TakeFromAnyTryTakeFromAny - 情况#4和#5之间的唯一区别,即throwreturn -1

+0

谢谢。这样做是有道理的,但文件不清楚。在'TryTakeFromAnyCoreSlow'方法中,第5个条件表示“被标记为完成”。我假设这意味着'CompleteAdding'被调用并且集合是非空的。我将代码追踪到'GetHandles'方法,我认为情况就是这样。 –

+0

我希望MSDN上有一些文档来澄清所有这些。 –

+0

确实。基本上['IsCompleted'](https://msdn.microsoft.com/en-us/library/dd267315(v = vs.110).aspx)属性 - *此集合是否已标记为完成添加,并且空。* –

相关问题