2016-07-06 31 views
4

你好RxJava主人,RxJava更改线程后CONCAT地图

在我目前的Android项目,我遇到了一些问题僵局而与RxJava和SQLite播放。我的问题是:

  1. 我在一个线程
  2. 调用Web服务启动一个事务,并保存一些东西在数据库
  3. CONCAT映射另一个观察的功能
  4. 尝试写其他的东西在数据库上--->获得了僵局

这里是我的代码:

//define a scheduler for managing transaction in the same thread 
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 


Observable.just(null) 
      /* Go to known thread to open db transaction */ 
      .observeOn(mScheduler) 
      .doOnNext(o -> myStore.startTransaction()) 
      /* Do some treatments that change thread */ 
      .someWebServiceCallWithRetrofit() 
      /* Return to known thread to save items in db */ 
      .observeOn(mScheduler) 
      .flatMap(items -> saveItems(items)) 
      .subscribe(); 

public Observable<Node> saveItems(List<Item> items) { 
    Observable.from(items) 
      .doOnNext(item -> myStore.saveItem(item)) //write into the database OK 
      .concatMap(tab -> saveSubItems(item)); 
} 

public Observable<Node> saveSubItems(Item item) { 
    return Observable.from(item.getSubItems()) 
      .doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different 
} 

为什么突然所有的RxJava正在改变线程?即使我指定我希望他在我自己的调度程序中观察。我通过在saveSubItem之前添加另一个observeOn来做了一个肮脏的修复,但这可能不是正确的解决方案。

我知道,当你调用与改造web服务,响应被转发到一个新的线程(这就是为什么我创建了自己的调度找回在线程我开始我的SQL事务)。但是,我真的不明白RxJava如何管理线程。

非常感谢您的帮助。

回答

0

我所知doOnNext方法被称为在不同的线程,比以前的代码,因为它是从asynchroniously序列的其余部分运行。

示例:您可以执行多个其他呼叫,将其保存到数据库,并在doOnNext(...)内通知视图/演示者/控制者一个progres。您可以在保存到数据库或/和保存到数据库之前执行此操作。 我会建议你的是“flatMapping”代码。

所以saveItems方法是这样的(如果myStore.saveSubItems返回结果):

public Observable<Node> saveSubItems(Item item) { 
return Observable.from(item.getSubItems()) 
     .flatMap(subItem -> myStore.saveSubItems(subItem)) 
} 

使用“flatMapping”该操作在同一线程上与先前顺序运行的保证,该顺序继续再flaMap函数结束。

1

副作用运营商(一样flatMap)执行同步上任意线程调用它。尝试类似

Observable.just(null)    
      .doOnNext(o -> myStore.startTransaction()) 
      .subscribeOn(mScheduler)  // Go to known thread to open db transaction 
      /* Do some treatments that change thread */ 
      .someWebServiceCallWithRetrofit()      
      .flatMap(items -> saveItems(items)) 
      .subscribeOn(mScheduler) // Return to known thread to save items in db 
      .observeOn(mScheduler) // Irrelevant since we don't observe anything 
      .subscribe();