2017-07-19 95 views
0

我在写下面的代码片段,从firebase数据库中获取保存的食物列表,然后使用该列表,我再次从firebase数据库获取单个食物详细信息。RxAndroid,如何检测是否可观察已完成发射

以下代码正常工作,除非我无法弄清楚如何让第二个flatMap知道第一个flatMap的发射已完成(所有食物列表已处理完毕)。所以我无法调用onCompleted()方法,因此无法检测整个进程何时结束。

看一看在下面的代码片段评论:

Observable.create<List<PersonalizedFood>> { 

      FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener { 
       override fun onCancelled(p0: DatabaseError?) { 

       } 

       override fun onDataChange(p0: DataSnapshot?) { 
        val list = ArrayList<PersonalizedFood>() 
        p0?.let { 
         for (dateObject in p0.children) { 
          for (foodItem in dateObject.children) { 
           val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood 
           list.add(PersonalizedFood(food)) 
          } 
         } 
        } 
        it.onNext(list) 
        it.onCompleted() 
       } 
      }) 
     }.subscribeOn(Schedulers.io()).flatMap { 
      Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) 
     }.observeOn(Schedulers.io()).flatMap { 
     // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called. 
      personalizedFood -> 

      Observable.create<Boolean>{ 
       FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{ 
        override fun onCancelled(p0: DatabaseError?) { 
         it.onError(p0?.toException()) 
        } 

        override fun onDataChange(p0: DataSnapshot?) { 
         if(p0 != null) { 
          val food = p0.getValue(FBFood::class.java)!! 
          val repo = LocalFoodRepository() 
          doAsync { 
           repo.insertFood([email protected], Food(food.foodId, food.foodName, food.foodDesc)) 
           repo.insertServingDetails([email protected], food.servingList.map { it.component2() }) 
           repo.saveFood([email protected], personalizedFood) 
           it.onNext(true) 
          } 

         }else { 
          it.onNext(false) 
         } 
        } 

       }) 
      } 
     }.observeOn(Schedulers.io()).doOnCompleted{ 
      dismissProgressDialog() 
      finish() 
     }.doOnError{ 
      it.printStackTrace() 
      dismissProgressDialog() 
      finish() 
     }.subscribe() 

感谢。

+0

是否使用fireabse:由于每个在flatMap创建仅发射一个项目的观测,就可以在onNext()方法后直接叫什么名字?有一些第三方RxFirebase可以为您提供Firebase数据库的很好包装。 –

+0

@Phoenix王这是另一回事。其实我正在学习如此想知道如何实现,而不使用任何第三方包装。我不喜欢使用第三方lib,如果这很容易做到这一点.. – chandil03

+0

嗯,其实你不需要。在第一个可观察到的情况下,您获取所有PersonalizedFood项目并将它们作为列表发出,然后完成流。然后将其转换为一组项目,每个项目都在第二个flatMap中处理。关键在于每个onCompleted都传递给流,所以第二个flatMap“知道”不会有更多的元素并完成自己。 我建议复习代码安全性和一致性,因为有些观点对我来说看起来不对。 – MightySeal

回答

2

flatMapObservable知道“什么时候所有物品已经完成”,当它发出的所有可观察物都称为onCompleted()。代码中的第二个flatMap从未调用onCompleted(),因为它创建的所有可观察对象都不会调用onCompleted()

您应该在onDataChange()方法中调用onCompleted()

override fun onDataChange(p0: DataSnapshot?) { 
    if(p0 != null) { 
     val food = p0.getValue(FBFood::class.java)!! 
     val repo = LocalFoodRepository() 
     doAsync { 
      repo.insertFood([email protected], Food(food.foodId, food.foodName, food.foodDesc)) 
      repo.insertServingDetails([email protected], food.servingList.map { it.component2() }) 
      repo.saveFood([email protected], personalizedFood) 
      it.onNext(true) 
      it.onCompleted() 
     } 
    } else { 
     it.onNext(false) 
     it.onCompleted() 
    } 
} 
+0

这不符合我的要求。当所有数据都被提取时,我想完成活动。这就是为什么我不能在'onDataChange()'上调用'onComplete'。 – chandil03

+0

@ chandil03这将是'doOnCompleted()'方法,不是吗?你可以在该方法中调用'finish()',因此添加'onComplete()'应该触发完成该活动。 – Bryan

+0

我知道,如果你仔细检查我的代码,我已经在'doOnCompleted()'中添加了'finish()'方法调用。如果我在你提示我的地方调用'onComplete()','doOnComplete方法将被调用的次数尽可能多地被observable'发出。因此'finish()'将被多次调用,因此会导致应用程序崩溃,因为活动将为空。 – chandil03

相关问题