2017-03-29 21 views
1

我正在尝试使用RxSwift创建聊天服务链。 基本上它应该做的:RxSwift - 当observable发生变化但只发出最后一个值时如何重试?

  • 检查聊天有ID,如果是去下一个,如果没有flatMap到createChat服务
  • 检查sockect连接已启用
  • 发送聊天消息

基本上下面的函数工作,但它现在不检查套接字连接状态。 它应该在第二个flatMap之前检查套接字是否连接,如果是,则应该使用messageUpload进行flatMap,如果不是,则应该等到连接返回true为止。 我有变量,告诉我关于当前连接状态(布尔)

chatSocketService.isSubscribedToChannel.asObservable() 

,但我无法弄清楚如何把它们放在一起。 我试着用第三个flatMap(在当前的最后一个之前)它不起作用。 接下来的问题是用户可以在连接返回前尝试发送很少的消息,所以每次他点击sendButton时,都会执行此方法,所以当连接返回时,它只应发送最后一条消息。 任何想法我可以如何处理与Rx?

func sendMessage(withBody body: String) { 
    guard !body.isEmpty else { return } 

    Observable.just(chatModel.value) 
     .filter({ $0.product != nil }) 
     .flatMap({ [unowned self] chatModel -> Observable<ChatModel> in 
      if chatModel.id != nil { 
       return Observable.just(chatModel) 
      } else { 
       return self.createChat(withProductModel: chatModel.product!) 
      } 
     }) 
     .flatMap({ [unowned self] chatModel -> Observable<ChatMessageModel> in 
      return self.chatService.uploadChatMessage(forChat: chatModel, withBody: body) 
        .trackActivity(self.progressHelper.activityIndicator) 
     }) 
     .subscribe(onNext: { [unowned self] chatMessageModel in 
      self.finishedSendingMessage.onNext(()) 
     }) 
     .addDisposableTo(disposeBag) 
} 

回答

2
.flatMap({ [unowned self] chatModel -> Observable<ChatMessageModel> in 
    let successfulConnect = chatSocketService.isSubscribedToChannel.asObservable() 
     .skipWhile { $0 == false } 
     .map { _ -> Void in } 
     .take(1) 

    return successfulConnect.flatMap { self.chatService.uploadChatMessage(forChat: chatModel, withBody: body) } 
     .trackActivity(self.progressHelper.activityIndicator) 
}) 

上述代码应在质询用于管理连接所述的行为。 successfulConnect是一个observable,它会在连接状态为true时发出一个值并完成。

skipWhile会忽略false值,map改变我们观察到从Observable<Bool>Observable<Void>take(1)确保第一个值后可观察到的完成。 flatMap然后在sucessfulConnect发送一个值时执行uploadChatMessage


您所描述的第二行为意味着,如果用户之前的一个已经被上传之前发送另一个消息取消先前上传。这可以通过处理订阅时进行处理sendMessage

var disposable: Disposable? 

func sendMessage(withBody body: String) { 
    disposable?.dispose() 
    disposable = Observable.just(chatModel.value) 
     // ... 
     .subscribe(onNext: { /* ... */ }) 
} 
+0

太棒了!这正是我所期待的! 谢谢Tomahh –

相关问题