2016-11-02 33 views
1

我是RxJava新手,我有以下问题。 说我有项目的序列和项目传播错误,我想忽略它,并继续处理其他项目。如何在RxJava出错后继续流式传输项目?

我有下面的代码片段:

Observable.from(Arrays.asList("1", "2", "3")) 
      .map(x -> { 
       if (x.equals("2")) { 
        throw new NullPointerException(); 
       } 
       return x + "-"; 
      }) 
      .onExceptionResumeNext(Observable.empty()) 
      .subscribe(System.out::println); 

我越来越:1-

但我想:1,3

我怎么能这样做?

+0

的可能的复制【如何忽略错误并继续无限流?(http://stackoverflow.com/questions/28969995/how-to-ignore-error-and-continue-infinite-stream) – Ivan

回答

2

诀窍是将值以某种方式转换为新的可观察值和平面图,如下例所示。现在,flatMap中的每个值都可以引发异常并按值处理值。因为flatMap中的子流只包含一个元素,所以在onError之后observable将被关闭并不重要。我使用RxJava2作为测试环境。

@Test 
public void name() throws Exception { 
    Observable<String> stringObservable = Observable.fromArray("1", "2", "3") 
      .flatMap(x -> { 
       return Observable.defer(() -> { 
        try { 
         if (x.equals("2")) { 
          throw new NullPointerException(); 
         } 
         return Observable.just(x + "-"); 
        } catch (Exception ex) { 
         return Observable.error(ex); 
        } 
       }).map(s -> { 
        if (s.equals("3-")) { 
         throw new IllegalArgumentException(); 
        } 
        return s + s; 
       }).take(1) 
         .zipWith(Observable.just("X"), (s, s2) -> s + s2) 
         .onErrorResumeNext(Observable.empty()); 
      }); 

    TestObserver<String> test = stringObservable.test(); 

    test.assertResult("1-1-X"); 
} 
+0

非常感谢你! 我有另一个问题: 所以如果我有十个变压器,他们每个人都可以抛出异常,所以我应该用这个代码包装他们每个? – corvax

+0

一如既往的答案是:这取决于你想要做什么。如果你不想只将map操作符放到新的observable上,而是要将其他操作符放到onErrorResumeNext之前。如果其中一个操作员失败,则该值(例如2)将失败,而返回空的可观察值。如果你想处理每个操作员的备份值,你可以将子流重新包装成一个新的流和平面图。请提供更多代码。 –