2017-08-23 70 views
0

我有一个列表Observables,我目前通过Observable.concatDelayError()订阅。我的要求发生了变化,因此我现在想要有条件地延迟错误Observable.concatDelayError()有条件地继续

如果一个Observable发出一个错误,我想在这一点上决定是否允许剩下的Observable继续或者序列终止。

一个用例是如果错误类型为TimeoutException,那么我会中止其余的Observables;否则,我继续处理其余的Observables。理想情况下,无论我中止还是继续错误,我仍然喜欢最后报告的错误,因为concatDelayError()目前的行为。

我想我正在寻找的东西沿线:Observable.concatDelayError(Iterable<Observable<T>> sources, Func1<Throwable, Boolean> predicate)

回答

1

编辑:

保留非超时错误是更复杂:

PublishSubject<Object> timeout = PublishSubject.create(); 

Observable.from(sources).concatMapDelayError(v -> v.onErrorResumeNext(e -> { 
    if (e instanceof TimeoutException) { 
     timeout.onError(e); 
     return Observable.empty(); 
    } 
    return Observable.error(e); 
})) 
.takeUntil(timeout) 
.subscribe(...); 
+0

感谢@akarnokd。除了最后我需要知道发生任何**错误的事实之外,这种方法运作良好。所以,如果抛出TimeoutException,我想中止流;如果发生其他错误,我希望流继续进行,但最终知道遇到错误。 –

+0

更新了答案。 – akarnokd

+0

最优秀的@akarnokd。这很好用!一方面的问题:在超时情况下返回“Observable.empty()”有什么好处吗?事情仍然按照预期的方式工作 - IF中止并返回错误。在语义上,似乎也是有意义的,即所有情况下都返回错误;尽管,最终,'timeout.onError(e)'是最重要的部分。 –