2015-05-29 69 views
0

我有一个Observable,其中每个项目都以可能导致异常的方式转换,但可以重试。我不希望失败破坏流,因为每个项目代表一个独立的事务。我能拿出最好的解决办法是这样的:RxJava:重试映射操作

final AtomicLong errCount = new AtomicLong(); 
    Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(Long aLong) { 
      return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() { 
       @Override 
       public Long call(Long aLong) { 
        if (aLong == 2 && errCount.getAndIncrement() < 1) { 
         throw new RuntimeException("retryable error"); 
        } 
        return aLong * 100; 
       } 
      }).retry(2); 
     } 
    }).forEach(new Action1<Long>() { 
     @Override 
     public void call(Long aLong) { 
      System.out.println(aLong); 
     } 
    }); 

// Desired output: 100, 200, 300 (not 100, 100, 200, 300) 

问题:

  • 重试逻辑是非常啰嗦。
  • 如果任何项目在2次重试后失败,则流被打破(不再处理更多项目)。我想要一个干净的方式来返回像Finagle的Try这样的异常和结果,所以我可以处理所有异常。

回答

1

重试逻辑真的很冗长。

您可以完全通过切换到Observable.just(t1, t2, t3)构造函数来避免使用ImmutableList。这基本上做同样的事情,但不太详细。

我看到你flatMapping为了将每个值转换为Observable。这可以防止映射单个值以取消订阅整个链时遇到的onError。所以当一个运算符抛出时,它将会退订这个值的内部可观察链。否则,一个错误会导致取消订阅并从主外部可观察值重新订阅。

如果你想保持这种行为,但减少锅炉板(除了明显切换到Java8 lambda表达式),我可以想出2个选择。

首先,重新登录和重复数据删除您的数据后重试。如果您的值具有良好的hashcodeequals实现,则只有当该集合尚未包含该值时,才可以使用过滤器追加到有状态集和onNext。

Observable.<Long> just(1L, 2L, 3L) 
     .map(new Func1<Long, Long>() { 
      @Override 
      public Long call(Long aLong) { 
       if (aLong == 2 && errCount.getAndIncrement() < 1) { 
        throw new RuntimeException("retryable error"); 
       } 
       return aLong * 100; 
      }}) 
     .retry(2) 
     .filter(new Func1<Long, Boolean>() { 
      Set<Long> state = null; 

      @Override 
      public Boolean call(Long a) { 
       if (state == null) 
        state = new HashSet<Long>(); 
       if (!state.contains(a)) { 
        state.add(a); 
        return true; 
       } 
       return false; 
      }}) 
     .forEach(new Action1<Long>() { 
      @Override 
      public void call(Long aLong) { 
       System.out.println(aLong); 
      }}); 

其次,你可以从何时重新订阅它离开的地方切换你观察到的,以简历。请注意,当使用缓冲区的操作符(observeOn,merge,flatMap)时,这可能会导致数据丢失问题。这是因为他们将继续以与下游消费者脱钩的方式消费。所以你要确保在重试之前不要缓冲。如果您正在实施支持背压的可观察源,还需要考虑其他因素。

// Should resume right where it left off 
resumableObservable.map(...).retry(2).observeOn() 

// Don't do this. ObserveOn will buffer values and resume will lose data. 
resumableObservable.map(...).observeOn().retry(2) 

// Also bad if running async observables. Merging buffers so this could have data loss. 
Observable.merge(resumableObservable.map(...)).retry(2) 

如果任何一项失败后2个重试,流断开(没有更多的项目被处理)。我想要一个干净的方式来返回像Finagle的Try这样的异常和结果,所以我可以处理所有异常。

您可以将不可靠的地图从Long -> Long更改为Long -> Tuple<Long, List<Exception>>。由于这是相当多的泛型,并且很快变得麻烦,我建议使用重试运算符的不同变体,即retryWhen(Func1<Observable<Throwable>, Observable<?>>)。这里是一个如何在你的代码中使用它的例子。

}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){ 
@Override 
public Observable<?> call(Observable<? extends Throwable> o) { 
    final AtomicInteger count = new AtomicInteger(); 
    return o.filter(new Func1<Throwable, Boolean>() { 
     @Override 
     public Boolean call(Throwable t) { 
      return t instanceof RuntimeException || count.getAndIncrement() < 5; 
     }}).delay(1, TimeUnit.SECONDS, Schedulers.immediate()); 
}}) 

使用重试的好处是,您可以在非阻塞样式的一段时间后轻松实现延迟重试。