2016-08-27 62 views
1

我正在使用RxSwift来包装移动应用程序的服务器同步过程。我有一个Observable<RemoteEvent>,它包装一个websocket连接并发出每个收到的消息Event。同样,我有一个包含API同步过程的Observable<SynchronizationResult>。一旦我的应用程序打开WebSocket连接,服务器将发送一个hello消息。收到该消息后,我想开始同步过程并缓存所有事件,直到同步完成。这是我挣扎的地方。目前,我有:缓冲区可观察直到另一个Observable成功完成

self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in 
    guard event.type == "hello" else { 
    return (state.0?.concat(Observable.just(event)), state.1 + [event]) 
    } 

    // This is the sync operation 
    return (
    self.synchronizationService 
     .synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true) 
     .toArray() 
     .flatMap { results -> Observable<RemoteEvent> in 
     (state.1 + [event]).toObservable() 
     }, 
    [] 
) 
} 
.flatMapLatest { $0.0 ?? Observable.empty() } 

尽管这是非常丑陋的,它也有显著的错误:在同步Observable任何传入事件结果被重新订阅,然后重新启动整个同步过程。我确定必须有更好的方法来做到这一点。

+0

将 “你好” 是由WebSocket的所发出的第一个事件?如果没有,“hello”之前的任何事件是否会被缓冲,或者是否应该在“hello”/ sync-start之后缓冲任何事件?这个用例看起来很奇怪,但我会忽略它,并将其作为面值,因为我认为你已经将你的问题改为实际用例的简单版本。 – solidcell

+0

@solidcell假设“hello”是第一个事件是安全的。在此之前的任何事情(无论在实践中)都可以忽略。你是对的,这既是一个奇怪的用例,也是一个简化版本的问题。但最终,对于应用程序来说,这是防止竞争条件最简单的方法(例如,如果同步发生,然后websocket连接,则可能会错过事件;如果websocket连接并同步而没有缓冲区,则事件可能是在支持数据可用之前进行处理)。这个解决方案大多是暂时的,直到解决其他问题。 –

+0

这是怎么回事?:“如果同步发生,然后websocket连接”。如果websocket需要连接以便出现“hello”消息,以便我们尝试启动同步,那么在websocket连接之前可能会发生同步? – solidcell

回答

1

这里是你如何能得到你正在寻找的功能:

// this is a stub for the purpose of the example 
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance) 
let websocketEvents = interval 
    .map { i -> String in 
     if i == 1 { 
      return "hello" 
     } else { 
      return String(i) 
     } 
    } 
    .replayAll() 

websocketEvents.connect() 

func performSync() -> Observable<Void> { 
    return Observable<Void>.create { o in 
     print("starting sync") 
     // actually start sync with server 
     // .... 
     delay(2.0) { 
      print("sync finished") 
      o.onNext(()) 
     } 
     return NopDisposable.instance 
    } 
} 

// websocket events as they come, regardless of sync status 
websocketEvents 
    .subscribeNext { e in 
     print("websocket event received: \(e)") 
    } 

// all websocket events, buffered and only emitted post-sync 
websocketEvents 
    .filter { $0 == "hello" } 
    .flatMapLatest { _ in performSync() } 
    .flatMapLatest { _ in websocketEvents } 
    .subscribeNext { e in 
     print("websocket event post sync: \(e)") 
    } 

这将输出:

的WebSocket事件好评:0
的WebSocket事件收到:你好
开始同步
收到的websocket事件:2
收到的websocket事件:3
同步完成
WebSocket的事件后同步:0
WebSocket的事件后同步:你好
WebSocket的事件后同步:2
WebSocket的事件后同步:3
的WebSocket事件接收:4
WebSocket的事件后同步:4收到
的WebSocket事件:5
的WebSocket事件后同步:5

+0

这真棒!一个问题 - 我正确地认为'replayAll'会继续保持从流中发出的所有项目,即使它们已经被回放了吗?这里有关于内存使用的担忧吗?我当前(非常黑客和破解)的方法确实对缓冲区施加了最大大小限制,并且如果在缓冲区之前无法完成同步(出于简洁目的,未在问题中列出),错误将从流中排除。 –

+0

是的,这绝对是值得关注的东西。有'replay(bufferCount:Int)'这可能会限制缓冲区大小。但是,如果可能的话,您应该考虑针对同步情况采用不同的方法,而不是缓冲元素。 – solidcell

+0

感谢您的澄清。至于同步,我玩过几种不同的方法。我最大的粉丝之一就是让服务器在最后x个时间内保存事件缓存。然后在连接上它只会丢弃你错过的所有东西,所以客户没有额外的工作要做。问题在于(取决于应用程序和事件类型),这会比创建API调用导致更多的数据使用。 –