2

我试图使用Rx实现一个场景,其中有两个热点观察点。流1和流2.根据流1的数据,我需要启动流2或停止流2.然后将两个流数据组合成一个使用CombineLatest。下面的id代码,我能够想出。有条件地组合两个Rx流

  1. 有没有更好的实现方法?

  2. 而我怎样才能使它更通用,就像我将有流1,然后流2 .. n为每个流从2 .. n有条件条件2 .. n利用数据流1检查如果其他流需要启动或不能再在CombineLatest的方式将所有的数据

CODE:

 IDisposable TfsDisposable = null; 

     // stream 1 
     var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)); 


     // stream 2 
     var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish(); 


     var observerHot = hotObs.Do(a => 
     { 
      // Based on Condition to start the second stream 
      if (ConditionToStartStream2) 
      { 
       TfsDisposable = TfsDisposable ?? hotObs2.Connect(); 
      } 
     }) 
     .Do(a => 
     { 
      // Based on condition 2 stop the second stream 
      if (ConditionToStopStream2) 
      { 
       TfsDisposable?.Dispose(); 
       TfsDisposable = null; 
      } 
     }).Publish(); 


     // Merge both the stream using Combine Latest 
     var finalMergedData = hotObs.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); }); 

     // Display the result 
     finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a); }); 

     // Start the first hot observable 
     observerHot.Connect(); 

回答

1

有这样一出戏:

var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0)); 
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(0.3)); 

var query = 
    hotObs2.Publish(h2s => 
     hotObs.Publish(hs => 
      hs 
       .Select(a => a % 7 == 0 ? h2s : Observable.Empty<long>()) 
       .Switch() 
       .Merge(hs))); 

这需要两个observable,并使用在lambda内发布它们的重载来发布它们。它使它们在lambda范围内变得很热,并且防止需要管理对.Connect()的调用。

然后,我只是执行条件检查(在这种情况下是a甚至),然后返回其他流,如果没有返回一个空的流。

然后.Switch变成IObservable<long>变成IObservable<long>只通过从最新的内部可观察到的价值。

最后它与原始hs流合并。

有了上面这个例子中,我得到了以下的输出:

 
0 
1 
2 
3 
1 
2 
3 
4 
5 
6 
7 
23 
24 
25 
8 
+0

嗨,我尝试使用上面提到的表达,但在订阅可观察的,我没有得到任何输出数据流。你能解释一下吗? –

+0

@BalrajSingh - 我得到了一个输出,但它没有像我期待的那样行事 - 我有什么问题。我认为这将会接近你所需要的。我会尽快处理,并提供解释。 – Enigmativity

+0

我确实看到了开始第二个流的条件。但是没有条件结束它。即使结果不是必需的,第二个流会继续运行吗? –