2016-10-25 34 views
0

一个rxjava流我有下面的代码片段:如何简化与超时

Observable.just(1) 
    .flatMap(-> doSomething1) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething2) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething3) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething4) 
    .timeout(10, SECONDS) 
    .subscribe(); 

我不想重复自己,每flatMap添加timeout之后。我的第一个想法是只将timeout应用于流的开头或结尾,但这不是我想要的行为,因为它只对更接近的可观察对象应用超时。

Observable.just(1) 
    .flatMap(-> doSomething1) 
    .flatMap(-> doSomething2) 
    .flatMap(-> doSomething3) 
    .flatMap(-> doSomething4) 
    .timeout(10, SECONDS) 
    .subscribe(); 

Observable.just(1) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething1) 
    .flatMap(-> doSomething2) 
    .flatMap(-> doSomething3) 
    .flatMap(-> doSomething4) 
    .subscribe(); 

doSomethingX函数执行随叫随到一些代码,可以在返回的下一个观察到其本身并不需要被包装成一个超时之前需要一段时间。

这怎么能改进?

更新:

下面更实际的例子。理想是组成一个流,我可以在失败或超时的情况下重试。我正在模拟其中一名操作员超时一次的情况,但正在重试。

@Test 
public void streamToBeSimplified() throws Exception { 
    final AtomicBoolean retry = new AtomicBoolean(true); 

    Action1<Object> print = new Action1<Object>() { 
     @Override 
     public void call(Object o) { 
      System.out.println(" >>>" + o); 
     } 
    }; 

    Observable.just(1) 
      .doOnNext(print) 
      .flatMap(new Func1<Integer, Observable<Integer>>() { 
       @Override 
       public Observable<Integer> call(Integer integer) { 
        return Observable.just(2); 
       } 
      }) 
      .timeout(1, TimeUnit.SECONDS) 
      .doOnNext(print) 
      .flatMap(new Func1<Object, Observable<Integer>>() { 
       @Override 
       public Observable<Integer> call(Object o) { 

        if(retry.getAndSet(false)) { 
         try { 
          Thread.sleep(2000L); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        return Observable.just(3); 
       } 
      }) 
      .timeout(1, TimeUnit.SECONDS) 
      .doOnNext(print) 
      .retry(2) 
      .subscribe(); 

} 
+0

是doSomethingX一个可观察或方法调用中doSomethingX()? –

+0

我已经添加了一个更好的示例 –

回答

1

您可以创建一个辅助方法是这样的:

private Observable doThings() { 
    return Observable.just(1) 
     .flatMap(__ -> withTimeout(doSomething1, 10, TimeUnit.SECONDS)) 
     .flatMap(__ -> withTimeout(doSomething2, 10, TimeUnit.SECONDS)); 
     // etc 
} 

private static <T> Observable<T> withTimeout(Observable<T> observable, long time, TimeUnit timeUnit) { 
    return observable 
      .timeout(time, timeUnit); 
} 
+0

添加了一个更好的示例。 'flatMap'实际上收到一个'Func1',所以没有'Observable'来封装。也许我应该改变我对这个作品的思考方式。 –