我正在尝试使用Rx Java进行并行化。我按照以下方式调用5个并行使用Rx Java的方法。RxJava并行执行问题
public Map findData(param1,param2){
Observable<List<DataDTO>> data1 =
Observable.create(s->{
try {
s.onNext(method1(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data2 =
Observable.create(s->{
try {
s.onNext(method2(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data3 =
Observable.create(s->{
try {
s.onNext(method3(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data4 =
Observable.create(s->{
try {
s.onNext(method3(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data5 =
Observable.create(s->{
try {
s.onNext(method4(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> mergedBean = Observable.merge(
data1.subscribeOn(Schedulers.io()),
data2.subscribeOn(Schedulers.io()),
data3.subscribeOn(Schedulers.io()),
data4.subscribeOn(Schedulers.io()));
Observable<List<DataDTO>> mergedBean1 = Observable.merge(mergedBean,data5.subscribeOn(Schedulers.io()));
mergedBean1.subscribe(s->combineDataMethod(s,beanMap),
e -> {throw new BusinessException(e);});
System.out.println("Returning Map created beanMap="+beanMap);
return beanMap;
}
所有的方法都是并行执行的,我正在用组合的数据获得beanMap。但问题是主线程在所有其他并行线程完成之前将beanMap返回给调用方方法。所以它返回一个空的地图给调用者。在这里实施并行呼叫的正确方式是什么?
你抱怨的主线程,但不显示它运行的代码。你认为我们可以帮助你吗? –
嗨Alexei,我从一个休息服务类(JAX-RS)调用findData(param1,param2)方法,我需要操作从findData方法返回的服务中的数据。另外,可观察对象中的每个方法都是数据库查找。我试图从DB平行地查找数据并尝试返回到我的服务类。我正在Tomcat容器中运行它。 – Raj