我对RxJava相当陌生,和其他很多人一样,我试图让我的头脑处理异常。我在网上阅读了不少帖子(例如,这里的讨论how to handle exceptions thrown by observer's onNext)和认为,我得到了这些概念的基本概念。RxJava 2.0 - 处理publish()中uncaught subsciber错误的资源refCount()
在上述讨论,海报的人说,那个时候的异常在用户被抛出,RxJava执行以下操作:
实现通用操作记录故障并停止发送其事件 (任何类型),并清除由于该订户而产生的任何资源,并且携带 以及任何剩余的订阅。
这也或多或少是我所看到的,我唯一遇到的问题是“清理任何资源”位。为了说明这一点,我们假设如下示例:
我想创建一个Observable,它监听每个接收到的消息的异步事件源(例如JMS队列)和onNext()。因此,在(伪)代码,我会做一些类似的:
Observable<String> observable = Observable.create(s -> {
createConnectionToBroker();
getConsumer().setMessageListener(message -> s.onNext(transform(message)));
s.setDisposable(new Disposable() {
public void dispose() {
tearDownBrokerConnection();
}
});
});
因为我想重用了许多用户/观察员消息监听器,我不直接认购在创造可观的,但利用而不是发布()。refCount()团队。类似的东西:
Observable<String> observableToSubscribeTo = observable.publish().refCount();
Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);
这一切都按预期工作。代码仅在第一次订阅建立时连接到JMS,并且当最后一个观察者为dispose()
d时,与代理的连接关闭。
但是,当订阅者抛出一个异常,当它是onNext()
编辑时,事情似乎变得混乱。正如预期的那样,抛出的观察者是裸体的,每当发布新事件时,它就不会再被通知。我的问题似乎是,当所有剩余的订户都是dispose()
d时,不再通知维护到消息代理的连接的Observable。它看起来好像抛出异常的用户处于某种僵尸状态。它在事件分发时被忽略,但它在某种程度上阻止根Observable在最后一个订阅者为dispose()
d时得到通知。
据我所知,RxJava希望观察者确保不会抛出,而是恰当地处理最终的异常。不幸的是,在我想提供一个向调用者返回Observable的库的情况下,我无论如何都无法控制我的订阅者。这意味着,我永远无法保护我的图书馆免受愚蠢的观察者的伤害。
所以,我问自己:我在这里错过了什么?订阅者抛出时真的没有机会正确清理吗?这是一个错误还是只是我不理解图书馆?
任何见解非常感谢!
喜@akarnokd, 感谢您的关注!我刚刚写了一个基本的单元测试来演示行为,只是为了了解我不再能够重现它?!?! :-o很尴尬,我必须说: - (所以我可以假设问题出在我的胶合代码中,为了浪费你的时间而道歉!!! 感谢您对safeSubscribe()的评论。图书馆你永远不会相信使用的代码,我真的想加强它。一个行为不端的用户不应该对其他行为良好的用户产生任何副作用。 –