2017-02-07 37 views
1

我一直遇到这种情况,我希望能够通过RxJava监听对请求的响应。问题是我不知道如何设置Observable,以便我正在侦听事件,并以正确的顺序在订阅上发送消息。我不想发送消息然后听,因为如果线程暂停或响应超快,我可能会错过它。这是我能想到上的最接近我自己收听并发送订阅RxJava

connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      ... // do stuff 

Observable.defer(() -> { 
    connection.send(message); 
    return connection.onReceivedMessage(); 
})... // do stuff 

但这些似乎仍像我仍然可以发送邮件,而不是被监听响应。有没有其他人试图做到这一点?我觉得我真的想要一种afterCreate()。

回答

1

我不想发送消息,然后听,因为如果线程 被暂停或响应速度超快,我可能会错过它。

使用SubjectBehaviorSubject(发出始终发射到新用户的最新可观测数据)或ReplySubject(发射发射到新用户的所有Observable)。我不知道整个逻辑,但你可以有这样的:

public BehaviorSubject mMessageBehaviorSubject = BehaviorSubject.create(); 


private void sendMessage() { 
    connection.onReceivedMessage() 
      .doOnSubscribe(() -> connection.send(message)) 
      .filter(message -> message.id == id) 
      .subscribe(mSubject::onNext, Throwable::printStackTrace); 
} 


public Observable<String> getMessageObservable() { 
    return mMessageBehaviorSubject.asObservable(); 
} 

这种方式,你可以发送邮件,只要你愿意听宇会得到,在这种情况下,最新的消息发送

0

doOnSubscribe正是你所需要的,它是你正在寻找的“afterCreate”。在第一个示例中,第一条消息将在订阅后发送,此时Observable已经准备好处理响应。

至于你的问题 - 有没有其他人试图做到这一点? - 答案是肯定的。我使用与第一个示例相同的技术来使某些代码变得粗糙。

+0

doOnSubscribe这样的作品?我将不得不进行更多的测试。我承认我从未做过单元测试来验证。 – Buttink