2014-06-18 189 views
7

我想使用rxjava来构建一个示例。该示例应编排一个ReactiveWareService和一个ReactiveReviewService,以重新调用WareAndReview组合。如何等待异步Observable来完成

ReactiveWareService 
     public Observable<Ware> findWares() { 
     return Observable.from(wareService.findWares()); 
    } 

ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency! 

public Observable<Review> findReviewsByItem(final String item) { 
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> { 
    try { 
     List<Review> reviews = reviewService.findReviewsByItem(item); 
     reviews.forEach(observer::onNext); 
     observer.onCompleted(); 
    } catch (Exception e) { 
     observer.onError(e); 
    } 
})); 
} 

public List<WareAndReview> findWaresWithReviews() throws RuntimeException { 
final List<WareAndReview> wareAndReviews = new ArrayList<>(); 

wareService.findWares() 
    .map(WareAndReview::new) 
.subscribe(wr -> { 
     wareAndReviews.add(wr); 
     //Async!!!! 
     reviewService.findReviewsByItem(wr.getWare().getItem()) 
      .subscribe(wr::addReview, 
       throwable -> System.out.println("Error while trying to find reviews for " + wr) 
      ); 
    } 
); 

//TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion! 
try { 
    Thread.sleep(3000); 
} catch (InterruptedException e) {} 

return wareAndReviews; 
} 

鉴于我不想回可观察到的,我怎么能等待异步观察,(findReviewsByItem)来完成?

回答

-3

另一种方法是在开始之前声明一个CountdownLatch。然后在onCompleted()中的那个latch上调用countDown()。然后,您可以用该闩锁上的await()替换Thread.sleep()。

public class Example { 

    Scheduler scheduler = Schedulers.from(executor); 

    public Observable<Review> findReviewsByItem(final String item) { 
     return Observable.just(item) 
       .subscribeOn(scheduler) 
       .flatMapIterable(reviewService::findReviewsByItem); 
    } 
    public List<WareAndReview> findWaresWithReviews() { 
     return wareService 
       .findWares() 
       .map(WareAndReview::new) 
       .flatMap(wr -> { 
        return reviewService 
          .findReviewsByItem(wr.getWare().getItem()) 
          .doOnNext(wr::addReview) 
          .lastOrDefault(null) 
          .map(v -> wr); 
       }) 
       .toList() 
       .toBlocking() 
       .first(); 
    } 
} 

每当你想组合服务这样的,想到的第一flatMap

12

你大部分的例子可以用标准RxJava运营商一起工作的顺利进行重写。如果真的有必要,你不需要为每个子Observable进行阻塞,但只需要在最后使用toBlocking()