我正在使用RxSwift来包装移动应用程序的服务器同步过程。我有一个Observable<RemoteEvent>
,它包装一个websocket连接并发出每个收到的消息Event
。同样,我有一个包含API同步过程的Observable<SynchronizationResult>
。一旦我的应用程序打开WebSocket连接,服务器将发送一个hello
消息。收到该消息后,我想开始同步过程并缓存所有事件,直到同步完成。这是我挣扎的地方。目前,我有:缓冲区可观察直到另一个Observable成功完成
self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in
guard event.type == "hello" else {
return (state.0?.concat(Observable.just(event)), state.1 + [event])
}
// This is the sync operation
return (
self.synchronizationService
.synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true)
.toArray()
.flatMap { results -> Observable<RemoteEvent> in
(state.1 + [event]).toObservable()
},
[]
)
}
.flatMapLatest { $0.0 ?? Observable.empty() }
尽管这是非常丑陋的,它也有显著的错误:在同步Observable
任何传入事件结果被重新订阅,然后重新启动整个同步过程。我确定必须有更好的方法来做到这一点。
将 “你好” 是由WebSocket的所发出的第一个事件?如果没有,“hello”之前的任何事件是否会被缓冲,或者是否应该在“hello”/ sync-start之后缓冲任何事件?这个用例看起来很奇怪,但我会忽略它,并将其作为面值,因为我认为你已经将你的问题改为实际用例的简单版本。 – solidcell
@solidcell假设“hello”是第一个事件是安全的。在此之前的任何事情(无论在实践中)都可以忽略。你是对的,这既是一个奇怪的用例,也是一个简化版本的问题。但最终,对于应用程序来说,这是防止竞争条件最简单的方法(例如,如果同步发生,然后websocket连接,则可能会错过事件;如果websocket连接并同步而没有缓冲区,则事件可能是在支持数据可用之前进行处理)。这个解决方案大多是暂时的,直到解决其他问题。 –
这是怎么回事?:“如果同步发生,然后websocket连接”。如果websocket需要连接以便出现“hello”消息,以便我们尝试启动同步,那么在websocket连接之前可能会发生同步? – solidcell