2017-10-11 30 views
2

我想用Rx缓冲器的功能:的Rx .NET - 力缓冲发出

var source = new Subject<Price>(); 
var buffer = source 
    .Buffer(TimeSpan.FromSeconds(30), 5) 
    .Where(p => p.Any()); 

这意味着EMIT(发布给用户)时,缓冲区达到5秒或30秒的大小自上次EMIT已经发生了。

但我需要能够按需发射 - 例如,当我收到高优先级的序列项目。然后我想将它添加到observable(source.OnNext()),并以某种方式强制它发出(这意味着返回缓冲区中的所有元素并清除它)。

我知道,我可以添加以下代码:

var flusher = new Subject<Price>(); 
var closing = flusher.Select(x => new List<Price> {x}); 
var query = buffer.Merge(closing).Subscribe(something); 

,并调用flusher.OnNext(highPriorityItem),我会把它发出的。

但在这种情况下,我有两个独立的序列与两个不同的发射。当缓冲区已满或特定项目按顺序出现时,我需要一个发射。

Force flush count-type Observable.Buffer c#Force flush to Observable.Buffer c#似乎并不适合我

+0

'[...]似乎并不适合me'有什么特别的原因吗?后者似乎完全是你要求的...... – decPL

+0

有一些单元相关的定义和定时器,而不是缓冲区。我不知道/了解他们如何解决我的问题 – brolly87

+0

您是否尝试过实施这些?只需用'Price'类替换'Unit'即可。如果你有一个特定的问题,描述它,并希望有人可以提供帮助。 – decPL

回答

2

编辑:@Enigmativity是完全正确的,是指他的回答。留下这个完整的,希望能够更容易地确定这里的思考过程。

试试如下:

var input = new Subject<Price>(); //your input observable 

var flush = new Subject<long>(); //used to manually flush the 'buffer' for important prices 
var timeBuffer 
    = Observable.Timer(TimeSpan.FromSeconds(10)); //controls the time-based part of 'buffer' 
var sizeBuffer = input.Buffer(5).Select(l => 0L); //controls the size-based part of 'buffer' 

var bufferedInput = input.Window(()=>Observable.Merge(timeBuffer, sizeBuffer, flush)) 
         .SelectMany(w => w.ToList()) 
         .Subscribe(w => DO_SOMETHING_WITH_PRICES(w)); 

//Flush on important price (NOTE - order of the two subscriptions matter) 
input.Where(p => p.IS_IMPORTANT).Subscribe(p => flush.OnNext(0L)); 
+0

这几乎可以,但是当我通过OnNext推入重要价格时,缓冲区中的所有元素都会得到但是这个重要的价格并没有。它会在下一轮(当缓冲区大小为5或30秒后)得到满足。我希望它成为第一个发射的一部分 – brolly87

+0

这就是为什么我写道订阅的顺序很重要。确保你在订阅转发重要价格到'flush'之前订阅'缓冲区'。 – decPL

+1

谢谢,最初忽略了。现在工作! – brolly87

4

我认为decPL具有的基本理念就在这里,但他的解决办法并不稳定。根据input observable的调度程序,即使按正确的顺序订阅,您也可能会得到不可预知的结果。这是因为有多个独立订阅input。您需要全部通过.Publish(...)调用才能确保只有一个订阅。

此外,它需要一种清理方式,当订阅处置。所以它也需要运行一个.Create(...)调用。

方法如下:

var input = new Subject<Price>(); 

IObservable<IList<Price>> query = 
    input 
     .Publish(i => 
      Observable 
       .Create<IList<Price>>(o => 
       { 
        var timeBuffer = 
         Observable 
          .Timer(TimeSpan.FromSeconds(10.0)) 
          .Select(n => Unit.Default); 
        var flush = 
         i 
          .Where(p => p.IS_IMPORTANT) 
          .Select(n => Unit.Default); 
        var sizeBuffer = 
         i 
          .Buffer(5) 
          .Select(l => Unit.Default); 
        return 
         i 
          .Window(() => Observable.Merge(timeBuffer, sizeBuffer, flush)) 
          .SelectMany(w => w.ToList()) 
          .Subscribe(o); 
       })); 

query.Subscribe(w => DO_SOMETHING_WITH_PRICES(w)); 
+0

如果'IS_IMPORTANT'是唯一的特殊“冲洗”情况,您是否也可以将'flush'主体替换为Observable:'i.Where(p => p.IS_IMPORTANT).Select(_ => Unit.Default) ')? – supertopi

+0

@supertopi - 良好的通话 - 我已经修复了。 – Enigmativity