2017-08-24 71 views
0

我有一个这样的数据流:如何在RxJava2中默默跳过异常?

Observable 
    .fromFuture(
     CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> 
      listOf(1, 2, 3, 57005, 5) 
     }, 
     Schedulers.computation() 
    ) 
    .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one 
    .map { 
     CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred. 
      if (it == 0xDEAD) { 
       throw IOException("Dead value!") 
      } 

      it 
     } 
    } 
    .flatMap { 
     Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again 
    } 
    .doOnNext { 
     println(it) // Debug 
    } 
    .blockingSubscribe() 

我已经更换了业务逻辑(实际返回Future S)与CompletableFuture.supplyAsync。 而且,是的,这是Kotlin,但我想你有这个意图。

当我评论 “死” 值(57005)输出为:

1 
4 
9 
25 

但是,如果 “死” 值出现在流,失败:

1 
4 
9 
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! 
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45) 
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86) 
    at io.reactivex.Observable.blockingSubscribe(Observable.java:5035) 
    at by.dev.madhead.rx.TestKt.main(test.kt:41) 
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! 
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
... 

我一个新手在RX,所以很快搜索出一个解决方案:onExceptionResumeNextObservable.fromFuture(it) - >Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }。但是现在我的应用程序永远挂起(在产生我期望的输出之后)。 看起来像流永远不会结束。

我应该“关机”Observable莫名其妙或什么? 或者更一般地说,使用RX时是否是一个好方法? 我是否应该以另一种方式重新考虑它?

回答

1

燕子的例外是这样的:

Observable.fromFuture(it).onErrorResumeNext(Observable.empty()) 
+0

这实际上做的伎俩,愚蠢的我。谢谢! – madhead