2016-02-11 70 views
0

我想收听多个传输编码响应的HTTP流,然后逐行读取消息,然后将消息推送到一个通道。然后我想从频道中读取并稍后通过websocket。HTTP分块流式传输到WebSocket

func subscribe(ws chan<- string, group string) (scanner *bufio.Scanner, err error){ 
    res, _ := req(STREAM_URL, channelTemplate(group)) 
    reader := bufio.NewScanner(res.Body) 
    return reader, reader.Err() 
} 

func main() { 
    ws := make(chan string) 
    request, _ := http.NewRequest("GET", URL, nil) 
    request.Header.Add("Content-Type", "application/json") 
    client := &http.Client{} 
    resp, _ := client.Do(request) 
    ac := ACResponse{} 
    json.NewDecoder(resp.Body).Decode(&ac) 
    resp.Body.Close() 
    var scanners = make([]*bufio.Scanner, 0) 
    for _, group := range ac.Groups { 
     fmt.Println("Started worker for", group) 
     //listen to all stream URLs 
     scanner, err := subscribe(ws, group) 
     if err != nil { 
      panic(err) 
     } 
     // keep track of Scanner to read later 
     scanners = append(scanners, scanner) 
    } 
    for { 
     select { 
     case msg := <-ws: 
      fmt.Println("[events] ", msg) 
     default: 
      randScanner := rand.Intn(len(ac.Groups)-1) 
      fmt.Println("Reading from", randScanner) 
      reader := scanners[randScanner] 
      reader.Scan() 
      if err := reader.Err(); err != nil { 
       panic(err) 
      } 
      text := reader.Text() 
      ws <- text 
     } 
    } 
} 

该程序阻止在reader.Scan()。输出是Reading from 1,没有别的。我看着wireshark,消息正在通过。

我怎样才能更好地使用Go来设计这个问题?

+0

标题中提到的websocket代码在哪里?为了调试这个,通过发送一个SIQUIT进程来打印goroutine栈。这可能会让你对程序停滞的地方有所了解。 –

+0

交叉发布在这里:https://groups.google.com/d/topic/golang-nuts/dQu1AU38F8Y/discussion – JimB

+0

尚未编写websocket代码,我只想确认它与stdout的工作。 – viperfx

回答

0

发送到无缓冲通道的主块ws。要解决此问题,更改ws到缓冲信道:

ws := make(chan string, 1) 

的第二个问题是,main()中后继续它们达到EOF阅读扫描器。问题出现在这些行上:

 reader.Scan() 
     if err := reader.Err(); err != nil { 
      panic(err) 
     } 
     text := reader.Text() 

Scan()在EOF处返回false,但忽略来自扫描的返回值。 Err()在EOF上返回nil。修改应用程序以使用Scan()中的返回值。

还有一个问题是读取任何一个扫描仪的主要块。以避免阻塞在单个连接上,启动一个的goroutine读取每个连接:

func subscribe(wg *sync.WaitGroup, ws chan<- string, group string) { 
    defer wg.Done() 
    res, err := req(STREAM_URL, channelTemplate(group)) 
    if err ! nil { 
     // handle error 
    } 
    defer resp.Body.Close() 
    reader := bufio.NewScanner(res.Body) 
    for reader.Scan() { 
     ws <- reader.Text() 
    } 
    if err := reader.Err(); err != nil { 
     // handle error 
    } 
} 

func main() { 
    ws := make(chan string) 
    request, _ := http.NewRequest("GET", URL, nil) 
    request.Header.Add("Content-Type", "application/json") 
    resp, err := http.DefaultClient.Do(request) 
    if err != nil { 
     // handle error 
    } 
    var ac ACResponse 
    if err := json.NewDecoder(resp.Body).Decode(&ac); err != nil { 
     // handle error 
    } 
    resp.Body.Close() 
    var wg sync.WaitGroup 
    for _, group := range ac.Groups { 
     wg.Add(1) 
     go subscribe(&wg, ws, group) 
    } 

    go func() { 
     wg.Wait() 
     close(ws) 
    }() 

    for msg := range ws { 
     fmt.Println("[events] ", msg) 
    } 
} 

上面的代码是未编译的和未经测试。我标记了需要错误处理的地方。在所有连接到达EOF后,我编写了代码以退出main。这可能是也可能不是你想要的应用程序。