我在异步消息传递环境(vert.x)使用RxJava,所以有一个看起来像这样的流程:retryWhen与计时器似乎颠覆合并行为
Observable.defer(() -> getEndpoint())
.mergeWith(getCancellationMessage())
.flatMap(endpoint -> useEndpoint(endpoint))
.retryWhen(obs -> obs.flatMap(error -> {
if (wasCancelled(error)) {
return Observable.error(error);
}
return Observable.timer(/* args */)
}))
.subscribe(result -> useResult(result),
error -> handleError(error)
);
的getCancellationMessage()
实施返回可观察的流,每当从独立消息源接收到取消消息时都会发出错误。此流不会发出除Observable.error()
以外的任何内容,并且它只会在接收到取消消息时发出错误。
如果我了解merge
的工作原理,则当getCancellationMessage()
发出错误时,应通过onError
终止整个链。
但是,我发现如果retryWhen
运营商在收到取消消息时正在等待定时器发出,则错误将被忽略,并且retryWhen
循环继续,就好像从未收到取消。
我可以通过将Observable.timer()
与getCancellationMessage()
函数合并来解决该问题,但我不明白为什么我必须首先执行此操作。
这是merge
/retryWhen
的交互期望吗?
编辑:
下面是什么样的事情了getCancellationMessage()
功能正在做的一个例子:
Observable<T> getCancellationMessage() {
if (this.messageStream == null) {
this.messageStream = this.messageConsumer.toObservable()
.flatMap(message -> {
this.messageConsumer.unregister();
if (isCancelMessage(message)) {
return Observable.error(new CancelError());
}
else {
return Observable.error(new FatalError());
}
});
}
return this.messageStream;
}
请注意,我没有自己的this.messageConsumer
实施 - 这是来自我正在使用的第三方库(vert.x),所以我不控制该Observable的实现。
据我了解,在messageConsumer.toObservable()
方法返回提供的this class实例Observable.create()
的结果,这将调用用户的onNext
方法,每当一个新的消息has arrived。
对messageConsumer.unregister()
的呼叫会阻止接收到任何进一步的消息。
阅读文档,它没有提及enywhere是'Observable.timer'调用'onError'可言。由于'onError'没有被定时器调用,子订阅也不会调用'onError',所以它忽略了异常。老实说,我有点缺乏深入的了解与信心,回答这个问题,但你有没有尝试过的文档表明什么官方在[这里](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(RX .functions.Func1)) –
我并不真正期望'Observable.timer()'产生一个错误,但是当'timer'运行时,'getCancellationMessage'发出错误是绝对正确的,而我想知道为什么它被忽略。即使在计时器过去之后,就好像'getCancellationMessage'从不发出'Observable.error()'。在'retryWhen'文档中找不到任何我可以解释我所看到的内容。 – Hoobajoob
但看在我的网址公布前,如果从'观察的retryWith'不会调用图形'onError',然后导致观察到继续循环,这听起来像您遇到什么。 –