2017-08-17 30 views
1

我想合并/合并多个观察对象当他们每个完成时执行一个最后函数。 merge运营商似乎并行执行每个订阅,这是我需要的,但如果他们中的任何一个抛出一个错误执行被暂停。rxjs5合并和错误处理

RxJS 4版有一个运营商mergeDelayError应该继续执行,直到所有的人都完成了所有订阅,但该运营商不版本5实现。

我应该回到不同的操作员吗?

var source1 = Rx.Observable.of(1,2,3).delay(3000); 
var source2 = Rx.Observable.throw(new Error('woops')); 
var source3 = Rx.Observable.of(4,5,6).delay(1000); 

// Combine the 3 sources into 1 

var source = Rx.Observable 
    .merge(source1, source2, source3) 
    .finally(() => { 

    // finally is executed before all 
    // subscriptions are completed. 

    console.log('finally'); 

    }); 

var subscription = source.subscribe(
    x => console.log('next:', x), 
    e => console.log('error:', e), 
() => console.log('completed')); 

JSBin

回答

2

我想你可以通过使用catch()模拟相同的行为。您只需将其追加到每一个源可观察:

const sources = [source1, source2, source3].map(obs => 
    obs.catch(() => Observable.empty()) 
); 

Rx.Observable 
    .merge(sources) 
    .finally(...) 
    ... 
+0

谢谢 - 'catch'似乎不起作用。相反,我只是用'onErrorResumeNext'映射了所有东西,这在我的情况下是可以的。 – null

+0

@null catch()运算符不起作用如何?我认为它应该工作... – martin

+0

我不知道为什么。看到这个JSBin:[http://jsbin.com/qaluyaq/edit?js,console](http://jsbin.com/qaluyaq/edit?js,console) – null

1

如果你不想吞咽你的错误,但希望他们推迟到了最后,你可以:

const mergeDelayErrors = []; 
const sources = [source1, source2, source3].map(obs => obs.catch((error) => { 
    mergeDelayErrors.push(error); 
    return Rx.Observable.empty(); 
})); 

return Rx.Observable 
    .merge(...sources) 
    .toArray() 
    .flatMap(allEmissions => { 
    let spreadObs = Rx.Observable.of(...allEmissions); 
    if (mergeDelayErrors.length) { 
     spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors)); 
    } 
    return spreadObs; 
    }) 

您可能只想抛出第一个错误,或创建一个CompositeError。我不确定mergeDelayErrors最初在引发多个错误时的表现。

不幸的是,因为这实现必须等待,直到发射前的错误完成所有观测,它也一直等待,直到下一次发射前完成所有观测。这很可能不是mergeDelayError的原始行为,它应该以流的形式发出,而不是在最后发出。

+0

谢谢你,如你所愿 –

0

我们可以避免通过收集错误并在端部发射它们阻塞流。

function mergeDelayError(...sources) { 
    const errors = []; 
    const catching = sources.map(obs => obs.catch(e => { 
    errors.push(e); 
    return Rx.Observable.empty(); 
    })); 
    return Rx.Observable 
    .merge(...catching) 
    .concat(Rx.Observable.defer(
    () => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors))); 
} 


const source1 = Rx.Observable.of(1,2,3); 
const source2 = Rx.Observable.throw(new Error('woops')); 
const source3 = Rx.Observable.of(4,5,6); 

mergeDelayError(source1, source2, source3).subscribe(
    x => console.log('next:', x), 
    e => console.log('error:', e), 
() => console.log('completed'));