2017-01-31 25 views
0

我在异步消息传递环境(vert.x)使用​​RxJava,所以有一个看起来像这样的流程:retryWhen与计时器似乎颠覆合并行为

Observable.defer(() -> getEndpoint()) 
      .mergeWith(getCancellationMessage()) 
      .flatMap(endpoint -> useEndpoint(endpoint)) 
      .retryWhen(obs -> obs.flatMap(error -> { 
       if (wasCancelled(error)) { 
        return Observable.error(error); 
       } 
       return Observable.timer(/* args */) 
      })) 
      .subscribe(result -> useResult(result), 
         error -> handleError(error) 
     ); 

getCancellationMessage()实施返回可观察的流,每当从独立消息源接收到取消消息时都会发出错误。此流不会发出除Observable.error()以外的任何内容,并且它只会在接收到取消消息时发出错误。

如果我了解merge的工作原理,则当getCancellationMessage()发出错误时,应通过onError终止整个链。

但是,我发现如果retryWhen运营商在收到取消消息时正在等待定时器发出,则错误将被忽略,并且retryWhen循环继续,就好像从未收到取消。

我可以通过将Observable.timer()getCancellationMessage()函数合并来解决该问题,但我不明白为什么我必须首先执行此操作。

这是merge/retryWhen的交互期望吗?

编辑:

下面是什么样的事情了getCancellationMessage()功能正在做的一个例子:

Observable<T> getCancellationMessage() { 
    if (this.messageStream == null) { 
     this.messageStream = this.messageConsumer.toObservable() 
          .flatMap(message -> { 
           this.messageConsumer.unregister(); 
           if (isCancelMessage(message)) { 
            return Observable.error(new CancelError()); 
           } 
           else { 
            return Observable.error(new FatalError()); 
           } 
          }); 
    } 
    return this.messageStream; 
} 

请注意,我没有自己的this.messageConsumer实施 - 这是来自我正在使用的第三方库(vert.x),所以我不控制该Observable的实现。

据我了解,在messageConsumer.toObservable()方法返回提供的this class实例Observable.create()的结果,这将调用用户的onNext方法,每当一个新的消息has arrived

messageConsumer.unregister()的呼叫会阻止接收到任何进一步的消息。

+1

阅读文档,它没有提及enywhere是'Observable.timer'调用'onError'可言。由于'onError'没有被定时器调用,子订阅也不会调用'onError',所以它忽略了异常。老实说,我有点缺乏深入的了解与信心,回答这个问题,但你有没有尝试过的文档表明什么官方在[这里](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(RX .functions.Func1)) –

+0

我并不真正期望'Observable.timer()'产生一个错误,但是当'timer'运行时,'getCancellationMessage'发出错误是绝对正确的,而我想知道为什么它被忽略。即使在计时器过去之后,就好像'getCancellationMessage'从不发出'Observable.error()'。在'retryWhen'文档中找不到任何我可以解释我所看到的内容。 – Hoobajoob

+1

但看在我的网址公布前,如果从'观察的retryWith'不会调用图形'onError',然后导致观察到继续循环,这听起来像您遇到什么。 –

回答

2

但是,我发现如果retryWhen运算符在接收到取消消息时等待定时器发出,则错误将被忽略,并且retryWhen循环继续,就好像从未收到取消。

操作员retryWhen接通上游Throwable成通过你为了得到一个值响应于重试上游或结束流提供的序列的值,并且将其路由,从而

Observable.error(new IOException()) 
.retryWhen((Observable<Throwable> error) -> error) 
.subscribe(); 

即将重试无限期地因为内部error现在被认为是一个价值,而不是一个例外。

retryWhen本身并不知道该error值应该认为是一个不应该被重试,那是你的内部流动的工作:

Observable.defer(() -> getEndpoint()) 
    .mergeWith(getCancellationMessage()) 
    .flatMap(endpoint -> useEndpoint(endpoint)) 
    .retryWhen(obs -> obs 
      .takeWhile(error -> !(error instanceof CancellationException)) // <------- 
      .flatMap(error -> Observable.timer(/* args */)) 
) 
    .subscribe(result -> useResult(result), 
       error -> handleError(error) 
); 

在这里,我们只让错误通过,如果它不是CancellationException类型(您可以用您的错误类型替换它)。这将完成序列。

如果您想要的顺序来结束一个错误,而不是,我们需要改变flatMap逻辑来代替:

.retryWhen(obs -> obs 
     .flatMap(error -> { 
      if (error instanceof CancellationException) { 
       return Observable.error(error); 
      } 
      return Observable.timer(/* args */); 
     }) 
) 

注意,在返回flatMapObservable.empty()不会结束序列,因为它只是表明一个源合并是空的,但可能还有其他内部来源。特别要retryWhen,一个empty()将挂起序列无限期因为不会有任何信号,以指示重试或结束序列。

编辑:

根据您的字眼,我认为getCancellationMessage()是一个热门的观测。为了接收他们的事件或错误,必须观察到热观测值。当retryWhen运营商在其重试宽限期由于timer(),没有什么订阅到最mergeWithgetCancellationMessage(),因此它不能停止在该点的定时器。

你要订阅保持它,而timer执行停止它的时候了:

Observable<Object> cancel = getCancellationMessage(); 

Observable.defer(() -> getEndpoint()) 
    .mergeWith(cancel) 
    .flatMap(endpoint -> useEndpoint(endpoint)) 
    .retryWhen(obs -> obs 
     .flatMap(error -> { 
      if (error instanceof CancellationException) { 
       return Observable.error(error); 
      } 
      return Observable.timer(/* args */).takeUntil(cancel); 
     }) 
) 
    .subscribe(result -> useResult(result), 
       error -> handleError(error) 
); 

在这种情况下,如果cancel火灾而计时器执行时,retryWhen会停止计时器,并终止立即取消错误。

使用takeUntil是一种选择,因为您发现,mergeWith (cancel)再次适用。

+0

感谢您的帮助解释。我将要应用您的建议并验证他们是否解决了此问题。 – Hoobajoob

+0

不幸的是,这似乎没有改变行为。如果'retryWhen'遇到导致它返回'Observable.timer()'的错误,则在该点之后但在定时器到期之前产生的任何错误都将被忽略。 'retryWhen'甚至不会被后面的错误调用。 – Hoobajoob

+0

我相应地更新了示例代码。仍然困惑。 – Hoobajoob