2015-11-10 26 views
1

我想知道是否有办法采取可观察的流并使用* While运算符,特别是TakeWhile,SkipWhile和BufferWhile,以便它们的订阅者不会收到一个.OnComplete当bool'while'条件满员时?“永不停息”不经意间,BufferWhile和SkipWhile RX.Net序列

当我开始使用.TakeWhile/SkipWhile和BufferWhile运算符时,我认为它们不会终止/ .OnComplete(),但仅在布尔条件满足时才会发出(不)。

它可能会更有意义用一个例子:

我有指示,即一个实例是否忙或不和数据的可观察到的流的布尔标志:

private bool IsBusy { get;set; } 
private bool IgnoreChanges { get;set; } 

private IObservable<int> Producer { get;set; } 
private IDisposable ConsumerSubscription { get;set; } 

..和使用/设置在RX流(多个)类似的(简化的)

private void SetupRx() 
{ 
    ConsumerSubscription = Producer 
     .SkipWhile(_ => IgnoreChanges == true) // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true, but forward them whenever the IgnoreChanges flag is set to false 
     .BufferWhile(_ => IsBusy == true) // for all streamed instances buffer them as long as we are busy handling the previous one(s) 
     .Subscribe(i => DoSomething(i)); 
} 

private void DoSomething(int i) 
{ 
    try 
    { 
     IsBusy = true; 
     // ... do something 
    } 
    finally 
    { 
     IsBusy = false; 
    } 
} 

的.SkipeWhile/.BufferWhile不应完整/的onComplete(..)每当IsBusy/IgnoreChanges标志SWI从真到假再回来,但保持流活着。

这是不知何故可以用RX.Net开箱即可和/或有人知道如何做到这一点?

+0

只是为了澄清,RX.net中没有BufferWhile(我的错误) –

回答

4

要从IObservable<T>源丢弃OnCompleted消息,ConcatObservable.Never<T>()简单:

source.TakeWhile(condition).Concat(Observable.Never<T>()) 

手动订阅的IObservable<T>源,认购结束,只有当你手动退订,你可以使用PublishIConnectableObservable<T>

var connectableSource = source.Publish(); 
// To subscribe to the source: 
var subscription = connectableSource.Connect(); 
... 
// To unsubscribe from the source: 
subscription.Dispose(); 

所有这些说,我认为你正在接近这个错误。如果它正确完成,你将不需要上述技巧。看看你的查询:

ConsumerSubscription = Producer 
    // Drop the producer's stream of ints whenever the IgnoreChanges flag 
    // is set to true, but forward them whenever the IgnoreChanges flag is set to false 
    .SkipWhile(_ => IgnoreChanges == true) 
    // For all streamed instances buffer them as long as we are busy 
    // handling the previous one(s) 
    .BufferWhile(_ => IsBusy == true) 
    .Subscribe(i => DoSomething(i)); 

您应该使用.Where(_ => !IgnoreChanges)代替.SkipWhile(_ => IgnoreChanges)

您应该使用.Buffer(_ => IsBusy.SkipWhile(busy => busy))BehaviorSubject<bool> IsBusy而不是.BufferWhile(_ => IsBusy)

完整的代码应该是这样的:

private BehaviorSubject<bool> IsBusy { get;set; } 
private bool IgnoreChanges { get;set; } 

private IObservable<int> Producer { get;set; } 
private IDisposable ConsumerSubscription { get;set; } 

private void SetupRx() 
{ 
    ConsumerSubscription = Producer 
     .Where(_ => !IgnoreChanges) 
     .Buffer(_ => IsBusy.SkipWhile(busy => busy)) 
     .Subscribe(buffer => DoSomething(buffer)); 
} 

private void DoSomething(IList<int> buffer) 
{ 
    try 
    { 
     IsBusy.OnNext(true); 
     // Do something 
    } 
    finally 
    { 
     IsBusy.OnNext(false); 
    } 
} 

下一个改进是试图摆脱BehaviorSubject<bool> IsBusy的。主题是你想要避免的,因为它们是你必须管理的状态。

+0

感谢蒂莫西提供的关于简化我的想法的提示!我注意到在RX中没有BufferWhile,所以我弄了一点,为我的三个用例写了三个扩展方法:https://github.com/jbattermann/JB.Common/blob/master/JB。 Common.Reactive/Linq/ObservableExtensions.cs ...SkipWhile/TakeWhile实际上非常容易编写,对于(我的).Buffer虽然还不满意,但是很好。但是总的来说 - 感谢对Publish,Where等方面的回复和说明。干杯! –