2016-11-25 23 views
2

我有一个简单的并发用例在去,它让我疯狂我找不出一个优雅的解决方案。任何帮助,将不胜感激。惯用的goroutine终止和错误处理

我想写一个方法fetchAll并行查询来自远程服务器的未指定数量的资源。如果任何提取失败,我想立即返回第一个错误。从阅读https://blog.golang.org/pipelines我可以创建一个信号通道清理其他线程https://play.golang.org/p/Be93J514R5

我知道:

我最初的,幼稚的做法,漏够程:

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
    "time" 
) 

func fetchAll() error { 
    wg := sync.WaitGroup{} 
    errs := make(chan error) 
    leaks := make(map[int]struct{}) 
    defer fmt.Println("these goroutines leaked:", leaks) 

    // run all the http requests in parallel 
    for i := 0; i < 4; i++ { 
    leaks[i] = struct{}{} 
    wg.Add(1) 
    go func(i int) { 
     defer wg.Done() 
     defer delete(leaks, i) 

     // pretend this does an http request and returns an error 
     time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) 
     errs <- fmt.Errorf("goroutine %d's error returned", i) 
    }(i) 
    } 

    // wait until all the fetches are done and close the error 
    // channel so the loop below terminates 
    go func() { 
    wg.Wait() 
    close(errs) 
    }() 

    // return the first error 
    for err := range errs { 
    if err != nil { 
     return err 
    } 
    } 

    return nil 
} 

func main() { 
    fmt.Println(fetchAll()) 
} 

游乐场。或者,我可以使用context来完成它。但是,似乎这样一个简单的用例应该有一个我错过的更简单的解决方案。

回答

4

除了其中一个goroutines外,其他所有人都泄露了,因为他们仍在等待发送到errs频道 - 您从未完成清空它的范围。你也在泄漏那个工作是关闭错误通道的例程,因为等待组从未完成。

(同时,随着安迪指出,从地图中删除是不是线程安全的,这样会需要保护,以免遭受互斥。)

不过,我不认为地图,互斥waitgroups,上下文等等在这里甚至是必要的。我已经重写了整个事情只使用基本的渠道运营,类似如下:

package main 

import (
    "fmt" 
    "math/rand" 
    "time" 
) 

func fetchAll() error { 
    var N = 4 
    quit := make(chan bool) 
    errc := make(chan error) 
    done := make(chan error) 
    for i := 0; i < N; i++ { 
     go func(i int) { 
      // dummy fetch 
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) 
      err := error(nil) 
      if rand.Intn(2) == 0 { 
       err = fmt.Errorf("goroutine %d's error returned", i) 
      } 
      ch := done // we'll send to done if nil error and to errc otherwise 
      if err != nil { 
       ch = errc 
      } 
      select { 
      case ch <- err: 
       return 
      case <-quit: 
       return 
      } 
     }(i) 
    } 
    count := 0 
    for { 
     select { 
     case err := <-errc: 
      close(quit) 
      return err 
     case <-done: 
      count++ 
      if count == N { 
       return nil // got all N signals, so there was no error 
      } 
     } 
    } 
} 

func main() { 
    rand.Seed(time.Now().UnixNano()) 
    fmt.Println(fetchAll()) 
} 

游乐场链接:https://play.golang.org/p/mxGhSYYkOb

编辑:有确实是一个愚蠢的错误,谢谢指点出来。我修正了上面的代码(我认为...)。我还为增加的Realism™添加了一些随机性。

另外,我想强调的是真的有多种方法来解决这个问题,我的解决方案只是一种方法。最终归结为个人品味,但总体而言,您希望努力实现“惯用”代码 - 并朝着自然且易于理解的风格迈进。

+0

'ec:= chan error(nil)'很有趣,我之前没有看到过这种模式。我认为'select'原因是以随机顺序执行的。在'ec <-err'之前发送'done <-true'是否有竞争条件? – gerad

+0

很好,绝对有一场比赛!我写得很快,就像我提到的那样,没有测试它(你应该始终这样做)。幸运的是,修复这个错误只会让整个代码变得更简单,在这种情况下,不需要'chan error(nil)'技巧(当你想阻止select语句的发送时,这很有用,所以你不要不必写多个条件选择)。感谢您指出我的错误:) – Aedolon

+0

这可以进一步简化。你不需要单独完成和错误的渠道,还有其他一些事情不会改进。 https://play.golang.org/p/1a0ZXuy3Dz –

0

只要每个goroutine完成,你就不会泄漏任何东西。您应该创建缓冲区的错误通道,其缓冲区大小等于goroutine的数量,以便通道上的发送操作不会被阻止。每个goroutine总是应该在通道上发送一些信息,无论它是成功还是失败。然后底部的循环可以迭代goroutine的数量,如果得到非零错误则返回。您不需要WaitGroup或另一个关闭通道的goroutine。

我认为它看起来goroutines泄漏的原因是,当你得到第一个错误时你会返回,所以有些仍在运行。

顺便说一下,地图不是goroutine安全的。如果你在goroutines中共享一个地图,并且其中一些正在对地图进行更改,则需要使用互斥锁来保护它。

+0

我同意使用缓冲通道可以工作,但我试图避免这个解决方案,虽然提取数量不易事先知道(实际代码比示例更复杂)。 – gerad