2014-10-17 14 views
1

我试图从源可观察到的情况将其拆分为两个观察对象。然后把这个消息拉下来。refCount:在连接“n”观察者之后调用底层ConnectObservable.connect()

 
       | 
     +--- A ---+ 
     |   | 
     V   V 
     B   C 
     |   | 
     +-> zip <-+ 
       | 
       V 

从一个代码点:

public Observable<Integer> doTheDev(Observable<Integer> A) { 

    // share() == publish().refCount(); 
    Observable<Integer> bridge = A.share().subscribeOn(Schedulers.computation()); 

    Observable<Integer> B = bridge.count(); 
    Observable<Integer> C = bridge.sum(); 

    return Observable.zip(B, C, (b, c) -> b + c); 
} 

作为RxJava documentation show:一旦第一可观察订阅,refCount()将调用connect()底层ConnectableObservable上。

是否有可能通过某种方式等待第n位可观察到的订阅呼叫connect()? (然后不要错过活动)

回答

0

我尝试这样做。不知道(还)是否存在更好,更简单的解决方案。

我写了一个新的操作符,其功能与refCount完全相同(实际上,我复制了refCount源代码...),但是在第一个订阅者上订阅时,它只连接第n个订阅者。

我只在我的新运营商的call方法改变这部分代码:

if (id >= numberOfSub) { 
    connect(t); 
} 

所以,在我的应用程序,我可以用它如下:

private static Observable<Double> averageOf(Observable<Double> data, Scheduler scheduler) { 

    ConnectableObservable<? extends Double> hot = data 
      .publish(); 

    // call the underlying connect() after the 2nd subscription 
    Observable<Double> bridge = Observable.create(new OnSubscribeRefCountN<>(hot, 2)) 
      .subscribeOn(scheduler); 

    Observable<Double> count = bridge.count() 
      .map(Integer::doubleValue); 

    Observable<Double> sum = bridge.reduce(0.0, (seed, e) -> seed + e); 

    return Observable.zip(sum, count, (s, c) -> s/c); 
} 

你可以在这里找到OnSubscribeRefCountN代码:https://github.com/bric3/demo-rxjava-humantalk/blob/d6227559b36fcafc08c0d8b894584400694d9a15/src/main/java/demo/humantalk/rxjava/operators/OnSubscribeRefCountN.java#L69