2017-06-25 30 views
1

我有一个基本的RxJava 2 Observable我正在主线程上运行。然后我调用observeOn在另一个线程上运行一些计算。然后我使用autoConnect来订阅count和另一个副作用操作(写出计算结果)。在这个例子中,这显示为System.out.println,但假装这是一个需要一段时间的操作。我想避免计数完成时可能出现的争用情况,但是在另一个线程上写出计算的最后一个元素的结果并未完成。如何计算并阻止在另一个线程上执行的Observable?

要做到这一点,我创建了从0​​呼叫Future,呼吁副作用的操作blockingSubscribe,最后叫.get()于未来,以确保在两个观察到的操作完成主线程块。有没有更好的解决方案,以避免写上最后一个元素?我甚至需要这样做吗?还是有一些保证Observables给我失踪?

还值得注意的是,我想避免重复Observable的初始元素的计算,因为这可能发生在很多昂贵的步骤之后。

我的示例代码如下所示。

Stream<Integer> ints = ImmutableList.of(1, 2, 3, 4, 5, 6) 
    .stream().map(i -> { 
     System.out.println("emitting:" + i); 
     return i; 
    }); 

Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator()) 
    .observeOn(Schedulers.computation()) 
    .map(x -> x + 1) 
    .publish() 
    .autoConnect(2); 

Future<Long> count = observable.count().toFuture(); 
// pretend that instead of printing here, this operation 
// could take some time to complete 
observable.blockingSubscribe(i -> System.out.println("got " + i)); 
Long c = count.get(); 

return count; 

回答

2

,而不是创建2流可以在单一的数据流做的所有工作:

Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator()) 
     .observeOn(Schedulers.computation()) 
     .map(x -> x + 1); 

observable.doOnNext(integer -> { 
    //your heavy work here 
}) 
     .count() 
     .subscribe(aLong -> { 
      //do something with the final count 
     }); 

使用副作用doOnNext()做大量的计算,并且在后count()运营商订阅,做最后一点的东西。 这会保证你的执行顺序,通知不会同时发生,这是Observable合同的一部分,每个doOnNext()都会连续发生,出于同样的原因,最后onNext()在用户处会发生最后一次,因为count()运营商是基于在源ObservableonComplete通知)完成时,因此将在所有物品被发射并在doOnNext()处理之后发生。

+0

有没有另一种方法来做到这一点?我想基本上增加了一个副作用的地图调用,这基本上是这样做的,但是如果我有很多事情想要订阅第一个可观察的事物,这会很快变得麻烦,而且它不像在我看来最好的设计。 –

+0

你是指另一个地图通话?哪里?好吧,这只是一个建议,关键是你可以有一个单一的流,你也可以有2个流,你可以需要replay()这个observable,并使用concat()来确保顺序执行。但无论如何,我认为使用阻塞码流并不理想。 – yosriz

+0

至于最好的设计,这很大程度上取决于在这里完成的实际工作,是与计数有关的繁重工作?是与两者有关的源观察?你需要一个可观察的输出吗?无论如何,即使使用单个流,您也可以将其拆分并在应用程序的不同部分添加逻辑 – yosriz

相关问题