2017-08-03 62 views
1

我试图编写一个可以并行或按顺序执行功能的通用函数。在对它进行测试时,我发现了一些关于关闭的非常意想不到的行为。在下面的代码中,我定义了一个不接受参数并返回错误的函数列表。函数也在闭包中使用for循环变量,但我使用了在循环中定义一个新变量的技巧,以避免捕获。并行执行和顺序执行之间的关闭不一致

我期待着我可以按顺序或同时调用这些函数,但效果相同,但我看到了不同的结果。就好像封闭变量被捕获,但只有在同时运行时。

据我所知,这不是捕获循环变量的常见情况。正如我所提到的,我正在循环中定义一个新变量。另外,我没有在循环中运行闭包函数。我在循环中生成一个函数列表,但是我在循环之后执行这些函数。

我使用的是go版本go1.8.3 linux/amd64。

package closure_test 

import (
    "sync" 
    "testing" 
) 

// MergeErrors merges multiple channels of errors. 
// Based on https://blog.golang.org/pipelines. 
func MergeErrors(cs ...<-chan error) <-chan error { 
    var wg sync.WaitGroup 
    out := make(chan error) 

    // Start an output goroutine for each input channel in cs. output 
    // copies values from c to out until c is closed, then calls wg.Done. 
    output := func(c <-chan error) { 
     for n := range c { 
      out <- n 
     } 
     wg.Done() 
    } 
    wg.Add(len(cs)) 
    for _, c := range cs { 
     go output(c) 
    } 

    // Start a goroutine to close out once all the output goroutines are 
    // done. This must start after the wg.Add call. 
    go func() { 
     wg.Wait() 
     close(out) 
    }() 
    return out 
} 

// WaitForPipeline waits for results from all error channels. 
// It returns early on the first error. 
func WaitForPipeline(errs ...<-chan error) error { 
    errc := MergeErrors(errs...) 
    for err := range errc { 
     if err != nil { 
      return err 
     } 
    } 
    return nil 
} 

func RunInParallel(funcs ...func() error) error { 
    var errcList [](<-chan error) 
    for _, f := range funcs { 
     errc := make(chan error, 1) 
     errcList = append(errcList, errc) 
     go func() { 
      err := f() 
      if err != nil { 
       errc <- err 
      } 
      close(errc) 
     }() 
    } 
    return WaitForPipeline(errcList...) 
} 

func RunSequentially(funcs ...func() error) error { 
    for _, f := range funcs { 
     err := f() 
     if err != nil { 
      return err 
     } 
    } 
    return nil 
} 

func validateOutputChannel(t *testing.T, out chan int, n int) { 
    m := map[int]bool{} 
    for i := 0; i < n; i++ { 
     m[<-out] = true 
    } 
    if len(m) != n { 
     t.Errorf("Output channel has %v unique items; wanted %v", len(m), n) 
    } 
} 

// This fails because j is being captured. 
func TestClosure1sp(t *testing.T) { 
    n := 4 
    out := make(chan int, n*2) 
    var funcs [](func() error) 
    for i := 0; i < n; i++ { 
     j := i // define a new variable that has scope only inside the current loop iteration 
     t.Logf("outer i=%v, j=%v", i, j) 
     f := func() error { 
      t.Logf("inner i=%v, j=%v", i, j) 
      out <- j 
      return nil 
     } 
     funcs = append(funcs, f) 
    } 
    t.Logf("Running funcs sequentially") 
    if err := RunSequentially(funcs...); err != nil { 
     t.Fatal(err) 
    } 
    validateOutputChannel(t, out, n) 
    t.Logf("Running funcs in parallel") 
    if err := RunInParallel(funcs...); err != nil { 
     t.Fatal(err) 
    } 
    close(out) 
    validateOutputChannel(t, out, n) 
} 

以下是上述测试功能的输出。

closure_test.go:91: outer i=0, j=0 
closure_test.go:91: outer i=1, j=1 
closure_test.go:91: outer i=2, j=2 
closure_test.go:91: outer i=3, j=3 
closure_test.go:99: Running funcs sequentially 
closure_test.go:93: inner i=4, j=0 
closure_test.go:93: inner i=4, j=1 
closure_test.go:93: inner i=4, j=2 
closure_test.go:93: inner i=4, j=3 
closure_test.go:104: Running funcs in parallel 
closure_test.go:93: inner i=4, j=3 
closure_test.go:93: inner i=4, j=3 
closure_test.go:93: inner i=4, j=3 
closure_test.go:93: inner i=4, j=3 
closure_test.go:80: Output channel has 1 unique items; wanted 4 

任何想法?这是Go中的一个错误吗?

回答

4

始终使用-race运行测试。在你的情况,你忘了重新在每次迭代fRunInParallel

func RunInParallel(funcs ...func() error) error { 
    var errcList [](<-chan error) 
    for _, f := range funcs { 

     f := f // << HERE 

     errc := make(chan error, 1) 
     errcList = append(errcList, errc) 
     go func() { 
      err := f() 
      if err != nil { 
       errc <- err 
      } 
      close(errc) 
     }() 
    } 
    return WaitForPipeline(errcList...) 
} 

其结果是,你总是推出最后f,而不是每一个。

+0

整洁,我从来没有想过像这样遮蔽自己。 – RayfenWindspear

+0

感谢您使用“-race”的建议。它突出了你已经确定的问题。我怀疑这是一个循环捕获问题,但我专注于错误的循环! –

4

我相信你的问题在于你的RunInParallel func。

func RunInParallel(funcs ...func() error) error { 
    var errcList [](<-chan error) 
    for _, f := range funcs { 
     errc := make(chan error, 1) 
     errcList = append(errcList, errc) 
     go func() { 
      // This line probably isn't being reached until your range 
      // loop has completed, meaning f is the last func by the time 
      // each goroutine starts. If you capture f 
      // in another variable inside the range, you won't have this issue. 
      err := f() 
      if err != nil { 
       errc <- err 
      } 
      close(errc) 
     }() 
    } 
    return WaitForPipeline(errcList...) 
} 

您也可以将f作为参数传递给您的匿名函数以避免此问题。

for _, f := range funcs { 
    errc := make(chan error, 1) 
    errcList = append(errcList, errc) 
    go func(g func() error) { 
     err := g() 
     if err != nil { 
      errc <- err 
     } 
     close(errc) 
    }(f) 
} 

Here是在操场上的现场示例。