2016-11-13 47 views
3

我对rxJava相当陌生,尝试自己的东西。我希望得到一些建议,如果我做对了。RxJava:如何等待所有订阅完成?

用例:在我的应用程序的第一次运行中,成功登录后,我必须下载并保存在本地数据库中的多个字典供应用程序运行。用户必须等到下载过程结束。

当前解决方案:我正在使用retrofit 2与rxjava适配器来获取数据。我使用zip operator将所有观察对象捆绑成一个。所有下载完成后,回调触发并保存到数据库中。

没有什么胜于一些代码更好:

Observable<List<OrderType>> orderTypesObservable = backendService.getOrderTypes(); 
Observable<List<OrderStatus>> orderStatusObservable = mockBackendService.getOrderStatuses(); 
Observable<List<Priority>> prioritiesObservable = backendService.getPriorities(); 

return Observable.zip(orderTypesObservable, 
     orderStatusObservable, 
     prioritiesObservable, 
     (orderTypes, orderStatuses, priorities) -> { 
      orderTypeDao.deleteAll(); 
      orderTypeDao.insertInTx(orderTypes); 
      orderStatusDao.deleteAll(); 
      orderStatusDao.insertInTx(orderStatuses); 
      priorityDao.deleteAll(); 
      priorityDao.insertInTx(priorities); 

      return null; 
     }); 

问题

我应该使用ZIP运营商或者是有一个更好的适合我的原因是什么?

这样做似乎有点凌乱。这只是代码的一部分,我目前有12个字典需要加载。有没有办法重构它?

我想尽快插入一个字典数据完成下载,并把它下载失败重试机制。我怎样才能做到这一点?

回答

2

我觉得在你的情况下最好使用Completable,因为你只关心任务完成。

Completable getAndStoreOrderTypes = backendService.getOrderTypes() 
    .doOnNext(types -> *store to db*) 
    .toCompletable(); 

Completable getAndStoreOrderStatuses = backendService.getOrderStatuses() 
    .doOnNext(statuses -> *store to db*) 
    .toCompletable(); 

Completable getAndStoreOrderPriorities = backendService.getOrderPriorities() 
    .doOnNext(priorities -> *store to db*) 
    .toCompletable(); 

return Completable.merge(getAndStoreOrderTypes, 
         getAndStoreOrderStatuses, 
         getAndStoreOrderPriorities); 

如果您需要串行执行 - 使用Completable.concat()代替merge()

重试机制,如果下载失败

使用起来得心应手retry()操作

1

这是不好的,扔空值对象到的Rx流(在压缩您返回null,它是坏的)。 尽量不要这样做。

你的情况,你有1 API调用和2个行动来响应保存到数据库中,这样你就可以创建flatMap链。

它看起来像:

backendService.getOrderTypes() 
.doOnNext(savingToDatabaseLogic) 
.flatMap(data -> mockBackendService.getOrderStatuses()) 
.doOnNext(...) 
.flatMap(data -> backendService.getPriorities()) 
.doOnNext(...) 

,如果你想在错误情况作出反应,特别是可观察的,你可以添加onErrorResumeNext(exception-> Observable.empty())和链将继续即使事情发生

此外,您还可以像BaseDao创造的东西,它可以保存任何DAO对象。