2015-02-08 66 views
5

如果我有一个Observalbe:RxJava Observable和Subscriber用于跳过异常?

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4); 
Observable<Integer> o1 = Observable.from(ints); 

我想12生成另一个观察到,这鸿沟:

Observable<Integer> o2 = o1.map(i -> 12/i); 
o2.subscribe(
    v -> logger.info ("Subscriber value {}", v) , 
    t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage()) 
); 

很明显它会得到错误,当它遇到停止“0”:

RxTest - Subscriber value 12 
RxTest - Subscriber value 6 
RxTest - Subscriber onError class java.lang.ArithmeticException :/by zero 

但是如果我想让观察者(o2)跳过异常呢?

我看着RxJava的文档error handling,没有办法跳过这个错误。 onErrorResumeNext()onExceptionResumeNext()需要备份/备份Observable,这不是我想要的。 onErrorReturn需要指定返回值。

所有三种错误处理方法都无法恢复原始观察者。例如:

Observable<Integer> o2 = o1.map(i -> 12/i) 
     .onErrorReturn(t -> 0); 

它打印:

RxTest - Subscriber value 12 
RxTest - Subscriber value 6 
RxTest - Subscriber value 0 

不打印剩下的12/3 12/4和

唯一的解决办法似乎继电器在map功能:

Observable<Integer> o2 = o1.map(i -> { 
    try { 
    return Optional.of(12/i); 
    } catch (ArithmeticException e) { 
    return Optional.empty(); 
    } 
}).filter(Optional::isPresent) 
    .map(o -> (Integer) o.get()); 

它的工作,但它很麻烦。我不知道是否操纵Observable(如map

以上是关于Observable跳过异常时,有什么办法可以轻松地跳过任何RuntimeException。以下是有关在Subscriber跳过例外:

的情况是一样的:

List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4); 
Observable<Integer> o1 = Observable.from(ints); 
o1.subscribe(
    i -> logger.info("12/{} = {}", i, 12/i), 
    t -> logger.error("{} : {}", t.getClass() , t.getMessage()), 
() -> logger.info("onCompleted") 
); 

它打印出:

当例外onNext发生时,它会触发onError和NOT响应Observable的任何数据。如果我希望订户吞下异常,我必须尝试捕捉onNext()中的ArithmeticException。有没有更清洁的解决方案?

看来当SubscriberonNext()面临错误,在(onNext)内无法处理时,它应该停止,对吧?这是一个好的设计吗?

+1

我看它的样子,'onNext()'等同于'Iterator.next()' - 当有迭代集合的问题 - 将抛出一个异常,并迭代捞出(它不't“resume”iterating) - 例如,请参阅[ConcurrentModificationException](http://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html)。这与我们在这里的行为是一样的。也就是说,@benjchristensen或许能够对这个问题有更多的了解 – alfasin 2015-02-08 05:27:20

+0

“可选”映射可能是我最喜欢的处理方式。这很清楚,过滤器很好地消化了事件。封装它是一个很好的方法。我会在scala中使用'Try'类型,然后进行过滤。 – BeepDog 2018-02-21 21:49:31

回答

1

试试这个方法。

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty()); 

在上面的代码中,错误事件被替换为空的流,原始流正在恢复。因此跳过错误。

+0

对不起,在RxJava 1.0中没有'Observable.onErrorFlatMap()'... – smallufo 2015-02-08 06:32:58

+0

哦,对不起,我没有注意到版本。但是可以使用'materialize'和'dematerialize'来实现'onErrorFlatMap'类似的组合器,尽管有点困难。 – findall 2015-02-08 06:37:57

+0

我也很好奇,为什么这种方法在1.0中消失。 – smallufo 2015-02-08 06:39:32

5
Observable.just(1, 2, 3, 0, 4) 
      .flatMap(i -> Observable.defer(() -> Observable.just(12/i)) 
        .onErrorResumeNext(Observable.just(0))); 

它是一种方法,但请记住,RxJava假定错误是真正没有得到解决的(您可以预期值为0)。另一方面,如果你想忽略除以0的异常,也许你应该在分割之前过滤/映射你的值。

+0

我在这里改变的唯一东西是 'Observable.from(1,2,3,0,4)'到'Observable.just(1,2,3,0,4)',因为在1.0.0中它不会用'from'编译。 – meddle 2015-02-09 08:50:05

0
import rx.lang.scala._ 

import scala.concurrent.duration.DurationDouble 
import scala.util.{Failure, Success, Try} 

val o1 = List(1, 2, 3, 0, 4) toObservable 
val o2 = List(1, 2, 3, 4, 5) toObservable 

val o3 = o1 zip o2 map {case (i1, i2) => i2/i1 } // this will trigger a division by 0 error 

val o4 = o3 lift { 
     subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) { 
     override def onNext(v: Int) { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onNext(Success(v)) 
     } 

     override def onError(e: Throwable) { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onNext(Failure(e)) 
     } 

     override def onCompleted() { 
      if (!subscriber.isUnsubscribed) 
      subscriber.onCompleted() 
     } 
     } 
    } 

o4 subscribe(println(_))