2017-09-14 140 views
0

我目前正在学习如何使用RxJava,并且我想要计算从数据库中获取的数据,然后异步计算它们,但要等待它们全部被计算。等待Observable订阅完成

List<Data> list = getDatasFromDatabase(); 

list.forEach(data -> compute(data)); // How can I make this async using RxJava ? 

// Here all Datas are computed 
return list; 

我怎样才能使这种阻塞:

Observable.from(list).forEach(data -> compute(data)); 

回答

0

可以使用toBlocking()运算符来创建BlockingObservable。但是,我不清楚您需要的地方或原因?如果compute()是同步方法,那么您的第一个解决方案将完成您所需的所有工作,因为它将一次处理每个项目,并在最后一个compute()完成时完成。

相反,如果你想compute()以异步方式完成,那么你可能会想:

Observable.from(list) 
    .subscribeOn(backgroundThreadScheduler) 
    .flatMap(data -> Observable.fromCallable(d -> compute(d))) 
    .toList() 
    .observeOn(foregroundScheduler) 
    .subscribe(revisedList -> doSomethingWith(revisedList)); 

如果需要doSomethingWith()同步与周围的代码来完成,你可以在subscriber()toBlocking()操作。不推荐这一步,因为最终会阻塞前台线程。