我们有微服务架构,我们通过网络进行服务间呼叫。 我们在顶级服务中使用RxJava,这导致创建大量的并行请求到底层服务。 正因为如此,我得到了“无路由到主机错误”或“连接错误”。 为此,我想减慢RxJava Observable的排放,以便在创建新连接之前先关闭连接。 下面是示例代码:如何延迟RxJava中的可观察发射
package com.demo.rxjava.rxjaxa.creation;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class Delay {
public static void main(String[] args) throws InterruptedException {
Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
.flatMap(integer -> {
return function1(integer);
}).observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
}
public Observable<String> function1(String id) {
// This is where we make network call
Observable<Response> response = Rx.newClient(RxObservableInvoker.class)
.target("http://example.com/resource")
.request()
.queryParam("id", id)
.rx()
.get();
response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{
return response.extractResponse();
});
}
}
这只会延迟发射的开始,我想延迟每个特定间隔的可观测发射。 –
查看我的新回复 – paul
它可以工作,但是在生产代码中它给出了缺少背压的错误。 –