2017-10-04 83 views
0

我有以下代码:等待所有观察到的与rxjs

this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => { 

    let hangars: IHangar[] = result.arguments[0]; 

    for (let hangar of hangars) { 
     this.pieceService.getGroupedPieces(hangar.pieces).subscribe(group => hangar.groupedPieces = group); 
    } 

    this.hangars$.next(hangars); 
}, (ex: any) => this.hangars$.error(ex)); 

所以基本上,sendScopeCommand通过的WebSocket发送的东西,当WebSocket的接收到响应执行then功能。在这一点上,我得到了一个对象的数组,我把它放在hangars

在这些对象中,我有一个玩家拥有的所有棋子的数组。可以有多个相同的片类型,所以我做了一个函数来将它们分组:getGroupedPieces。它的代码如下:

public getGroupedPieces(pieces: IPiece[]): Observable<IGroupedPiece[]> { 
    return Observable 
     .from(pieces) 
     .groupBy(p => p.pieceTypeId) 
     .flatMap(p => p.toArray()) 
     .map(p => { return <IGroupedPiece>{ amount: p.length, piece: p[0] }; }) 
     .toArray(); 
} 

此代码的工作原理,但我敢肯定,这是不正确的。事实上,我认为hangars甚至在for循环中的可观测值完成之前在可观测值上发射。

我想在这里等待所有这些observable完成,然后在Observable上发射hangars

+0

为什么不发射订阅被调用时的事件? –

回答

0

我想你可以尝试使用RxJS的forkJoin操作:

this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => { 

    let hangars: IHangar[] = result.arguments[0]; 
    let hangars$$: Observable<IHangar>[] = hangars.map(hangar => { 
     return this.pieceService.getGroupedPieces(hangar.pieces) 
    }) 

    Observable 
     .forkJoin(...hangars$$) 
     .subscribe(groups => { 
     groups.forEach((group, i) => hangars[i].groupedPieces = group) 
     this.hangars$.next(hangars); 
     }) 

}, (ex: any) => this.hangars$.error(ex)); 
+0

太棒了,谢谢 – ssougnez

2

就我个人而言,我尝试尽可能少地调用订阅,特别是避免可能的订阅嵌套。

我把一个快速的角度组件放在一起给我如何处理它的样本。

对不起,如果它的原油或拼写错误(断锁骨只用一只手来使用)。

import { Component, OnInit } from '@angular/core'; 
import { Observable, Subject } from 'rxjs'; 

interface hangar { 
    pieces: number[]; 
    groupedPieces: number[]; 
} 

@Component({ 
    selector: 'app-root', 
    templateUrl: './app.component.html', 
    styleUrls: ['./app.component.css'] 
}) 
export class AppComponent { 
    title = 'app works!'; 
    //output mock 
    hangars$: Subject<hangar[]> = new Subject<hangar[]>(); 

    ngOnInit() { 
     //proof of functionality 
     this.hangars$.subscribe(h => console.log(h)); 
    } 

    //method to mock the then call from example 
    start() { 

     //mock some data 
     let hangars: hangar[] = [{ pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }]; 

     //subject to handle observable clean up 
     let subManagement$: Subject<any> = new Subject<any>(); 
     let obsArr: Observable<number[]>[] = []; 

     //here were going to build an array the observables but not subscribe to them yet 
     hangars.forEach(hangar => 
      obsArr.push(
       this.getGroupedPieces(hangar.pieces) 
        .takeUntil(subManagement$) 
        .do(group => hangar.groupedPieces = group) 
      ) 
     ); 

     //real magic, this waits for all of the observables responses before emitting its value 
     Observable.combineLatest(obsArr).subscribe(
      () => this.hangars$.next(hangars), 
      null, 
      () => subManagement$.next()//cleanup 
     ); 
    } 

    //mock out your service 
    private getGroupedPieces(pcs: number[]): Observable<number[]> { 

     return Observable.of([1, 2, 3, 4]).delay(1000); 
    } 
} 
+0

如果您将Observable更改为承诺,则可以使用'Promise.all'来等待所有承诺响应并继续。 – Dekonunes