我有一个基本的RxJava 2 Observable
我正在主线程上运行。然后我调用observeOn
在另一个线程上运行一些计算。然后我使用autoConnect
来订阅count
和另一个副作用操作(写出计算结果)。在这个例子中,这显示为System.out.println
,但假装这是一个需要一段时间的操作。我想避免计数完成时可能出现的争用情况,但是在另一个线程上写出计算的最后一个元素的结果并未完成。如何计算并阻止在另一个线程上执行的Observable?
要做到这一点,我创建了从0呼叫Future
,呼吁副作用的操作blockingSubscribe
,最后叫.get()
于未来,以确保在两个观察到的操作完成主线程块。有没有更好的解决方案,以避免写上最后一个元素?我甚至需要这样做吗?还是有一些保证Observables给我失踪?
还值得注意的是,我想避免重复Observable的初始元素的计算,因为这可能发生在很多昂贵的步骤之后。
我的示例代码如下所示。
Stream<Integer> ints = ImmutableList.of(1, 2, 3, 4, 5, 6)
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator())
.observeOn(Schedulers.computation())
.map(x -> x + 1)
.publish()
.autoConnect(2);
Future<Long> count = observable.count().toFuture();
// pretend that instead of printing here, this operation
// could take some time to complete
observable.blockingSubscribe(i -> System.out.println("got " + i));
Long c = count.get();
return count;
有没有另一种方法来做到这一点?我想基本上增加了一个副作用的地图调用,这基本上是这样做的,但是如果我有很多事情想要订阅第一个可观察的事物,这会很快变得麻烦,而且它不像在我看来最好的设计。 –
你是指另一个地图通话?哪里?好吧,这只是一个建议,关键是你可以有一个单一的流,你也可以有2个流,你可以需要replay()这个observable,并使用concat()来确保顺序执行。但无论如何,我认为使用阻塞码流并不理想。 – yosriz
至于最好的设计,这很大程度上取决于在这里完成的实际工作,是与计数有关的繁重工作?是与两者有关的源观察?你需要一个可观察的输出吗?无论如何,即使使用单个流,您也可以将其拆分并在应用程序的不同部分添加逻辑 – yosriz