2017-01-19 36 views
11

我推到可观察像这样的一个数组...RXJS等待所有观测阵列中的完成(或错误)

var tasks$ = []; 
tasks$.push(Observable.timer(1000)); 
tasks$.push(Observable.timer(3000)); 
tasks$.push(Observable.timer(10000)); 

我想可观察到的,当所有任务$完成发射。请记住,实际上,任务$没有已知数量的可观察值。

我试过Observable.zip(tasks$).subscribe(),但是如果只有一项任务,这似乎失败了,并且正在让我相信ZIP需要偶数个元素才能按我期望的方式工作。

我试过Observable.concat(tasks$).subscribe()但是concat操作符的结果似乎只是一个观察值的数组......基本上与输入相同。你甚至不能在它上面调用订阅。

在C#中,这将类似于Task.WhenAll()。在ES6中的 保证它与Promise.all()类似。

我遇到过许多SO问题,但它们似乎都处理等待已知数量的流(例如将它们映射到一起)。

+1

这取决于您在任何Observables发送错误通知时要执行的操作。你想忽略错误还是意味着整个结果将被丢弃,你会收到错误。 – martin

回答

26

如果你想撰写观察到当所有源的可观测量完成发射,你可以使用forkJoin

import { Observable } from 'rxjs/Observable'; 
import 'rxjs/add/observable/forkJoin'; 
import 'rxjs/add/operator/first'; 

var tasks$ = []; 
tasks$.push(Observable.timer(1000).first()); 
tasks$.push(Observable.timer(3000).first()); 
tasks$.push(Observable.timer(10000).first()); 
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); }); 
+6

如果'tasks'是动态构建的并且为空,那么请小心,forkJoin将停止可观察序列。查看我的答案在这里获取更多信息http://stackoverflow.com/a/42622968/1224564 – bgondy

+0

第三个可观察对象的O没有像宣布的那样处于大写状态,因此导致可观察的是未定义的。此外,任务在订阅之前缺少结束美元符号。我无法编辑答案,因为:编辑必须至少包含6个字符;在这篇文章中还有其他改进之处吗? –

+0

我不想永远唠叨你,但我认为结果不会显示,因为可观察的流程不完整。可能只是在定时器调用或.take(n)之后添加.first()以获得更有趣的结果。 forkJoin是我正在寻找的操作员,谢谢@cartant! –

0

对我来说这是sample最好的解决办法。

const source = Observable.interval(500); 
const example = source.sample(Observable.interval(2000)); 
const subscribe = example.subscribe(val => console.log('sample', val)); 

所以..只有当第二个(例子)发出 - 你会看到第一个(源)的最后一个emited值。

在我的任务中,我等待表单验证和其他DOM事件。