重试逻辑真的很冗长。
您可以完全通过切换到Observable.just(t1, t2, t3)
构造函数来避免使用ImmutableList
。这基本上做同样的事情,但不太详细。
我看到你flatMapping为了将每个值转换为Observable。这可以防止映射单个值以取消订阅整个链时遇到的onError。所以当一个运算符抛出时,它将会退订这个值的内部可观察链。否则,一个错误会导致取消订阅并从主外部可观察值重新订阅。
如果你想保持这种行为,但减少锅炉板(除了明显切换到Java8 lambda表达式),我可以想出2个选择。
首先,重新登录和重复数据删除您的数据后重试。如果您的值具有良好的hashcode
和equals
实现,则只有当该集合尚未包含该值时,才可以使用过滤器追加到有状态集和onNext。
Observable.<Long> just(1L, 2L, 3L)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}})
.retry(2)
.filter(new Func1<Long, Boolean>() {
Set<Long> state = null;
@Override
public Boolean call(Long a) {
if (state == null)
state = new HashSet<Long>();
if (!state.contains(a)) {
state.add(a);
return true;
}
return false;
}})
.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}});
其次,你可以从何时重新订阅它离开的地方切换你观察到的,以简历。请注意,当使用缓冲区的操作符(observeOn,merge,flatMap)时,这可能会导致数据丢失问题。这是因为他们将继续以与下游消费者脱钩的方式消费。所以你要确保在重试之前不要缓冲。如果您正在实施支持背压的可观察源,还需要考虑其他因素。
// Should resume right where it left off
resumableObservable.map(...).retry(2).observeOn()
// Don't do this. ObserveOn will buffer values and resume will lose data.
resumableObservable.map(...).observeOn().retry(2)
// Also bad if running async observables. Merging buffers so this could have data loss.
Observable.merge(resumableObservable.map(...)).retry(2)
如果任何一项失败后2个重试,流断开(没有更多的项目被处理)。我想要一个干净的方式来返回像Finagle的Try这样的异常和结果,所以我可以处理所有异常。
您可以将不可靠的地图从Long -> Long
更改为Long -> Tuple<Long, List<Exception>>
。由于这是相当多的泛型,并且很快变得麻烦,我建议使用重试运算符的不同变体,即retryWhen(Func1<Observable<Throwable>, Observable<?>>)
。这里是一个如何在你的代码中使用它的例子。
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
@Override
public Observable<?> call(Observable<? extends Throwable> o) {
final AtomicInteger count = new AtomicInteger();
return o.filter(new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable t) {
return t instanceof RuntimeException || count.getAndIncrement() < 5;
}}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
}})
使用重试的好处是,您可以在非阻塞样式的一段时间后轻松实现延迟重试。