0

我需要一些帮助来实现使用RxJava2的并行异步调用& Retrofit2。 我的要求是;如何使用RxJava2和Retrofit2创建并行多个非阻塞服务请求

1)我有多个保险公司(现在我只需要两个),我需要发送多个使用该保险公司名称的并行请求。

2)如果它们中的任何一个给服务器错误,那么其余的请求不应该被阻塞。

以下是我到现在为止所尝试的;

ArrayList<String> arrInsurer = new ArrayList<>(); 
     arrInsurer.add(AppConstant.HDFC); 
     arrInsurer.add(AppConstant.ITGI); 

     RequestInterface service = getService(ServiceAPI.CAR_BASE_URL); 
     for (String insurerName : arrInsurer) { 
      service.viewQuote(Utils.getPrefQuoteId(QuoteListActivity.this), insurerName) 
        .subscribeOn(Schedulers.computation()) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Consumer<ViewQuoteResDTO>() { 
         @Override 
         public void accept(@NonNull ViewQuoteResDTO viewQuoteResDTO) throws Exception { 
          Log.e("Demo", viewQuoteResDTO.getPremiumData().getIDV()+""); 
          updateList(); 
         } 
        }, new Consumer<Throwable>() { 
         @Override 
         public void accept(@NonNull Throwable throwable) throws Exception { 
          Log.e("Demo", throwable.getMessage()); 
         } 
        }); 
     } 

private RequestInterface getService(String baseUrl) {  
    Gson gson = new GsonBuilder() 
      .setLenient() 
      .create(); 

    return new Retrofit.Builder() 
      .baseUrl(baseUrl)     
      .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) 
      .addConverterFactory(GsonConverterFactory.create(gson)) 
      .build().create(RequestInterface.class); 

} 

现在,上面的代码只有在两个请求都能成功响应的情况下才能正常工作。但是当任何请求作为内部服务器错误发出响应时,请求的其余部分也会被阻塞。

下面的任何一个请求给出的日志错误我得到失败响应;

E/Demo: HTTP 500 Aww Snap, Some thing happened at server. Please try back again later. 
E/Demo: unexpected end of stream on Connection{100.xxx.xxx.xx:portNo, [email protected] hostAddress=/100.xxx.xxx.xx:portNo cipherSuite=none protocol=http/1.1} 

如何处理这个错误?

回答

1

我想像任何其他Rx相关的问题,这有多个答案。我会在我们的应用程序中使用我的应用程序,并解决这个用例。希望能帮助到你。

短版本 - 这依赖于mergeDelayError。检查出here

为什么merge?因为与concat不同,它将并行执行观察值。为什么mergeDelayError?它延迟了错误...本质上它会执行每个可观察的事物并在所有事情完成时传递错误。这可以确保即使出现一个或多个错误,其他错误仍会被执行。

你必须小心一些细节。事件顺序不再保留,这意味着merge运算符可能会交织一些可观察事件(鉴于您以前如何做事,这不应该成为问题)。据我所知,即使多个可疑事件失败,你只会得到一个onError的电话。如果这两个都ok,那么你可以尝试以下方法:

List<Observable<ViewQuoteResDTO>> observables = new ArrayList<>(); 
for (String insurerName : arrInsurer) {  
    observables.add(service.viewQuote(
     Utils.getPrefQuoteId(QuoteListActivity.this), insurerName)); 
} 

Observable.mergeDelayError(observables) 
      .subscribeOn(Schedulers.computation()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(/* subscriber calls if you need them */); 

的想法是创建一个你要运行,然后使用mergeDelayError触发所有这些观测。

相关问题