2017-09-13 15 views
0

我正在使用RxJs来侦听来自WebSocket的数据流。为了使用这个pubsub套接字,我需要首先在另一个套接字上进行身份验证,它将返回pubsub套接字url。如何使观察者等待验证后创建WebSocket

我现在只用一个用户就可以工作了。我需要添加另一个,但是我的当前方法存在问题,因为这两个订阅者都将触发身份验证并创建第二个pubsub套接字。

要说清楚的是,整个应用程序应该只有一个身份验证套接字和一个pubsub套接字。但是我需要订阅者在尝试使用pubsub套接字之前“等待”认证。否则,pubsub套接字将是未定义的(因为我们只知道它是运行时的url)。

这是我目前的尝试:

Websocket.service.ts

private connect(): Observable<IConnectionInfo> { 

    if (!this.authObservable) { 
     var credentials = { 
     "username": "bob", 
     "password": "slob" 
     } 

     this.authObservable = Observable.create((observer) => { 
     const socket = new WebSocket(this.authurl); 
     socket.onopen =() => { 
      console.log("Auth socket opened"); 
      socket.send(JSON.stringify(credentials)); 
     } 
     socket.onmessage = (event) => { 
      var connection_info = <IConnectionInfo>JSON.parse(event.data); 
      observer.next(connection_info); 
     } 

     return() => { 
      socket.close(); //invoked on unsubscribe 
      console.log("Auth socket closed"); 
     } 
     }) 
    } 
    return this.authObservable; 
    } 

public open_pubsub(events: string[]): Observable<IPubSubMessage> { 

    return this.connect() 
     .flatMap((connection_info: IConnectionInfo) => { 
     var url = "ws://" + this.hostName + ":" + connection_info.port + "/" + connection_info.ps; 

     var subscription = { 
      "subscribe": events 
     } 

     var authenticate_request = { 
      "authenticate": connection_info['token'] 
     } 

     if (!this.psObservable) { 
      this.psObservable = Observable.create((observer) => { 
      const socket = new WebSocket(url); 

      socket.onopen =() => { 
       console.log("PS socket opened"); 
       socket.send(JSON.stringify(authenticate_request)); 
       socket.send(JSON.stringify(subscription)); 
      } 
      socket.onmessage = (event) => { 
       var psmsg = <IPubSubMessage>JSON.parse(event.data); 
       observer.next(psmsg); 
      } 

      return() => { 
       socket.close(); //invoked on unsubscribe 
       console.log("PS socked closed"); 
      } 
      }) 
     } 
     return this.psObservable; 
     } 
    ); 
    } 

和观察者:

getSystemState(): Observable<string> { 

    return this._wsService.open_pubsub([MSG_SYSTEM_STATE_CHANGED]) 
     .map((response: IPubSubMessage): string => { 

       console.log(response.payload); 
       return "I wish this worked"; 
     }) 
     .catch(this.handleError); 
} 

任何帮助表示赞赏!

编辑基础上似乎已经被删除的答案,但真的是非常有用的,我修改了代码的问题。这种固定部分,但仍然导致多个插槽)

回答

0

发现了什么我错过了 - share()运算符的魔力。

所以对于这两种观测量,我这样做:

this.authObservable = Observable.create((observer) => { 
     ...(etc)... 
     }).share(); 

而且令人惊讶的,只是创建每个插座的1。