我有一个脚本,它从数据库中选择一些数据并发送到一个通道,由多个goroutines处理,然后将结果发送回主线程以在数据库上更新。去渠道:如何使这个非阻塞?
但是,它将数据发送到第一个通道时挂起(可能阻塞)。
var chin = make(chan in)
var chout = make(chan out)
in
和out
均为structs
首先够程被启动:
for i:=0; i<5; i++ {
go worker()
}
加载信道的代码然后
的信道与全局创建的:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
然后后立即这样:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
,并在后台多个worker()
够程运行:打印Getting nextbatch2 and sending to workers
后
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
此代码挂起。它永远不会到Processing out channel from workers
。所以它挂在rows.Next()
循环中的某个地方,对此我找不到原因,因为chin
通道应该是非阻塞的 - 即使worker()
goroutines没有处理它,它仍应该至少完成该循环。
任何想法?
编辑:
通过增加对fmt.Println(" on", numtodo)
在rows.Next()
循环结束时,我可以看到,它的块5之后,我不明白,因为它应该是非阻塞的,对不对?
编辑2:
通过改变信道到make(chan in/out, 100)
现在将阻塞后105
我不明白你的意思,它在此之前阻止。问题不在于接收器阻塞,接收器是为了阻止。问题是'chin'频道阻塞。而且,它们并不需要关闭,因为整个事情都是无限循环的,尽管你在这里看不到这些代码。 – Alasdair
您的'worker()'不能发送到'chout',因为没有goroutine读取'chout'。 –
它不仅仅是在队列中收集吗? – Alasdair