4

划伤我VE结束了与下面的代码段在Rx JS的表面:的Rx JS订阅观察到多个观测量

var observer1 = Rx.Observer.create(
     function (x) { 
      console.log('Next: ' + x); 
     }, 
     function (err) { 
      console.log('Error: ' + err); 
     }, 
     function() { 
      console.log('Completed'); 
     } 
    ); 

    var observer2 = Rx.Observer.create(
     function (x) { 
      console.log('Next: ' + x); 
     }, 
     function (err) { 
      console.log('Error: ' + err); 
     }, 
     function() { 
      console.log('Completed'); 
     } 
    ); 


    var source1 = Rx.Observable.return(1); 
    var source2 = Rx.Observable.return(2); 

    var subscription1 = source1.subscribe(observer1); 
    var subscription2 = source2.subscribe(observer1); 

OUTPUT: 下一步:1 完成

JS BIN代码参考:http://goo.gl/DiHdWu

向两个流订阅相同的观察者只会产生第一个数据。然而,当订阅其他观察者时,事情按预期进行。有人可以解释发生了什么吗?

 var subscription1 = source1.subscribe(observer1); 
    var subscription2 = source2.subscribe(observer2); 

OUTPUT: 下一页:1 完成 下一页:2 完成

回答

4

是的,一个观察者可以监听多个观测量而不是你要使用的方式。这可以通过使用Merge,concat运营商来实现。代码参考jsbin

为什么你的代码不起作用?

对于Observer.create的每个呼叫,我们得到一个IObserver。一旦调用OnError或OnComplete,它将忽略任何未来的OnNext调用。

当我们使用一个观察者来订阅多个观察对象时,在第一个观察对象终止/完成后(即触发OnError/OnCompleted时),观察者将无法对任何进一步订阅的观察对象起作用。因为来自第一个observable的终止消息将导致观察者忽略来自任何进一步订阅的observable的消息。

对于您的问题,您需要使用运营商如mergeconcat这将在内部使用多个观察员和除最后一个可观察不传递任何可观完成消息(的OnError/OnCompleted)到外的观察者。

//Triggers observer1 for both observables(source1 & source2) 
var subscription = source1.concat(source2).subscribe(observer1); 

//Triggers observer2 for both observables(source1 & source2) 
var subscription = source1.merge(source2).subscribe(observer2);