2017-03-16 32 views
3

我正在构建一个Angular2应用程序,并且有两个BehaviourSubjects,我想将它们合并为一个订阅。我提出了两个http请求,并且希望在两人都回来时发起一个事件。我在看forkJoin vs combineLatest。看起来combineLatest会在任何一个behvaviorSubjects被更新时触发,而forkJoin只会在所有behaviorsSubjects被更新后触发。它是否正确?必须有一个普遍接受的模式,这不是吗?如何合并多个rxjs BehaviourSubjects

编辑
这里是我的angular2组件订阅我的behaviorSubjects的一个示例:

export class CpmService { 

    public cpmSubject: BehaviorSubject<Cpm[]>; 

    constructor(private _http: Http) { 
     this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>()); 
    } 

    getCpm(id: number): void { 
     let params: URLSearchParams = new URLSearchParams(); 
     params.set('Id', id.toString()); 

     this._http.get('a/Url/Here', { search: params }) 
      .map(response => <Cpm>response.json()) 
      .subscribe(_cpm => { 
       this.cpmSubject.subscribe(cpmList => { 
        //double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres 
        if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0)) { 
         cpmList.push(_cpm); 
         this.cpmSubject.next(cpmList); 
        } 
       }) 
      }); 
    } 
} 

这里是我的组件的订阅的一个片段:

this._cpmService.cpmSubject.subscribe(cpmList => { 
     doSomeWork(); 
    }); 

但是相反在单个订阅上触发doSomeWork()我只想在cpmSubject和fooSubject触发时触发doSomeWork()。

+0

一个http请求不能直接返回'BehaviorSubject' - 我猜想,你是'nexting' HTTP的响应每到一个'BehaviorSubject'或者甚至订阅'Subject'到'get/post/put'? – olsn

+0

@olsn是的,我订阅了http响应,并将我的主题与他们在服务类中的回复联系起来 – cobolstinks

+0

行为主题的公共访问是反模式。请使用带有“as Observable”的downcast的getter。所以你不能使用服务外的下一个电话 - >分离问题 –

回答

4

你可以使用zip - 运算符,它的工作原理类似于combineLatest或forkJoin,但只有当两个流都发出触发:http://reactivex.io/documentation/operators/zip.html

zipcombineLatest之间的区别是: 邮编将只会引发”并行“,而combineLatest将触发任何更新并发出每个流的最新值。 因此,假设下列2个流:

streamA => 1--2--3 
streamB => 10-20-30 

zip

  • “1,10”
  • “2,20”
  • “3,30”

combineLatest

  • “1,10”
  • “2,10”
  • “2,20”
  • “3,20”
  • “3,30”

这里也是一个活例子:

const a = new Rx.Subject(); 
 
const b = new Rx.Subject(); 
 

 
Rx.Observable.zip(a,b) 
 
    .subscribe(x => console.log("zip: " + x.join(", "))); 
 
Rx.Observable.combineLatest(a,b) 
 
    .subscribe(x => console.log("combineLatest: " + x.join(", "))); 
 

 
a.next(1); 
 
b.next(10); 
 
a.next(2); 
 
b.next(20); 
 
a.next(3); 
 
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>


另一个注意事项:永远不会订阅订阅。 做这样的事情,而不是:

this._http.get('a/Url/Here', { search: params }) 
      .map(response => <Cpm>response.json()) 
      .withLatestFrom(this.cpmSubject) 
      .subscribe([_cpm, cpmList] => { 
       if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0)) { 
        cpmList.push(_cpm); 
        this.cpmSubject.next(cpmList); 
       } 
      }); 
+0

zip和combineLatest有什么区别? – cobolstinks

+0

我已更新答案 – olsn

+0

谢谢,详细的答案。我试图尝试一下,但我没有在我的Rx.Observable对象上找到zip方法。 – cobolstinks