2016-09-03 29 views
6

我看到ReactiveX(RxJava)具有操作timeout,这将适用于每一个项目在订阅流检查的第一个响应的项目​​。但我只想用超时检查第一个响应,而不关心以下响应的超时。我如何用RxJava的操作员优雅地实现这个要求?RxJava只有超时

回答

2

一种方式做到这一点如下:

Observable<Response> respStream = respStream(); 
ConnectableObservable<Response> sharedRespStream = respStream.publish(); 

Observable<String> first = sharedRespStream.first().timeout(2, TimeUnit.SECONDS); 
Observable<String> rest = sharedRespStream.skip(1); 
Observable<String> result = first.mergeWith(rest); 

sharedRespStream.connect(); 

result.subscribe(response -> handleResponse(response), error -> handleError(error)); 

的代码是自我解释:份额应避免重复请求,申请超时发出的第一个项目,并与项目下列第一个合并。

3

这是一个更实用的方法。这是Scala中,但应该被转录到Java:

val myTimeout : Observable[Nothing] = Observable timer (10 seconds) flatMap (_ => Observable error new TimeoutException("I timed out!")) 

myStream amb myTimeout 

amb操作符返回发出第一可观察的价值。

0

最好的选择是使用一个timeout overload,它返回每个项目的超时观察值,并且还有一个用于订阅(这是你感兴趣的)。

observable.timeout(() -> Observable.empty() 
      .delay(10, TimeUnit.SECONDS), o -> Observable.never()) 

我会解释,第一func0将在订阅运行,并会发出空观察到的(发射完成)由您希望的时间延迟。 如果在任何物品到达之前的时间过去了,就会出现你想要的超时时间。 第二个参数FUNC1将决定项目,你有没有用之间的超时的,所以我们只是传递永不(不完整或做任何事情)

另一种选择是 以下卢西亚诺的建议,你可以做这样的:

public static class TimeoutFirst<T> implements Transformer<T,T> { 

    private final long timeout; 
    private final TimeUnit unit; 

    private TimeoutFirst(long timeout, TimeUnit unit) { 
     this.timeout = timeout; 
     this.unit = unit; 
    } 

    @Override 
    public Observable<T> call(Observable<T> observable) { 
     return Observable.amb(observable, 
       Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name())))); 
    } 
} 

public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) { 
    return new TimeoutFirst<>(timeout, seconds); 
} 

这是一个非常漂亮的使用amb的解决方案。