2016-07-02 38 views
1

我们有微服务架构,我们通过网络进行服务间呼叫。 我们在顶级服务中使用RxJava,这导致创建大量的并行请求到底层服务。 正因为如此,我得到了“无路由到主机错误”或“连接错误”。 为此,我想减慢RxJava Observable的排放,以便在创建新连接之前先关闭连接。 下面是示例代码:如何延迟RxJava中的可观察发射

package com.demo.rxjava.rxjaxa.creation; 
    import rx.Observable; 
    import rx.Subscriber; 
    import rx.schedulers.Schedulers; 

    public class Delay { 

     public static void main(String[] args) throws InterruptedException { 
      Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io()) 
        .flatMap(integer -> { 
         return function1(integer); 
        }).observeOn(Schedulers.io()) 
        .subscribe(new Subscriber<String>() { 
         @Override 
         public void onNext(String item) { 
          System.out.println("Next: " + item); 
         } 

         @Override 
         public void onError(Throwable error) { 
          System.err.println("Error: " + error.getMessage()); 
         } 

         @Override 
         public void onCompleted() { 
          System.out.println("Sequence complete."); 
         } 
        }); 
     } 

    public Observable<String> function1(String id) { 
       // This is where we make network call 
       Observable<Response> response = Rx.newClient(RxObservableInvoker.class) 
         .target("http://example.com/resource") 
         .request() 
         .queryParam("id", id) 
         .rx() 
         .get(); 
       response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{ 
        return response.extractResponse(); 
       }); 
    } 
} 

回答

0

为了耽误你可以使用压缩并结合在你的第一个Observable.from发出的每一个项目去的X时间间隔的特定步骤。

/** 
* If we want to delay the every single item emitted in the pipeline we will need a hack, 
* one possible hack is use zip operator and combine every item emitted with an interval so every item emitted has to wait until interval emit the item. 
*/ 
@Test 
public void delay() { 
    long start = System.currentTimeMillis(); 
    Subscription subscription = Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS), (i, t) -> i) 
              .subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start))); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(3000, TimeUnit.MILLISECONDS); 
} 

这将打印在这里

time:537 
    time:738 
    time:936 

更多practicle例子https://github.com/politrons/reactive

+0

这只会延迟发射的开始,我想延迟每个特定间隔的可观测发射。 –

+0

查看我的新回复 – paul

+0

它可以工作,但是在生产代码中它给出了缺少背压的错误。 –

0

而不是拖延你的要求,你应该有请求到底部服务发生在Scheduler限制并行活动。例如:

int maxParallel = 4; 
Scheduler scheduler = Schedulers.from(
    Executors.newFixedThreadPool(maxParallel)); 
... 
observable 
    .flatMap(x -> 
     submitToBottomService(x) 
     .subscribeOn(scheduler)) 
    .subscribe(subscriber); 

顺便说一句,你提到关闭连接。 Observable.using运算符用于关闭被动上下文中的资源(它在关闭和取消订阅时关闭资源)。如果你还没有使用它,然后看看它。

+0

我们正在Scheduler上运行底层服务调用,但是我们正在为底层服务创建异步调用,所以它在第一次调用返回结果之前创建了很多连接。 –

+0

应该使用调度程序进行异步调用,您想提供更多代码吗? –

+0

我无法分享原始代码,但使用sudo代码更新了问题 –