2017-02-06 41 views
2

我对RxJava相当陌生,和其他很多人一样,我试图让我的头脑处理异常。我在网上阅读了不少帖子(例如,这里的讨论how to handle exceptions thrown by observer's onNext)和认为,我得到了这些概念的基本概念。RxJava 2.0 - 处理publish()中uncaught subsciber错误的资源refCount()

在上述讨论,海报的人说,那个时候的异常在用户被抛出,RxJava执行以下操作:

实现通用操作记录故障并停止发送其事件 (任何类型),并清除由于该订户而产生的任何资源,并且携带 以及任何剩余的订阅。

这也或多或少是我所看到的,我唯一遇到的问题是“清理任何资源”位。为了说明这一点,我们假设如下示例:

我想创建一个Observable,它监听每个接收到的消息的异步事件源(例如JMS队列)和onNext()。因此,在(伪)代码,我会做一些类似的:

Observable<String> observable = Observable.create(s -> { 
    createConnectionToBroker(); 
    getConsumer().setMessageListener(message -> s.onNext(transform(message))); 
    s.setDisposable(new Disposable() { 
    public void dispose() { 
     tearDownBrokerConnection(); 
    } 
    }); 
}); 

因为我想重用了许多用户/观察员消息监听器,我不直接认购在创造可观的,但利用而不是发布()。refCount()团队。类似的东西:

Observable<String> observableToSubscribeTo = observable.publish().refCount(); 

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...); 
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...); 

这一切都按预期工作。代码仅在第一次订阅建立时连接到JMS,并且当最后一个观察者为dispose() d时,与代理的连接关闭。

但是,当订阅者抛出一个异常,当它是onNext()编辑时,事情似乎变得混乱。正如预期的那样,抛出的观察者是裸体的,每当发布新事件时,它就不会再被通知。我的问题似乎是,当所有剩余的订户都是dispose() d时,不再通知维护到消息代理的连接的Observable。它看起来好像抛出异常的用户处于某种僵尸状态。它在事件分发时被忽略,但它在某种程度上阻止根Observable在最后一个订阅者为dispose() d时得到通知。

据我所知,RxJava希望观察者确保不会抛出,而是恰当地处理最终的异常。不幸的是,在我想提供一个向调用者返回Observable的库的情况下,我无论如何都无法控制我的订阅者。这意味着,我永远无法保护我的图书馆免受愚蠢的观察者的伤害。

所以,我问自己:我在这里错过了什么?订阅者抛出时真的没有机会正确清理吗?这是一个错误还是只是我不理解图书馆?

任何见解非常感谢!

回答

1

如果你可以展示一些单元测试来证明这个问题(不需要JMS),那就太棒了。

另外,在RxJava 2中的下一个应该永远不会抛出;如果这是一个未定义的行为。如果你不信任你的消费者,你可以有一个最终观察到的变压器,做safeSubscribe代替普通subscribe,增加了对下游行为不端的保护:

.compose(o -> v -> o.safeSubscribe(v)) 

.compose(new ObservableTransformer<T>() { 
    @Override public Observable<T> apply(final Observable<T> source) { 
     return new Observable<T>() { 
      @Override public void subscribeActual(Observer<? super T> observer) { 
       source.safeSubscribe(observer); 
      } 
     }; 
    } 
}) 
+1

喜@akarnokd, 感谢您的关注!我刚刚写了一个基本的单元测试来演示行为,只是为了了解我不再能够重现它?!?! :-o很尴尬,我必须说: - (所以我可以假设问题出在我的胶合代码中,为了浪费你的时间而道歉!!! 感谢您对safeSubscribe()的评论。图书馆你永远不会相信使用的代码,我真的想加强它。一个行为不端的用户不应该对其他行为良好的用户产生任何副作用。 –