2017-10-05 180 views
0

我有可观察的链条,它在顶部有无限可观察点,在它之后有无限可观察点。像这样:RxJava 2无限可观的力量完整链条

repo.infinitGetItems() 
     .switchMap(items -> Observable 
          .just(items) 
          .flatMap(items -> repo.nonInfinitObs(items))); 

我想要的是当repo.nonInfinitObs发送onComplete事件时完成所有链。现在它还没有完成,因为repo.infinitGetItems()是活着的。

我可以在rxJava2中完成整个链的强制完成吗?

在此先感谢!

回答

1

您可以通过takeUntil和一些流外部手段停止主序:

PublishSubject<Integer> stop = PublishSubject.create(); 

repo.infinitGetItems() 
    .takeUntil(stop) 
    .switchMap(items -> repo.nonInfinitObs(items) 
         .doOnComplete(() -> stop.onComplete()) 
); 
+0

谢谢!会试试看。 –

+0

它就像一个魅力!谢谢你的时间和回答:) –