2017-10-09 45 views
0

我只有这个:如何使用RxJava获得来自Realm的物品列表的Observable?

public Observable<List<Movie>> getAll() { 
    return Observable.just(Movie.class) 
      .flatMap(t -> Observable.just(t) 
        .doOnSubscribe(disposable -> realm.executeTransaction(realm1 -> realm1.where(Movie.class).findAll())) 
        .onErrorResumeNext((ObservableSource<? extends Class<Movie>>) observer -> Observable.empty()) 
        .map(all -> realm.where(Movie.class).findAll()) 
      ); 
} 

但它看起来真的很丑))

一切会好起来的,如果有其中的可能性,以避免重复的代码,并保存realm.where(Movie.class).findAll ()并在map()中重用。 RealmResults方法addAll已弃用。

+1

我想你应该阅读[这](https://academy.realm.io/posts/creating-a-reactive-data-layer-with-realm-and-rxjava2/),并且检查[这](https://stackoverflow.com/a/44341905/6774854)回答。 – masp

+0

我不认为,这将工作,因为Observable.empty只会发射onComplete。所以你会得到一个完成的observable。 –

+0

@masp感谢链接我的答案,不幸的是,那边的提问者放弃了他的问题,所以我不能将其标记为重复。悲惨的人生... – EpicPandaForce

回答

0

也如描述here

private io.reactivex.Flowable<List<Movie>> getAll() { 
    return io.reactivex.Flowable.create(new FlowableOnSubscribe<List<Movie>>() { 
     @Override 
     public void subscribe(FlowableEmitter<List<Movie>> emitter) 
       throws Exception { 
      Realm realm = Realm.getDefaultInstance(); 
      RealmResults<Movie> results = realm.where(Movie.class).findAllAsync(); 
      final RealmChangeListener<RealmResults<Movie>> listener = _realm -> { 
       if(!emitter.isUnsubscribed() && results.isLoaded()) { 
        emitter.onNext(results); 
       } 
      }; 
      emitter.setDisposable(Disposables.fromRunnable(() -> { 
       results.removeChangeListener(listener); 
       realm.close(); 
      })); 
      results.addChangeListener(listener); 
     } 
    }, BackpressureStrategy.LATEST) 
    .subscribeOn(AndroidSchedulers.mainThread()) 
    .unsubscribeOn(AndroidSchedulers.mainThread()); 

但作为@masp说,评论,您可以阅读更多有关在设计使用my article about this on realm.io that was published a month ago与境界一RxJava2反应数据层。

因此,与境界4.0.0-RC1及以上的,实际上你可以只是做

private io.reactivex.Flowable<List<Movie>> getAll(Realm realm) { 
    if(realm.isAutoRefresh()) { 
     return realm.where(Movie.class) 
       .findAllAsync() 
       .asFlowable() 
       .filter(RealmResults::isLoaded); 
    } else { // for background threads 
     return Flowable.just(realm.where(Movie.class).findAll()); 
    } 
} 
0

您可以随时提取的另一种方法找到的电影被重用的代码。即使在功能性API中,我们也应该避免中断DRY。

public void code() { 
    public Observable<List<Movie>> getAll() { 
     return Observable.just(Movie.class) 
       .flatMap(t -> Observable.just(t) 
         .doOnSubscribe(disposable -> realm.executeTransaction(realm1 -> findMovie((Object) realm1))) 
         .onErrorResumeNext((ObservableSource<? extends Class<Movie>>) observer -> Observable.empty()) 
         .map(all -> findMovie((Object) all)) 
       ); 
    } 

} 

private Object findMovie(Object realm1) { 
    return realm1.where(Movie.class).findAll(); 
} 
相关问题