一个rxjava流我有下面的代码片段:如何简化与超时
Observable.just(1)
.flatMap(-> doSomething1)
.timeout(10, SECONDS)
.flatMap(-> doSomething2)
.timeout(10, SECONDS)
.flatMap(-> doSomething3)
.timeout(10, SECONDS)
.flatMap(-> doSomething4)
.timeout(10, SECONDS)
.subscribe();
我不想重复自己,每flatMap
添加timeout
之后。我的第一个想法是只将timeout
应用于流的开头或结尾,但这不是我想要的行为,因为它只对更接近的可观察对象应用超时。
Observable.just(1)
.flatMap(-> doSomething1)
.flatMap(-> doSomething2)
.flatMap(-> doSomething3)
.flatMap(-> doSomething4)
.timeout(10, SECONDS)
.subscribe();
Observable.just(1)
.timeout(10, SECONDS)
.flatMap(-> doSomething1)
.flatMap(-> doSomething2)
.flatMap(-> doSomething3)
.flatMap(-> doSomething4)
.subscribe();
的doSomethingX
函数执行随叫随到一些代码,可以在返回的下一个观察到其本身并不需要被包装成一个超时之前需要一段时间。
这怎么能改进?
更新:
下面更实际的例子。理想是组成一个流,我可以在失败或超时的情况下重试。我正在模拟其中一名操作员超时一次的情况,但正在重试。
@Test
public void streamToBeSimplified() throws Exception {
final AtomicBoolean retry = new AtomicBoolean(true);
Action1<Object> print = new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(" >>>" + o);
}
};
Observable.just(1)
.doOnNext(print)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(2);
}
})
.timeout(1, TimeUnit.SECONDS)
.doOnNext(print)
.flatMap(new Func1<Object, Observable<Integer>>() {
@Override
public Observable<Integer> call(Object o) {
if(retry.getAndSet(false)) {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return Observable.just(3);
}
})
.timeout(1, TimeUnit.SECONDS)
.doOnNext(print)
.retry(2)
.subscribe();
}
是doSomethingX一个可观察或方法调用中doSomethingX()? –
我已经添加了一个更好的示例 –