2012-06-23 20 views
4

我有一个交织流,其中我split into separate sequential streams如何在使用反应式扩展并发处理流时限制缓冲

生产者

int streamCount = 3; 

new MyIEnumerable<ElementType>() 
.ToObservable(Scheduler.ThreadPool) 
.Select((x,i) => new { Key = (i % streamCount), Value = x }) 
.Subscribe(x => outputs[x.Key].OnNext(x.Value)); 

outputs[]是其中处理所述数据流在下文中定义的受试者。 .ObserveOn()用于同时处理流(多线程)。

消费者

var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();                          

outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x)); 
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x)); 
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x)); 

这段代码的问题是,它会读取整个枚举尽可能地快,即使输出流无法赶上。在我的情况下,枚举是一个文件流,所以这可能会导致使用大量的内存。因此,如果缓冲区达到某个阈值,我希望读数能够被阻止。

回答

3

我已经通过对生产者和消费者使用信号灯来解决此问题,如下所示。但是,我不确定这是否是一个好的解决方案(就Rx契约,编程风格等而言)。

var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS); 

// add to producer (before subscribe) 
.Do(_ => semaphore.Wait()); 

// add to consumer (before subscribe) 
.Do(_ => semaphore.Release())) 

这可能是一个好主意,一个CancelationToken将呼叫传递给Wait()并确保它被取消时,流异常停止?

2

我认为你的解决方案非常合理。最大的问题(对前一个问题有一些背景知识)是,解决方案的“内部”目前处处暴露。只要确保当你正确的代码这个你清理以下内容:

  • 裹成的一切暴露一个方法的类:IDisposable Subscribe(<index>, Action)或交替IObservable<element> ToObservable(<index>))。返回的订阅或返回的observable将具有已经完成的所有'工作',即添加的Do操作等等。事实上,它下面的字典或列表都应该与用户完全无关,否则在这里对代码的任何更改都需要在整个地方进行更改。

  • 一个CancelationToken是一个伟大的想法,一定要取消其在任OnCompletedOnError,您可以使用重载来Do做。