2015-05-01 29 views
2

我有一个脚本,它从数据库中选择一些数据并发送到一个通道,由多个goroutines处理,然后将结果发送回主线程以在数据库上更新。去渠道:如何使这个非阻塞?

但是,它将数据发送到第一个通道时挂起(可能阻塞)。

var chin = make(chan in) 
var chout = make(chan out) 

inout均为structs

首先够程被启动:

for i:=0; i<5; i++ { 
    go worker() 
} 

加载信道的代码然后

的信道与全局创建的:

 if verbose { 
      fmt.Println(`Getting nextbatch2 and sending to workers`) 
     } 

     rows, err = nextbatch2.Query() 
     if err != nil { 
      panic(err) 
     } 

     var numtodo int 
     for rows.Next() { 
      err = rows.Scan(&id, &data) 
      if err != nil { 
       rows.Close() 
       panic(err) 
      } 

      // Start 
      var vin in 
      vin.id = id 
      vin.data = data 
      chin <- vin 
      numtodo++ 
     } 
     rows.Close() 

然后后立即这样:

 if verbose { 
      fmt.Println(`Processing out channel from workers`) 
     } 

     for res := range chout { 
      update5.Exec(res.data, res.id) 
      if numtodo--; numtodo == 0 { 
       break 
      } 
     } 

,并在后台多个worker()够程运行:打印Getting nextbatch2 and sending to workers

func worker() { 
     for res := range chin { 
      var v out 
      v.id = res.id 
      v.data = process(res.data) 
      chout <- v 
     } 
} 

此代码挂起。它永远不会到Processing out channel from workers。所以它挂在rows.Next()循环中的某个地方,对此我找不到原因,因为chin通道应该是非阻塞的 - 即使worker() goroutines没有处理它,它仍应该至少完成该循环。

任何想法?

编辑:

通过增加对fmt.Println(" on", numtodo)rows.Next()循环结束时,我可以看到,它的块5之后,我不明白,因为它应该是非阻塞的,对不对?

编辑2:

通过改变信道到make(chan in/out, 100)现在将阻塞后105

回答

3

接收机总是阻塞,直到有数据要接收。如果通道未经缓冲,则发送器阻塞,直到接收器收到该值。

https://golang.org/doc/effective_go.html#channels

所以你可以重写你的消费者的代码是这样的:

go func(){ 
    for res := range chout { 
     update5.Exec(res.data, res.id) 
    } 
}() 

还需要close(chin)close(chout)range语句的正确用法。

+0

我不明白你的意思,它在此之前阻止。问题不在于接收器阻塞,接收器是为了阻止。问题是'chin'频道阻塞。而且,它们并不需要关闭,因为整个事情都是无限循环的,尽管你在这里看不到这些代码。 – Alasdair

+1

您的'worker()'不能发送到'chout',因为没有goroutine读取'chout'。 –

+0

它不仅仅是在队列中收集吗? – Alasdair