2017-02-16 29 views
0

我正在将AsyncTaskLoader迁移到RxJava,试图了解有关RxJava并发方法的所有细节。简单的事情都运行正常,但我用下面的代码挣扎:代码在主线程上运行,即使指定了subscribeOn

这是一个被执行顶层方法:

mCompositeDisposable.add(mDataRepository 
      .getStuff() 
      .subscribeOn(mSchedulerProvider.io()) 
      .subscribeWith(...) 

mDataRepository.getStuff()看起来是这样的:

public Observable<StuffResult> getStuff() { 
    return mDataManager 
      .listStuff() 
      .flatMap(stuff -> Observable.just(new StuffResult(stuff))) 
      .onErrorReturn(throwable -> new StuffResult(null)); 

和最终层:

public Observable<Stuff> listStuff() { 
     Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName()); 
     String sql = <...>; 
     return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql).mapToList(mStuffMapper); 
} 
与上述代码

因此,log将打印出.listStuff() - main,这不正是我要找的。我不确定为什么。我的印象是,通过设置subscribeOn,从链中拉出的每个事件都将在subscribeOn方法中指定的线程上处理。

我认为正在发生的事情是,在达到mBriteDatabase之前,源-aka-final层代码不是来自RxJava世界,因此在调用createQuery之前不是事件。所以我可能需要某种包装?我试过申请.fromCallable,然而这对非处方代码的包装,和我的数据库层返回一个可观察......

回答

3

Log.d调用发生

    立即
  • listStuff被调用
  • 这是getStuff被调用后立即调用
  • 这是在您向我们展示的顶级代码片段中发生的第一件事。

如果您需要在订购发生这样做,你需要明确:

public Observable<Stuff> listStuff() { 
    String sql = <...>; 
    return mBriteDatabase.createQuery(Stuff.TABLE_NAME, sql) 
     .mapToList(mStuffMapper) 
     .doOnsubscribe(() -> Log.d(TAG, ".listStuff() - "+Thread.currentThread().getName())); 
} 
+0

谢谢!我感到困惑和过于复杂的事情非常简单。 – vkislicins

相关问题