2017-05-09 82 views
1

我使用RxJavaRealm来查询移动数据库。当查询完成时,我需要通知我的视图来更新列表,但它看起来像flatMapdoOnNext有效,但是它永远不会完成。我需要一个触发器才能知道它已经结束。RxJava Observable永不执行onCompleted

Realm.getDefaultInstance().asObservable().flatMap((realm) -> 
     realm.where(FileDoc.class).isNull("parent").isNotNull("updatedDate").findAllSortedAsync("name").asObservable() 
       .flatMap((files) -> { 
        documents.get("root").clear(); 
        documents.get("root").addAll(realm.copyFromRealm(files)); 
        return realm.where(Folder.class).isNull("parent").findAllSortedAsync("name").asObservable().doOnCompleted(() -> { 
         Log.e(TAG, "Barr!"); 
        }); 
       }) 
       .doOnNext((folders) -> { 
        documents.get("root").addAll(realm.copyFromRealm(folders)); 
       })).doOnCompleted(() -> Log.e(TAG, "Fooo!")) 
     .doOnError(error -> handleErrorEvent(error)) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribeOn(AndroidSchedulers.from(backgroundLooper)) 
     .subscribe(); 

这里Barr!,并且从不打印Fooo! ......任何有线索?


修订

Realm.getDefaultInstance().asObservable().first().flatMap((realm) -> 
       realm.where(IncidentTemplate.class).equalTo("deleted", false).findAllAsync().asObservable() 
         .filter(results -> results.isLoaded()) 
         .first() 
         .doOnNext((files) -> { 
          if (!incidents.containsKey("INCIDENT")) { 
           incidents.put("INCIDENT", new ArrayList<>()); 
          } 

          incidents.get("INCIDENT").clear(); 
          incidents.get("INCIDENT").addAll(realm.copyFromRealm(files)); 

         }) 
         .doOnTerminate(() -> { 
          Log.e(TAG, "Closing realm"); 
          realm.close(); 
         })) 
       .doOnCompleted(() -> { 
        emitStoreChange(new CobaltStore.CobaltStoreChangeEvent()); 
        Log.e(TAG, "EMIT INCIDENT"); 
       }) 
       .doOnError(error -> handleErrorEvent(error)) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribeOn(AndroidSchedulers.from(backgroundLooper)) 
       .subscribe(); 

回答

1

1)Realm.getDefaultInstance().asObservable()

您获得本地领域实例,你将永远无法关闭,因此,你正在泄漏内存。

2)这不是asObservable()设计的工作方式。

OnClickListener完成并发出终端事件是否有意义?

例如,您单击按钮一次,然后可观察的“结束”并且永远不会再次发出任何事件,即使用户之后单击该按钮?

当然不是,所以你不应该期待它来自Realm的RealmChangeListener;这是绑定到Realm或RealmResults的内容,以便您可以聆听将来对Realm或其特定表进行的任何写入。

解决方案可能是只使用类似

Observable.fromCallable(() => { 
    try(Realm realm = Realm.getDefaultInstance()) { 
     RealmResults<FileDoc> docs = realm.where(FileDoc.class) 
      .isNull("parent") 
      .isNotNull("updatedDate") 
      .findAllSorted("name"); 
     documents.get("root").clear(); // <-- shouldn't you modify a new copy and return this? 
     List<FileDoc> unmanagedDocs = realm.copyFromRealm(files); 
     documents.get("root").addAll(unmanagedDocs); 
     return unmanagedDocs; 
    } 
} 

或者正如你所指出,.filter((data) -> data.isLoaded()).first()也工作,虽然我没有想到这一点。

+1

您指出了一个好点......在我的情况下,我将它用作可执行的可观察对象,执行并结束......我不想担心无休止的Observable会因任何原因而停止(垃圾收藏家或清理内存在Android)我发现解决我的问题是调用.first()和.filter(data.isLoaded),我现在关闭领域onTerminated() – Jaythaking

+0

此外,我没有直接返回值主UI线程,相反,我有一个商店,作为一个单一的来源的真相和用户界面获取数据,每当有变化... – Jaythaking

+0

看到我更新的答案,我会尽量按照你的意见,并修改它留下来一个永远不会结束的observable,每次修改数据库都会发出 – Jaythaking