2016-12-23 77 views
2

我有一个SyncService。RxJava - 并行执行两个API调用

@Override 
public int onStartCommand(Intent intent, int flags, final int startId) { 
    Timber.i("Starting sync..."); 
    ... 
    RxUtil.unsubscribe(mSubscription); 
    mSubscription = mDataManager.syncEvents() 
      .subscribeOn(Schedulers.io()) 
      .subscribe(new Observer<Event>() { 
       @Override 
       public void onCompleted() { 
        Timber.i("Synced successfully!"); 
        stopSelf(startId); 
       } 

       @Override 
       public void onError(Throwable e) { 
        Timber.w(e, "Error syncing."); 
        stopSelf(startId); 

       } 

       @Override 
       public void onNext(Event event) { 
       } 
      }); 

    return START_STICKY; 
} 

可观察事件= mDataManager.syncEvents()是一个API调用。

我想要做一个并行呼叫:

单USERINFO = mDataManager.getUserInfo()

和呼叫stopSelf(startId);这两个电话完成后。

我该怎么办?

我试过RxJava Fetching Observables In Parallel 但这有点不同。

我想我必须使用.zip或.merge方法。但在我的情况下,一个方法调用返回Observable(事件列表)和第二个Single(一个UserInfo对象)。

我创造这可能是.ZIP方法的结果Z的结果类,但我不知道如何来填补它:

public class SyncResponse { 
    List<Event> events; 
    UserInfo userInfo; 
    ... 
} 

回答

0

您可以使用observable.zip和single.toObservable做到这一点。我相信您还需要在每个可观察值的zip调用中分别进行线程调度,以便并行执行它们。

Observable.zip(call1.subscribeOn(Schedulers.io()), call2.toObservable().subscribeOn(Schedulers.io(), zip-function) 

由于两个调用都是不同的包装类型。

拉链功能是您可以使用课堂来组合您的结果的地方。

需要注意的是,由于你的一个observable是一个单独的,所以这个带zip的方法最多只会产生一个结果,因为单个事件会在其第一个onNext事件之后发送一个onComplete事件。

如果您知道您的事件列表将会完成,您可以使用call1上的toList缓冲它们并将它们作为单个收集的事件列表发出。

0

既然你只有两个观察对象,你可以使用Android对结合它们来结合结果。

mDataManager.syncEvents() 
    .zipWith(mDataManager.getUserInfo().toObservable(), Pair::create) 
    .subscribeOn(Schedulers.io()) 
    .subscribe(pair -> { 
     Event event = pair.first; 
     UserInfo userInfo = pair.second; 

     // your code here 

     Timber.i("Synced successfully!"); 
     stopSelf(startId); 
    }, throwable -> { 
     Timber.w(e, "Error syncing."); 
     stopSelf(startId); 
    }); 

你可以做同样的不java8/retrolmbda但为什么

如果你需要收集所有事件,直到完成,并与单用户信息结合他们所有这将是有点复杂

Observable<ArrayList<String>> eventsObservable = mDataManager.syncEvents() 
    .collect(ArrayList::new, ArrayList::add); 

mDataManager.getUserInfo() 
    .zipWith(eventsObservable.toSingle(), Pair::create) 
    .subscribeOn(Schedulers.io()) 
    .subscribe(pair -> { 
     UserInfo userInfo = pair.first; 
     List<Event> event = pair.second; 

     // your code here 

     Timber.i("Synced successfully!"); 
     stopSelf(startId); 
    }, throwable -> { 
     Timber.w(e, "Error syncing."); 
     stopSelf(startId); 
    });