我有一个交织流,其中我split into separate sequential streams。如何在使用反应式扩展并发处理流时限制缓冲
生产者
int streamCount = 3;
new MyIEnumerable<ElementType>()
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % streamCount), Value = x })
.Subscribe(x => outputs[x.Key].OnNext(x.Value));
凡outputs[]
是其中处理所述数据流在下文中定义的受试者。 .ObserveOn()
用于同时处理流(多线程)。
消费者
var outputs = Enumerable.Repeat(0, streamCount).Select(_ => new Subject<char>()).ToArray();
outputs[0].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 0: {0}", x));
outputs[1].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 1: {0}", x));
outputs[2].ObserveOn(Scheduler.ThreadPool).Subscribe(x => Console.WriteLine("stream 2: {0}", x));
这段代码的问题是,它会读取整个枚举尽可能地快,即使输出流无法赶上。在我的情况下,枚举是一个文件流,所以这可能会导致使用大量的内存。因此,如果缓冲区达到某个阈值,我希望读数能够被阻止。