你好RxJava主人,RxJava更改线程后CONCAT地图
在我目前的Android项目,我遇到了一些问题僵局而与RxJava和SQLite播放。我的问题是:
- 我在一个线程
- 调用Web服务启动一个事务,并保存一些东西在数据库
- CONCAT映射另一个观察的功能
- 尝试写其他的东西在数据库上--->获得了僵局
这里是我的代码:
//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
Observable.just(null)
/* Go to known thread to open db transaction */
.observeOn(mScheduler)
.doOnNext(o -> myStore.startTransaction())
/* Do some treatments that change thread */
.someWebServiceCallWithRetrofit()
/* Return to known thread to save items in db */
.observeOn(mScheduler)
.flatMap(items -> saveItems(items))
.subscribe();
public Observable<Node> saveItems(List<Item> items) {
Observable.from(items)
.doOnNext(item -> myStore.saveItem(item)) //write into the database OK
.concatMap(tab -> saveSubItems(item));
}
public Observable<Node> saveSubItems(Item item) {
return Observable.from(item.getSubItems())
.doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}
为什么突然所有的RxJava正在改变线程?即使我指定我希望他在我自己的调度程序中观察。我通过在saveSubItem之前添加另一个observeOn来做了一个肮脏的修复,但这可能不是正确的解决方案。
我知道,当你调用与改造web服务,响应被转发到一个新的线程(这就是为什么我创建了自己的调度找回在线程我开始我的SQL事务)。但是,我真的不明白RxJava如何管理线程。
非常感谢您的帮助。