我有无限的对象流。而且我的要求是,来自具有相同密钥的可观察流中的每个项目应该被同步处理,并且具有不同密钥的所有其他项目可能/应该并行处理。做到这一点(在大多数地方提到),最简单的方法是使用GroupByUntil
操作:Rx.NET GroupByUntil组终止,等待线程完成
var results = observableStream
.GroupByUntil(item => item.Id, group =>
group.Throttle(TimeSpan.FromSeconds(30), scheduler))
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Select(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
代码工作很好,直到我可以保证的ProcessItem(item)
的执行时间不超过30秒。否则group.Throttle(TimeSpan.FromSeconds(30), scheduler)
将关闭组的流,并且新项目到达并开始在新线程上处理的可能性很高。
所以基本上我需要知道我的线程已经完成处理所有具有特定键的项目,并且我需要在durationSelector
之内通知GroupByUntil
关于它的运算符参数。
有关如何实现此目的的任何想法?提前致谢。
你怎么知道你已经收到了最后一个特定的密钥? – NetMage
@NetMage其实我不会知道。我试图实现的是,只有当处理特定组的线程完成了它的工作并且队列中再没有任何东西时,我才应该开始调节(反弹)。 – Azat
'ProcessItem'是否同步?它是“异步”吗?它是否返回'IObservable'? –
Shlomo