2015-02-11 120 views
0

我有一个golang服务器做这样的事情: 包主要异步消息golang

func main() { 
    for { 
     c := listener.Accept() 
     go handle(c) 
    } 
} 

... 
func handle(c net.Conn) { 
    m := readMessage(c) // func(net.Conn)Message 
    r := processMessage(m) //func(Message)Result 
    sendResult(c, r)  // func(net.Conn,Result) 
} 

读取和写入同步消息。我现在需要的是通过一个给定的开放连接来异步发送消息,我知道一个通​​道可以被我使用。

这是我的想法:

... 
func someWhereElese(c chan Result) { 
    // generate a message and a result 
    r := createResultFromSomewhere() 
    c <- r // send the result through the channel 
} 

并修改我的把手使用同一通道,而不是

func handle(c net.Conn, rc chan Result) { 
    m := readMessage(c) // func(net.Conn)Message 
    r := processMessage(m) //func(Message)Result 
    //sendResult(c, r)  // func(net.Conn,Result) 
    rc <- r 
} 

这里就是我的困惑所在。

结果通道应创建,它应该有一个连接发送到哪里不管它接收

func doSend(c net.Con, rc chan Result) { 
    r := rc   // got a result channel 
    sendResult(c, r) // send it through the wire 
} 

但应该在哪里该通道产生的呢?在主循环中?

func main() { 
    ... 
    for { 
     c := l.Accept() 
     rc := make(chan Result) 
     go doSend(c, rc) 
    } 
} 

怎么样读?它应该进入它自己的频道/ gorutine吗? 如果我需要向n个客户进行广播,我是否应该保留一部分结果频道?一片连接?

我在这里很困惑,但我觉得我很接近。

+0

从这里开始:http://talks.golang.org/2012/concurrency.slide。你的问题的关键是使用'select'来观看一个传入的连接频道,一个响应频道和一个退出频道。根据您的预期负载,每个请求创建一个goroutine可能没有问题;或者您可能需要创建一个池。但是一旦你通过Go Concurrency Patterns,你会更好地理解正确的问题。另见http://blog.golang.org/advanced-go-concurrency-patterns,特别是最后的“相关文章”。理解'select'是伟大的“哦,这就是Go的工作方式!”时刻。 – 2015-02-11 20:54:19

+0

@RobNapier呃...我之前通过幻灯片阅读,我有点不明白。我会在一会儿再次通过他们。同时,我设法制作了一个小程序,如果您发现任何特别危险的事情,您能否对此发表评论? – OscarRyz 2015-02-11 22:04:50

+0

好的;我可能误解了你的意思是“异步”。我曾假设多个请求和响应会发生在同一个连接上(交错请求)。您似乎意味着您想在阅读器获取数据时传输数据。这可能比读取两个字节并在一个goroutine中写入两个字节更平行一些。你可能想看看'golang.org/x/text/transform'和'io.Copy()'。 (对不起,我对这里的实际代码没有更多帮助;这是一个有趣的问题,但我没有时间这一分钟来帮助广泛。) – 2015-02-11 22:55:26

回答

0

这个程序似乎解决我迫切的问题

package main 

import (
    "bytes" 
    "encoding/binary" 
    "log" 

    "net" 
) 

var rcs []chan int = make([]chan int,0) 


func main() { 
    a, e := net.ResolveTCPAddr("tcp", ":8082") 
    if e != nil { 
     log.Fatal(e) 
    } 
    l, e := net.ListenTCP("tcp", a) 
    for { 
     c, e := l.Accept() 
     if e != nil { 
      log.Fatal(e) 
     } 
     rc := make(chan int) 
     go read(c, rc) 
     go write(c, rc) 
     rcs = append(rcs, rc) 
     // simulate broacast 
     log.Println(len(rcs)) 
     if len(rcs) > 5 { 
      func() { 
       for _, v := range rcs { 
        log.Println("sending") 
        select { 
        case v <- 34: 
         log.Println("done sending") 
        default: 
         log.Println("didn't send") 
        } 
       } 
      }() 
     } 
    } 
} 
func read(c net.Conn, rc chan int) { 
    h := make([]byte, 2) 
    for { 
     _, err := c.Read(h) 
     if err != nil { 
      rc <- -1 
     } 
     var v int16 
     binary.Read(bytes.NewReader(h[:2]), binary.BigEndian, &v) 
     rc <- int(v) 
    } 
} 
func write(c net.Conn, rc chan int) { 
    for { 
     r := <-rc 
     o := []byte{byte(r * 2)} 
     c.Write(o) 
    } 
}