可以说我有两个Observable A和B. 当服务连接到我的程序时,A发出true。如果服务启动,则B发出true,否则为false。Observable的占位符?
我想使用combineLatest结合A和B,然后采取适当的行动。
问题是B在我连接到服务之前不可用。所以我的问题是,如果我可以有B的Observable-wrapper和占位符,直到B可用?然后,当B可用时,包装应该发出来自B的值。
可以说我有两个Observable A和B. 当服务连接到我的程序时,A发出true。如果服务启动,则B发出true,否则为false。Observable的占位符?
我想使用combineLatest结合A和B,然后采取适当的行动。
问题是B在我连接到服务之前不可用。所以我的问题是,如果我可以有B的Observable-wrapper和占位符,直到B可用?然后,当B可用时,包装应该发出来自B的值。
我想出了一个简单的解决方案。
我创建了一个主题C,这样
BehaviorSubject<Boolean> C = BehaviorSubject.create(false);
通过这样做,我能够像这样
Observable.combineLatest(A, C, bla bla....
然后当B变得可用我做
B.subcribe(C);
也许你可以使用delaySubscription来实现。这里有一个例子:
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Void> future = executorService.submit(() -> {
Thread.sleep(3000);
//service started
return null;
});
Observable<Void> oFuture = Observable.from(future, Schedulers.newThread());
Observable<Long> oA = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> oB = Observable.interval(100, TimeUnit.MILLISECONDS);
Observable<Long> delayedObservable = oB.delaySubscription(new Func0<Observable<Void>>() {
@Override public Observable<Void> call() {
return oFuture;
}
});
Observable<String> stringObservable = Observable.combineLatest(oA, delayedObservable, (i1, i2) -> i1 + "-" + i2);
executorService.submit(() -> {
stringObservable.forEach(System.out::println);
});
Thread.sleep(10000);
executorService.shutdownNow();
你真正说的是B依赖于A.大多数情况下,当你有一个依赖于anoth的观察值时呃你可以使用.flatMap()来表达这种关系。
Observable<Boolean> createObservableA() {
return ...; // Same as you currently have for creating Observable A.
}
Observable<Boolean> createObservableB() {
return createObservableA()
.flatMap(serviceConnected -> {
if (!serviceConnected) {
// Don't emit anything to subscribers yet.
return Observable.empty();
}
return ...; // Service is now connected create Observable B.
});
}
谢谢你,我想出了一个简单的解决方案,看到了自己的答案。 – 2014-11-03 18:33:57