2013-10-04 24 views
4

注 - 新手在Go。一个通道多路复用器

我写了一个多路复用器,应该合并通道数组的输出为一个。满意的建设性批评。

func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    n := len(channels) 
    // The channel to output to. 
    ch := make(chan big.Int, n) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func() { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      n -= 1 
      // Close output if all closed now. 
      if n == 0 { 
       close(ch) 
      } 
     }() 
    } 
    return ch 
} 

我与测试它:

func fromTo(f, t int) chan big.Int { 
    ch := make(chan big.Int) 

    go func() { 
     for i := f; i < t; i++ { 
      fmt.Println("Feed:", i) 
      ch <- *big.NewInt(int64(i)) 
     } 
     close(ch) 
    }() 
    return ch 
} 

func testMux() { 
    r := make([]chan big.Int, 10) 
    for i := 0; i < 10; i++ { 
     r[i] = fromTo(i*10, i*10+10) 
    } 
    all := Mux(r) 
    // Roll them out. 
    for l := range all { 
     fmt.Println(l) 
    } 
} 

,但我的输出是很奇怪:

Feed: 0 
Feed: 10 
Feed: 20 
Feed: 30 
Feed: 40 
Feed: 50 
Feed: 60 
Feed: 70 
Feed: 80 
Feed: 90 
Feed: 91 
Feed: 92 
Feed: 93 
Feed: 94 
Feed: 95 
Feed: 96 
Feed: 97 
Feed: 98 
Feed: 99 
{false [90]} 
{false [91]} 
{false [92]} 
{false [93]} 
{false [94]} 
{false [95]} 
{false [96]} 
{false [97]} 
{false [98]} 
{false [99]} 

所以我的问题:

  • 有什么事我在MUX中做错了吗?
  • 为什么我只能从我的输出通道获取最后10个?
  • 为什么喂食看起来很奇怪? (每个输入通道的第一个,所有最后一个通道,然后都没有)
  • 有没有更好的方法来做到这一点?

我所需要的所有输入通道中的具有相等权利至输出信道 - 即我不能有所有的输出从一个通道,然后将所有来自下一等


对于任何感兴趣 - 这是修复后的最终代码和正确的(可能)使用的sync.WaitGroup

import (
    "math/big" 
    "sync" 
) 

/* 
    Multiplex a number of channels into one. 
*/ 
func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    var wg sync.WaitGroup 
    wg.Add(len(channels)) 
    // The channel to output to. 
    ch := make(chan big.Int, len(channels)) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func(c <-chan big.Int) { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      wg.Done() 
     }(c) 
    } 
    // Close the channel when the pumping is finished. 
    go func() { 
     // Wait for everyone to be done. 
     wg.Wait() 
     // Close. 
     close(ch) 
    }() 
    return ch 
} 

回答

2

您的每一个够程从Mux催生结束了从同一渠道拉动,因为c获得更新循环的每个迭代–他们不只是捕获的价值c

for _, c := range channels { 
    go func(c <-chan big.Int) { 
     ... 
    }(c) 
} 

您可以测试这个修改here:如果您通过通道向够程,像这样你会得到预期的结果。

另一个可能的问题是你对n变量的处理:如果你正在运行GOMAXPROCS != 1,你可能有两个goroutines试图立即更新它。 sync.WaitGroup类型是等待goroutine完成的更安全的方式。

+0

谢谢 - 这完全解释了我的问题。结果会在任何体系结构中始终如一地享有所有渠道的平等权利? – OldCurmudgeon

+0

您是否问每个喂食'ch'的goroutine是否会公平计划?我不知道这是否定义。如果您需要特定的交叉结果,则可能需要更多。 –

+0

我担心在某些环境下,每个频道可能会耗尽以便在下一次查看前消耗殆尽。这必须避免。我不需要特定的顺序,但我需要在所有渠道之间保持公平的平衡。 – OldCurmudgeon

1

要使用range语句时被分配一个局部变量的基础上James Hentridge答案,一个惯用的方式来处理重新assignement问题股份价值:

for _, c := range channels { 
    c := c 
    go func() { 
    ... 
    }() 
}