2016-02-21 53 views
0

由于Go没有泛型,所有预制解决方案都使用我不太喜欢的类型转换。我也想自己实现它,并尝试下面的代码。但是,有时它不会等待所有的goroutines,我是否过早地关闭了工作渠道?我没有任何东西可以从他们那里获取。我可能也使用了一个伪输出通道,并等待从中获取确切的数量,但我相信下面的代码也可以工作。我错过了什么?在Go中实现工作人员池

func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) { 
    wg.Add(1) 
    defer wg.Done() 

    for job := range jobs { 
     item := ParseItem(job) 
     item.SaveItem() 
     MarkJobCompleted(item.ID) 
     log.Println("Saved", item.Title) 
    } 
} 

// ProcessJobs processes the jobs from the list and deletes them 
func ProcessJobs() { 

    jobs := make(chan string) 

    list := GetJobs() 
    // Start workers 
    var wg sync.WaitGroup 
    for w := 0; w < 10; w++ { 
     go jobWorker(w, jobs, wg) 
    } 

    for _, url := range list { 
     jobs <- url 
    } 

    close(jobs) 
    wg.Wait() 
} 

回答

2

在goroutine之外调用wg.Add并将指针传递给等待组。

如果从goroutine中调用Add,主goroutine有可能在goroutine有机会运行之前调用Wait。如果Add尚未被调用,那么Wait将立即返回。

传递指向goroutine的指针。否则,goroutines使用他们自己的等待组副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) { 

    defer wg.Done() 

    for job := range jobs { 
     item := ParseItem(job) 
     item.SaveItem() 
     MarkJobCompleted(item.ID) 
     log.Println("Saved", item.Title) 
    } 
} 

// ProcessJobs processes the jobs from the list and deletes them 
func ProcessJobs() { 

    jobs := make(chan string) 

    list := GetJobs() 
    // Start workers 
    var wg sync.WaitGroup 
    for w := 0; w < 10; w++ { 
     wg.Add(1) 
     go jobWorker(w, jobs, &wg) 
    } 

    for _, url := range list { 
     jobs <- url 
    } 

    close(jobs) 
    wg.Wait() 
} 
+0

是的,你是绝对正确的,并且将'wg.Add(1)'分隔到那里实际上更有意义和更正确。 – Mustafa

+0

实际上,用'wg.Add(10)'调用它一次更有意义;) – fl0cke

1

您需要将指针传递给waitgroup,否则每个作业都会收到它自己的副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) { 
    wg.Add(1) 
    defer wg.Done() 

    for job := range jobs { 
     item := ParseItem(job) 
     item.SaveItem() 
     MarkJobCompleted(item.ID) 
     log.Println("Saved", item.Title) 
    } 
} 

// ProcessJobs processes the jobs from the list and deletes them 
func ProcessJobs() { 

    jobs := make(chan string) 

    list := GetJobs() 
    // Start workers 
    var wg sync.WaitGroup 
    for w := 0; w < 10; w++ { 
     go jobWorker(w, jobs, &wg) 
    } 

    for _, url := range list { 
     jobs <- url 
    } 

    close(jobs) 
    wg.Wait() 
} 

看到这里的区别:without pointerwith pointer

+0

你是对的,非常愚蠢的错误! :) – Mustafa

相关问题