2
我试图使用Rx实现一个场景,其中有两个热点观察点。流1和流2.根据流1的数据,我需要启动流2或停止流2.然后将两个流数据组合成一个使用CombineLatest。下面的id代码,我能够想出。有条件地组合两个Rx流
有没有更好的实现方法?
而我怎样才能使它更通用,就像我将有流1,然后流2 .. n为每个流从2 .. n有条件条件2 .. n利用数据流1检查如果其他流需要启动或不能再在CombineLatest的方式将所有的数据
CODE:
IDisposable TfsDisposable = null;
// stream 1
var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
// stream 2
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish();
var observerHot = hotObs.Do(a =>
{
// Based on Condition to start the second stream
if (ConditionToStartStream2)
{
TfsDisposable = TfsDisposable ?? hotObs2.Connect();
}
})
.Do(a =>
{
// Based on condition 2 stop the second stream
if (ConditionToStopStream2)
{
TfsDisposable?.Dispose();
TfsDisposable = null;
}
}).Publish();
// Merge both the stream using Combine Latest
var finalMergedData = hotObs.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); });
// Display the result
finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a); });
// Start the first hot observable
observerHot.Connect();
嗨,我尝试使用上面提到的表达,但在订阅可观察的,我没有得到任何输出数据流。你能解释一下吗? –
@BalrajSingh - 我得到了一个输出,但它没有像我期待的那样行事 - 我有什么问题。我认为这将会接近你所需要的。我会尽快处理,并提供解释。 – Enigmativity
我确实看到了开始第二个流的条件。但是没有条件结束它。即使结果不是必需的,第二个流会继续运行吗? –