2015-05-12 61 views
7

我想异步执行2个网络调用 - 我正在使用Retrofit + RxJava来完成此操作。这个逻辑来自一个简单的Runner类来测试解决方案。注意:这主要涉及服务器端的RxJava。正确使用Retrofit + RxJava的combineLatest

我的代码如下所示:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.combineLatest(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, String>() { 
     @Override public String call(List<State> states, CmsContent content) { 
      ... 
      return "PLACEHOLDER"; 
     } 
     }) 
     .observeOn(Schedulers.immediate()) 
     .subscribeOn(Schedulers.immediate()) 
     .subscribe(new Observer<String>() { 
     @Override public void onCompleted() { 
      System.out.println("COMPLETED"); 
     } 

     @Override public void onError(Throwable e) { 
      System.out.println("ERROR: " + e.getMessage()); 
     } 

     @Override public void onNext(String s) { 
      // I don't care what's returned here 
     } 
     }); 
} 

三个问题:

  1. Observable.combineLatest使用时要执行多个REST异步调用,并继续在所有呼叫都完成了最好的操作?我的Func2实现当前返回String。在执行2个API调用之后,我将在Func2#call()方法中处理结果。我不在乎什么是返回 - 但是,我必须有更好的方法来处理这个问题 - 我是否正确?
  2. 使用上面的代码正确执行API调用。但是当我运行该程序时,main方法没有用正确的Process finished with exit code 0来完成。什么可能导致代码挂起?

更新 - 2015年5月14日

基础上的建议,我已经改变了逻辑如下:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.zip(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, Boolean>() { 
     @Override public Boolean call(List<State> states, CmsContent content) { 
      // process data 
      return true; 
     } 
     }) 
     .subscribeOn(Schedulers.io()) 
     .toBlocking() 
     .first(); 
} 

这看起来像解决我是寻找。我打算用一段时间来看看是否遇到麻烦。

回答

1

1)没有最好的是使用zip()。如果两个(或更多)apis中的一个返回“较慢”的不同结果/它具有缓存的本质,则最新结合是很好的。

2)Fun2有助于合并结果。在onNext()或onError()中处理结果更好(体系结构)。您可以使用简单的Pair<T,Y>类将结果从Func2传递到onNext()。

3)没有错。上述结果应该在onNext()中处理,而不是在onComplete中处理。根据Retrofit's source code,结果仅在onNext()中传递(当然是正确的)。

希望那些帮助。

5

1)如果你知道你在两条路径上都会有一个值,那么它就好比zip

2)您想要做什么?您会在Func2中获得一对值,如果您不真正关心与onNext一起旅行的内容,请返回您选择的值。

3)Schedulers.immediate()在某种意义上不是真正的调度程序,而且很容易出现相同的池死锁情况。你真的不需要使用它。如果要在完成异步工作之前阻止主线程,请使用例如toBlocking().first()

+0

谢谢,我已经使用您的建议更新了我的代码示例。 – Kasa

1

我意识到,我很担心晚了一年就这一点,但发表在2015年5月14日的OP编辑不符合他原来的规定:

我想执行2个网络电话异步

  1. 观测量getStatesgetCmsContent,除非他们在单独的线程分别认购将不会同时执行。这是他在帖子中省略的一个关键点,以前的答案都没有提到。

    Observable.fromCallable(() -> doStuff()) 
        .subscribeOn(Schedulers.computation()); 
    

正如@akarnokd说,在情况下,两个流具有单个值,zipcombineLatest行为类似。合并函数将阻塞,直到getStatesgetCmsContent都返回,但像上面显示的那样,它们中的每一个都在单独的线程上同时执行。

  • 另一种解决方案依赖于合并List<States>CmsContent到达时的能力。考虑到他的代码,显然有某种“数据持有者”(未显示),因为他返回的是Boolean,而不是合并数据。以下,forEach同时执行。

    Observable.just(api.getStates(), api.getCmsContent()) 
    // subscribe on separate thread as shown previously 
    .flatMap(this::buildObservable) 
    .toBlocking() 
    // executes concurrently 
    .forEach(item -> { 
        // merge into "data holder"   
    }); 
    
  • 当然,该代码具有不被强类型所以这是一个作出选择的问题。

    相关问题