2017-03-04 21 views
-1

我有2个观测值。 1)同步订单(可完成)2)获取所有订单。 我想继续同步产品,直到我从后端获得所需的产品。这是每5分钟轮询后端5次以检索订单确认。在可完成的可观察点上重试

apiService 
.syncOrders() 
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { 
     @Override 
     public Observable<Integer> call(final Observable<? extends Void> observable) { 
      // Retry 5 more times with 5 seconds delay 
      return observable.delay(5, TimeUnit.SECONDS).zipWith(Observable.range(START_RETRY, 
                        MAX_RETRIES), 
                   new Func2<Void, Integer, Integer>() { 
                    @DebugLog 
                    @Override 
                    public Integer call(Void v, 
                         Integer integer) { 
                     return integer; 
                    } 
                   }); 
     } 
    }).andThen(apiService.streamOrders().flatMap(new Func1<List<Order>, Observable<Order>>() { 
     @Override 
     public Observable<Order> call(List<Order> orderList) { 
      return Observable.from(orderList); 
     } 
    }).filter(new Func1<Order, Boolean>() { 
     @DebugLog 
     @Override 
     public Boolean call(Order order) { 
      return order.orderRef() == orderId; 
     } 
    }).first()); 
+0

如何[retryWhen](HTTP ://reactivex.io/RxJava/javadoc/rx/Completable.html#retryWhen(rx.functions.Func1))? – tynn

回答

1

重复Completable,通常完成将不会触发andThen永远。你必须重新设计你的流程,例如运行Observable.interval 5分钟内,flatMap它的值写入第一completable并附andThen成内部流动,例如:

Observable.interval(0, 5, TimeUnit.MINUTES) 
.onBackpressureLatest() 
.flatMap(tick -> 
    apiService.syncOrders() 
    .andThen(apiService.streamOrders().flatMapIterable(list -> list)) 
    .retryWhen(error -> error.delay(5, TimeUnit.SECONDS)) 
) 
.filter(v -> ...) 
.subscribe(...); 
+0

我试了一下,但是这改变了对apiService.streamOrders()的重试。我想尽快尝试同步,只要没有从流中找到订单。 –