2016-07-27 28 views
0

我想轮询更改,并在达到所需值时Observable应完成(或等到超时)。现在我使用可以正常工作的过滤器,直到达到所需的值。但是我希望Observable在等待这个值时推送事件。RxJs直到达到期望值的可观察间隔

例如,我等待状态为'成功',直到状态变为'成功',状态'测试'从我的服务中返回。但由于过滤器正在等待'成功','测试'永远不会返回。

我现在代码:

return Observable 
    .interval(this.POLL_TIMEOUT) 
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName)) 
    .filter(data => this.finishedStatus(data.status)) 
    .take(1) 
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout'))); 

回答

0

你可能想takeWhile,而不是filter

return Observable 
    .interval(this.POLL_TIMEOUT) 
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName)) 
    .takeWhile(data => this.finishedStatus(data.status)) 
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')); 

注意上述通吃除了的最后一个事件,如果你想在最后一场比赛太你需要是有点麻烦。

const source = Observable.interval(this.POLL_TIMEOUT) 
    .flatMap(() => this.getSingleProjectStatus(projectId, repoName)) 
    .share(); 

    source 
    .takeUntil(source.filter(data => this.finishedStatus(data.status))) 
    .timeout(this.MAX_TIMEOUT, Observable.throw(new Error('Timeout')); 

在这种情况下,你正在服用的所有结果,直到另一个Observable发出,在这种情况下,其他Observable仅仅是过滤而只发射成功事件的source

JsBin:http://jsbin.com/sojosuhune/edit?html,js,console,output