2016-11-19 24 views
2

.NET中有一个BlockingCollection<T>.TakeFromAny方法。它首先尝试快速获取Take,然后默认为等待基础句柄的“缓慢”方法。我想用它来倾听提供“消息”的上游生产商和提供“结果”的下游生产商。BlockingCollection <T> .TakeFromAny,对于具有不同通用类型的集合

  • Can TakeFromAny可以使用 - 或者是有另外一种方式,不需要重新实现 - 可以监听添加异构类型Blocking Collections的集合吗?

下面的代码是类型有效,自然无法编译:

object anyValue; 
var collection = new List<BlockingCollection<object>>(); 
// following fails: cannot convert 
// from 'System.Collections.Concurrent.BlockingCollection<Message>' 
// to 'System.Collections.Concurrent.BlockingCollection<object>' 
collection.Add(new BlockingCollection<Message>()); 
// fails for same reason 
collection.Add(new BlockingCollection<Result>()); 
BlockingCollection<object>.TakeFromAny(collection.ToArray(), out anyValue); 

这将有可能对采取只处理new BlockingCollection<object>()情况和投,以避免编译类型错误,虽然这样的错误(呃) - 尤其是因为打字通过方法界面丢失了。使用包装组合类型将解决后者; fsvo'解决'。


这里没有什么东西直接与问题直接相关,虽然它提供了上下文 - 对于那些有兴趣的人。提供核心基础架构功能的代码无法使用更高级别的构造(例如Rx或TPL Dataflow)。

这里是一个基本的流程模型。生产者,代理和工作者在不同的线程上运行(这些工作者可以在同一个线程上运行,具体取决于任务调度器的功能)。

[producer] message --> [proxy] message --> [worker 1] 
      <-- results    <-- results 
            message --> [worker N..] 
            <-- results 

期望是代理侦听消息(传入)和结果(返回)。代理完成一些工作,如转换和分组,并将结果用作反馈。

把代理作为一个单独的线程将它从最初的生产源中分离出来,从而完成各种猴子业务。工作任务是为了并行性,而不是异步性,线程化(在争用被减少/消除后,尽管代理中的分组)应该允许良好的扩展。

队列在代理和工作人员之间建立(而不是具有单个输入/结果的直接任务),因为在工作人员执行时,可能会有额外的传入工作消息,它可以在结束之前处理。这是为了确保工作人员能够延长/重用其在相关工作流上建立的上下文。

+0

1.是否可以阻止每个集合的线程? 2。你需要保证“只从一个集合中获取”操作,还是更像“我想处理这两个集合中的所有项目,但我不想并行执行”? – svick

+0

@svick上游生产者(写在'消息'队列中)和扇出下游消费者/生产者(他们在'结果'队列中产生结果)可以被阻止 - 这个“代理”消费者/生产者在单独运行线程上下文,并在转换后将消息从上游移动到下游消费者,并将结果作为反馈移回上游。目标是“等待接下来的事情”,从任何队列开始,然后等待更多。 – user2864740

+0

@svick我想我最初的想法/设计是针对“使用类型化阻止集合进行民意调查/选择”。 – user2864740

回答

1

我认为这里最好的选择是将两个阻塞集合的类型更改为BlockingCollection<object>,您已经提到过,包括其缺点。

如果您不能或不想做,另一个解决办法是有一个合并BlockingCollection<object>并为每个源集合项目移动从收集到的合并一个线程:

var producerCollection = new BlockingCollection<Message>(); 
var consumerCollection = new BlockingCollection<Results>(); 

var combinedCollection = new BlockingCollection<object>(); 

var producerCombiner = Task.Run(() => 
{ 
    foreach (var item in producerCollection.GetConsumingEnumerable()) 
    { 
     combinedCollection.Add(item); 
    } 
}); 

var consumerCombiner = Task.Run(() => 
{ 
    foreach (var item in consumerCollection.GetConsumingEnumerable()) 
    { 
     combinedCollection.Add(item); 
    } 
}); 

Task.WhenAll(producerCombiner, consumerCombiner) 
    .ContinueWith(_ => combinedCollection.CompleteAdding()); 

foreach (var item in combinedCollection.GetConsumingEnumerable()) 
{ 
    // process item here 
} 

这不是非常有效,因为它阻止了两个额外的线程来完成这个任务,但它是我可以想到的最好的选择,而无需使用反射来访问TakeFromAny使用的手柄。

+0

我决定与'BlockingCollection '路线一起走更多的守卫访问。在这个过程中,我发现TakeFromAny是相当有偏见的。它总是倾向于第一个收集(如果他们有快速的收集,或者如果多个等待处理同时发出信号,则慢速收回)。 – user2864740

相关问题