2016-01-03 104 views
0

之一发射我有可观测的Observable<Integer> observable1PublishSubject<String> observable2(请参阅示例)。RxJava:组合观察对象忽略从

我的目的:

  1. 观测量并行执行;
  2. 在一个观察者中处理错误;
  3. 不在意observable1会在onNext上发出什么,只对处理onError的错误很重要,并保证observable被执行。

我需要这样的东西

observable2.mergeWith(observable1.map(i -> Integer.toString(i))) 
       .subscribe(mySubsriber); 

但是,如果没有从observable1发射。

有很好的方法来做到这一点?

样品

long time1 = 7000; 
    long time2 = 1000; 

    Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() { 
     @Override 
     public void call(Subscriber<? super Integer> subscriber) { 
      Handler handler = new Handler(); 
      handler.postDelayed(() -> { 
       subscriber.onNext(1); 
       subscriber.onNext(2); 
       subscriber.onNext(3); 
       subscriber.onCompleted(); 
      }, time1); 
     } 
    }).doOnNext(new Action1<Integer>() { 
     @Override 
     public void call(Integer data) { 
      cacheData(data); 
     } 
    }); 


    PublishSubject<String> observable2 = PublishSubject.create(); 
    Handler handler = new Handler(); 
    handler.postDelayed(() -> { 
     observable2.onNext("A"); 
     observable2.onNext("B"); 
     observable2.onNext("C"); 
     observable2.onCompleted(); 
    }, time2); 


Observer<String> observer = new Observer<String>() { 
      @Override 
      public void onCompleted() {     
      } 

      @Override 
      public void onError(Throwable e) { 
       //need handling errors from observable1 and observable2 
      } 

      @Override 
      public void onNext(String s) { 
       //need emitting only from observable2 
      } 
     }; 

回答