我想用Rx缓冲器的功能:的Rx .NET - 力缓冲发出
var source = new Subject<Price>();
var buffer = source
.Buffer(TimeSpan.FromSeconds(30), 5)
.Where(p => p.Any());
这意味着EMIT(发布给用户)时,缓冲区达到5秒或30秒的大小自上次EMIT已经发生了。
但我需要能够按需发射 - 例如,当我收到高优先级的序列项目。然后我想将它添加到observable(source.OnNext()
),并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。
我知道,我可以添加以下代码:
var flusher = new Subject<Price>();
var closing = flusher.Select(x => new List<Price> {x});
var query = buffer.Merge(closing).Subscribe(something);
,并调用flusher.OnNext(highPriorityItem),我会把它发出的。
但在这种情况下,我有两个独立的序列与两个不同的发射。当缓冲区已满或特定项目按顺序出现时,我需要一个发射。
Force flush count-type Observable.Buffer c#和Force flush to Observable.Buffer c#似乎并不适合我
'[...]似乎并不适合me'有什么特别的原因吗?后者似乎完全是你要求的...... – decPL
有一些单元相关的定义和定时器,而不是缓冲区。我不知道/了解他们如何解决我的问题 – brolly87
您是否尝试过实施这些?只需用'Price'类替换'Unit'即可。如果你有一个特定的问题,描述它,并希望有人可以提供帮助。 – decPL