2017-04-16 83 views
5

我有以下代码:RxJava:观察到的,默认的线程

Observable.create(new ObservableOnSubscribe<String>() { 
      @Override 
      public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception { 
       Thread thread = new Thread(new Runnable() { 
        @Override 
        public void run() { 
         s.onNext("1"); 
         s.onComplete(); 
        } 
       }); 
       thread.setName("background-thread-1"); 
       thread.start(); 
      } 
     }).map(new Function<String, String>() { 
      @Override 
      public String apply(@NonNull String s) throws Exception { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("map: thread=" + threadName); 
       return "map-" + s; 
      } 
     }).subscribe(new Observer<String>() { 
      @Override 
      public void onSubscribe(Disposable d) {} 

      @Override 
      public void onNext(String s) { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onNext: thread=" + threadName + ", value=" + s); 
      } 

      @Override 
      public void onError(Throwable e) {} 

      @Override 
      public void onComplete() { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onComplete: thread=" + threadName); 
      } 
     }); 

而这里的输出:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1 

重要的细节:我打电话从另一个线程subscribe方法(main线程在Android中)。

所以看起来像Observable类是同步的,在默认情况下,它发出事件(s.onNext)在同一个线程执行的一切(运营商如map +通知用户),对不对?我想知道......它是有意的行为还是我误解了某些东西?其实我期望至少onNextonComplete回调将在调用者的线程上被调用,而不是在发送事件的那个上。我是否正确理解在这种特殊情况下实际的调用者线程无关紧要?至少在异步生成事件时。

另一个值得关注的 - 如果我收到了一些可观察到从一些外部源的参数(即我不产生对我自己)......有没有办法对我来说它的用户检查其是否是同步或异步,我只需明确指定我想通过subscribeOnobserveOn方法接收回调的位置,对吧?

谢谢!

回答

4

RxJava是unopinionated关于并发。如果您不使用observeOn/subscribeOn等任何其他mechanisem,它将在订阅线程上产生值。请不要在运营商中使用Thread等低级构造,否则可能会破坏合同。

由于使用线程时,onNext将从调用线程调用(“后台线程1”)。订阅发生在调用(UI-Thread)上。链中的每个运算符都将从'background-thread-1'调用线程调用。订阅onNext也将从'background-thread-1'被调用。

如果要产生不上调用线程使用的值:subscribeOn。如果你想把线程切换回主要用途observeOn链中的某个地方。最有可能在订阅它之前。

实施例:

Observable.just(1,2,3) // creation of observable happens on Computational-Threads 
      .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite 
      .map(integer -> integer) // map happens on Computational-Threads 
      .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread 
      .subscribe(integer -> { 
       // called from mainThread 
      }); 

这里是一个很好explanitation。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html