2017-07-06 75 views
2

我有一个函数,它应该返回一个BehaviorSubject。主题是想回的最新版本Profile如何创建订阅observable的BehaviorSubject?

(用户)Profile仅仅是包含三个成员引用一个POJO:
- 一个User
- 即用户MeasurementList
- 和Deadline

其中两个属性是通过改进调用获得的,其中一个属性已经存放在一个类变量中。

每当可观察者发出新的measurement listdeadline时,BehaviorSubject应发出一个新的更新的配置文件。

这里是什么应该发生 enter image description here

这是我迄今为止

public BehaviorSubject<Profile> observeProfile() { 

     if (profileBS == null) { 

      profileBS = BehaviorSubject.create(); 

      Observable o = Observable.combineLatest(
        Observable.just(userAccount), 
        observeMeasurements(), 
        observeDeadline(), 
        Profile::new 
      ); 


      profileBS.subscribeTo(o); //subscribeTo does not exist, but this is what I am trying to figure out how to do. 

     } 
     return profileBS; 
    } 

谁能帮助我正确地创建这个BehaviorSubject(希望有用)图?

谢谢。

回答

2

Subject实现Observer接口,这样你可以做以下

public BehaviorSubject<Profile> observeProfile() { 

     if (profileBS == null) { 

      profileBS = BehaviorSubject.create(); 

      Observable o = Observable.combineLatest(
        Observable.just(userAccount), 
        observeMeasurements(), 
        observeDeadline(), 
        Profile::new 
      ); 

      // Note the following returns a subscription. If you want to prevent leaks you need some way of unsubscribing. 
      o.subscribe(profileBS); 

     } 
     return profileBS; 
} 

请注意,您应该拿出处理所产生的认购方式。

+0

谢谢。答案很简单,但最初对我来说似乎有些违反直觉。 – Stephen

+0

@Stephen这个答案可以工作,但有一定的条件。正如我在我的回答中所说的,你太早订阅你的上游。所以你的下游将失去它的事件。 –

+0

@PhoenixWang我现在正在阅读你的答案。我会看看。我现在必须马上开始工作,但稍后会跟进。感谢您的高举。 – Stephen

0

我想我和你面临同样的问题。我想在提出答案之前先讲述一些历史。 所以我觉得很多人都看到了杰克华顿在Devoxx上的讲话:The State of Managing State with RxJava

所以他想出了一个基于一些变形金刚的建筑。但问题是,即使这些Transformer实例在ViewObservables之外生存。他们每次使用ViewObservable时都会创建新的Observable。这是因为运营商的概念。

因此,一个常见的解决方案是使用主题作为您的问题做的网关。但新问题是你太早订阅你的信息来源。

订阅操作应该在subscribeActual()中完成,这将由您的下游的subscribe()方法触发。但是你在中间订阅你的上游。你失去了你的事件。 通过解决这个问题我很困难,但从未找到解决方案。

但最近由于Davik的回答是:RxJava - .doAfterSubscribe()? 我想出了这样的事情:

public BehaviorSubject<Profile> observeProfile() { 
    public Observable resultObservable; 
    if (profileBS == null) { 

     profileBS = BehaviorSubject.create(); 
     //this source observable should save as a field as well 
     o = Observable.combineLatest(
       Observable.just(userAccount), 
       observeMeasurements(), 
       observeDeadline(), 
       Profile::new 
     ); 
     //return this observable instead of your subject 
     resultObservable = profileBS.mergeWith(
      Observable.empty() 
         .doOnComplete(() -> { 
          o.subscribe(profileBS); 
         })) 
    } return resultObservable; 
} 

的诀窍是。您使用此批准程序来创建像doAfterSubscribe这样的运营商。所以只有下游已经订阅你的主题。您的主题将订阅您的原始上游来源。

希望这可以帮助。并为我的英语不好而感到抱歉。

+0

'susbcribeActual'只有rxJava2我认为。 – JohnWowUs

+0

@JohnWowUs是的,但概念是相同的。通过触发'subscribe()'方法,Obseravble应该被认为是懒惰地初始化的。但是在这个方法中,Subject不管你的“主”订阅是否订阅你的主题,都订阅原始源,以便从源接收事件。这就是你失去活动的原因。 –