2017-03-14 129 views
0

我正在尝试使用Rx Java进行并行化。我按照以下方式调用5个并行使用Rx Java的方法。RxJava并行执行问题

public Map findData(param1,param2){ 

       Observable<List<DataDTO>> data1 = 
          Observable.create(s->{ 
            try { 
             s.onNext(method1(param1,param2)); 
            } 
            catch(Exception e) { 
             System.out.println("Exception happened while calculating Data"); 
             s.onError(e); 
            } 

         }); 

         Observable<List<DataDTO>> data2 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method2(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 


         Observable<List<DataDTO>> data3 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method3(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 

         Observable<List<DataDTO>> data4 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method3(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 

         Observable<List<DataDTO>> data5 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method4(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 

      Observable<List<DataDTO>> mergedBean = Observable.merge(
         data1.subscribeOn(Schedulers.io()), 
        data2.subscribeOn(Schedulers.io()), 
        data3.subscribeOn(Schedulers.io()), 
        data4.subscribeOn(Schedulers.io())); 

     Observable<List<DataDTO>> mergedBean1 = Observable.merge(mergedBean,data5.subscribeOn(Schedulers.io())); 

    mergedBean1.subscribe(s->combineDataMethod(s,beanMap), 
         e -> {throw new BusinessException(e);}); 

    System.out.println("Returning Map created beanMap="+beanMap); 

     return beanMap; 
    } 

所有的方法都是并行执行的,我正在用组合的数据获得beanMap。但问题是主线程在所有其他并行线程完成之前将beanMap返回给调用方方法。所以它返回一个空的地图给调用者。在这里实施并行呼叫的正确方式是什么?

+0

你抱怨的主线程,但不显示它运行的代码。你认为我们可以帮助你吗? –

+0

嗨Alexei,我从一个休息服务类(JAX-RS)调用findData(param1,param2)方法,我需要操作从findData方法返回的服务中的数据。另外,可观察对象中的每个方法都是数据库查找。我试图从DB平行地查找数据并尝试返回到我的服务类。我正在Tomcat容器中运行它。 – Raj

回答

1

您将需要转换的ObservableBlockingObservable

mergedBean1.toBlocking().subscribe(...); 

这将等到源可观察完成。但是,为什么不简单地返回Observable<Map>?这样你不需要明确地等待,并且可以将其与其他并行操作组合。

此外,你的代码应该是下面或多或少相当于:

Observable.just(
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))) 
) 
.map(o -> o.subscribeOn(Schedulers.io())) 
.compose(Observable::merge) 
.doOnNext(s -> combineDataMethod(s,beanMap)) 
.doOnError(e -> System.out.println("Exception happened while calculating Data")) 
.onErrorResumeNext(e -> Observable.error(new BusinessException(e))) 
.toBlocking() 
.subscribe(); 
+0

感谢Tassos,让我试试这个 – Raj

+0

嗨@Tassos我收到一个错误,说“方法toBlocking()是未定义的类型Observable <列表>”。 – Raj

+0

感谢@Tassos它的工作。我使用了blockingSubscribe() – Raj