2013-04-18 47 views
4

我在Go中做了一些流处理,并试图弄清楚如何在没有锁的情况下执行此操作。去并发切片访问

这个人为的例子显示了我面临的问题。

  • 我们一次得到一个thing
  • 有一个goroutine将它们缓冲到一个称为things的切片中。
  • thingslen(things) == 100然后以某种方式处理和复位
  • n一些需要访问things它的全
  • 获得了“不完整的” things从其它够程前并发够程是不可预测的。
  • 无论doSomethingWithPartial也不doSomethingWithComplete需求变异things

代码:

var m sync.Mutex 
var count int64 
things := make([]int64, 0, 100) 

// slices of data are constantly being generated and used 
go func() { 
    for { 
    m.Lock() 
    if len(things) == 100 { 
     // doSomethingWithComplete does not modify things 
     doSomethingWithComplete(things) 
     things = make([]int64, 0, 100) 
    } 
    things = append(things, count) 
    m.Unlock() 
    count++ 
    } 
}() 

// doSomethingWithPartial needs to access the things before they're ready 
for { 
    m.Lock() 
    // doSomethingWithPartial does not modify things 
    doSomethingWithPartial(things) 
    m.Unlock() 
} 
  1. 我知道片是不可变这是否意味着我可以删除互斥锁,并期望它仍工作(我假设不是)

  2. 我该如何重构这个使用通道而不是互斥量。

编辑:这是我想出了解决方案不使用互斥

package main 

import (
    "fmt" 
    "sync" 
    "time" 
) 

func Incrementor() chan int { 
    ch := make(chan int) 
    go func() { 
     count := 0 
     for { 
      ch <- count 
      count++ 
     } 
    }() 
    return ch 
} 

type Foo struct { 
    things []int 
    requests chan chan []int 
    stream chan int 
    C  chan []int 
} 

func NewFoo() *Foo { 
    foo := &Foo{ 
     things: make([]int, 0, 100), 
     requests: make(chan chan []int), 
     stream: Incrementor(), 
     C:  make(chan []int), 
    } 
    go foo.Launch() 
    return foo 
} 

func (f *Foo) Launch() { 
    for { 
     select { 
     case ch := <-f.requests: 
      ch <- f.things 
     case thing := <-f.stream: 
      if len(f.things) == 100 { 
       f.C <- f.things 
       f.things = make([]int, 0, 100) 
      } 
      f.things = append(f.things, thing) 
     } 
    } 
} 

func (f *Foo) Things() []int { 
    ch := make(chan []int) 
    f.requests <- ch 
    return <-ch 
} 

func main() { 

    foo := NewFoo() 

    var wg sync.WaitGroup 
    wg.Add(10) 

    for i := 0; i < 10; i++ { 
     go func(i int) { 
      time.Sleep(time.Millisecond * time.Duration(i) * 100) 
      things := foo.Things() 
      fmt.Println("got things:", len(things)) 
      wg.Done() 
     }(i) 
    } 

    go func() { 
     for _ = range foo.C { 
      // do something with things 
     } 
    }() 

    wg.Wait() 
} 
+1

切片不是不可变的。字符串是。 – fuz

+1

切片是可变的。字符串是不可变的。 – peterSO

+0

@FUZxxl切片指向的数组是可变的,但切片本身不是。 (AFAIK) –

回答

1

应该指出的是,“去的方式”可能只是使用互斥这个。研究如何使用通道来实现它是很有趣的,但互斥对于这个特定的问题可能更简单也更容易推理。