我有以下代码:RxJava:观察到的,默认的线程
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
而这里的输出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要的细节:我打电话从另一个线程subscribe
方法(main
线程在Android中)。
所以看起来像Observable
类是同步的,在默认情况下,它发出事件(s.onNext
)在同一个线程执行的一切(运营商如map
+通知用户),对不对?我想知道......它是有意的行为还是我误解了某些东西?其实我期望至少onNext
和onComplete
回调将在调用者的线程上被调用,而不是在发送事件的那个上。我是否正确理解在这种特殊情况下实际的调用者线程无关紧要?至少在异步生成事件时。
另一个值得关注的 - 如果我收到了一些可观察到从一些外部源的参数(即我不产生对我自己)......有没有办法对我来说它的用户检查其是否是同步或异步,我只需明确指定我想通过subscribeOn
和observeOn
方法接收回调的位置,对吧?
谢谢!