2015-12-05 35 views
3

我目前试图通过map来尝试range而不是同步执行并发数据库请求,显然是因为速度提升。我怎样才能使用通道知道什么时候所有的循环启动的循环都完成

我的问题是我有这样的事情:

var mainthreads = make(chan *mainthread) 
var mainthreadsFetched = make(chan struct{}) 
for containerid := range containers { 
    go func() { 
     rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid) 
     defer rows.Close() 
     if err != nil { 
      log.Println(err) 
     } 
     for rows.Next() { 
      mainthread := &MainThread{} 
      err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel) 
      if err != nil { 
       log.Println(err) 
      } 
      mainthreads <- mainthread 
     } 
    }() 
    mainthreadsFetched <- struct{}{} 
} 

// Get all mainthreads 
<-mainthreadsFetched 
// Do other stuff after complete 

显然mainthreadsFetched <- struct{}{}被称为几乎是瞬间,因为循环完成的速度比你眨眼,我怎么能为每循环不会阻止新渠道每个新的goroutine从开始,而是让循环开始全部goroutines,然后在每个goroutine完成时发送通道。

回答

3

使用sync.WaitGroup是一个很好的解决方案,并且是一个通常使用的解决方案。

或者,您可以在mainthreadsFetchedlen(containers)次上接收,而不仅仅是1次。这将保证所有的例行程序都已完成。您需要将发送线路移动到去程序的末尾(或者更好,延迟)。

另外,由于containerid位于for循环中,因此其值发生变化。您需要将它作为参数传递给go例程关闭。

+0

OP明确表示他知道他可以使用sync.WaitGroup,但正在寻找使用渠道的替代方案,因此您根本没有真正回答他的问题。 – evanmcdonnal

+0

看到我的答案的第二段,我给他这样一个选择。我引用'sync.WaitGroup'是因为它是更好的解决方案,无论他是否想使用它。 – mjibson

+0

@mjibson我不想使用'sync.WaitGroup'的唯一原因是因为我有一些嵌套的数据库循环,我最终会得到所有关系数据库的东西,所以我不想要'wg.Add(1)'和'wg.Done()'嵌套了15遍我认为频道应该是更好的解决方案 – Datsik

0

所以我能想出这样做的最好方法是使用sync.WaitGroup,做这样的事情:

var wg sync.WaitGroup 
var mainThreadFetched = make(chan MainThread) 
for containerid := range containers { 
    wg.Add(1) 
    go func(containerid int64) { 
     rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid) 
     defer rows.Close() 
     if err != nil { 
      log.Println(err) 
     } 
     for rows.Next() { 
      mainthread := MainThread{} 
      err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel) 
      if err != nil { 
       log.Println(err) 
      } 
      mainThreadFetched <- mainthread 
     } 
     wg.Done() 
    }(containerid) 
} 

go func() { 
    wg.Wait() 
    close(mainThreadFetched) 
}() 

for mainthread := range mainThreadFetched { 
    containers[mainthread.BelongsTo].MainThreads = append(containers[mainthread.BelongsTo].MainThreads, mainthread) 
} 

// Do other stuff 

现在我可以从mainThreadFetched通道读取,然后在WaitGroup满足它将关闭允许循环结束并继续的通道

0

我没有看到你正在读取mainthreads的位置。如果它不是缓冲频道,则需要以某种方式解决这个问题。我将提供一些解决方案 - 没有一个比另一个更“正确” - 这取决于您的需求。

变体A 这是最简单的解决方案,但它假定你有一些其他的goroutine读mainthreads(其可能已经是的情况下)

var mainthreads = make(chan *mainthread) 
var mainthreadsFetched = make(chan struct{}) 
go somethingWhichReadsMainThreads() 
for containerid := range containers { 
    go func(containerid int) { 
     // build query omitted for brevity 
     for rows.Next() { 
      // omitted for brevity 
      mainthreads <- mainthread 
     } 
     mainthreadsFetched <- struct{}{} 
    }(containerid) 
} 

for i := 0; i < len(containers); i++ { 
    <-mainThreadsFetched 
} 
close(mainthreads) 
// Do other stuff after complete 

变式B 这一个用途select声明处理读取线程与完成通知分开,而不需要另一个goroutine。

var mainthreads = make(chan *mainthread) 
var mainthreadsFetched = make(chan struct{}) 
for containerid := range containers { 
    go func(containerid int) { 
     // build query omitted for brevity 
     for rows.Next() { 
      // omitted for brevity 
      mainthreads <- mainthread 
     } 
     mainthreadsFetched <- struct{}{} 
    }(containerid) 
} 

numComplete := 0 
readRunning := true 
for readRunning { 
    select { 
    case thread := <-mainthreads: 
     // do something with thread, like threads = append(threads, thread) 
    case <-mainthreadsFetched: 
     numFetched++ 
     if numFetched == len(containers) { 
      readRunning = False 
     } 
    } 
} 
// Do other stuff after complete 

变式C 这一个使用了不使用“零值”(零),用于传递的实际数据,因此可以使用以下事实:作为信号值,而不是一个单独的结构通道。它具有代码少得多的优点,但它确实感觉像远方的鬼怪行为。

var mainthreads = make(chan *mainthread) 
for containerid := range containers { 
    go func(containerid int) { 
     // build query omitted for brevity 
     for rows.Next() { 
      // omitted Scan for brevity 
      mainthreads <- mainthread 
     } 
     mainthreads <- nil // nil signals to us we are done 
    }(containerid) 
} 

numComplete := 0 
for thread := range mainthreads { 
    if thread != nil { 
     // do something with thread, like threads = append(threads, thread) 
    } else { 
     numFetched++ 
     if numFetched == len(containers) { 
      break 
     } 
    } 
} 
// Do other stuff after complete 
相关问题