2017-10-04 66 views
4

我有我正在刮的网址列表。我想要做的是将所有成功抓取的页面数据存储到一个通道中,当我完成时,将其转储到一个切片中。我不知道我会得到多少成功的提取,所以我不能指定一个固定的长度。我预计代码将达到wg.Wait(),然后等到所有的wg.Done()方法被调用,但我从未达到close(queue)声明。寻找一个类似的答案,我碰到这个来得如此回答为什么我的代码在goroutine中运行wg.Wait()时工作正常?

https://stackoverflow.com/a/31573574/5721702

在这里笔者有类似的功能:

ports := make(chan string) 
toScan := make(chan int) 
var wg sync.WaitGroup 

// make 100 workers for dialing 
for i := 0; i < 100; i++ { 
    wg.Add(1) 
    go func() { 
     defer wg.Done() 
     for p := range toScan { 
      ports <- worker(*host, p) 
     } 
    }() 
} 

// close our receiving ports channel once all workers are done 
go func() { 
    wg.Wait() 
    close(ports) 
}() 

只要我包裹着我的wg.Wait()的够程内,close(queue)达到:

urls := getListOfURLS() 
activities := make([]Activity, 0, limit) 
queue := make(chan Activity) 
for i, activityURL := range urls { 
    wg.Add(1) 
    go func(i int, url string) { 
     defer wg.Done() 
     activity, err := extractDetail(url) 
     if err != nil { 
      log.Println(err) 
      return 
     } 
     queue <- activity 
    }(i, activityURL) 
} 
    // calling it like this without the goroutine causes the execution to hang 
// wg.Wait() 
// close(queue) 

    // calling it like this successfully waits 
go func() { 
    wg.Wait() 
    close(queue) 
}() 
for a := range queue { 
    // block channel until valid url is added to queue 
    // once all are added, close it 
    activities = append(activities, a) 
} 

为什么这些代码并没有达到close如果我不使用股份公司oroutine for wg.Wait()?我认为所有的defer wg.Done()声明都会被调用,所以最终它会清除,因为它会到达wg.Wait()。它是否与在我的频道中接收值有关?

+0

你能展示整个代码吗? –

+0

默认情况下,通道是无缓冲的,这意味着如果有相应的接收准备好接收发送的值,它们将只接受发送。因此,当您将活动发送到队列时,没有相应的接收,并且它被阻止。所有例程被阻止,最后wg.wait会挂起。有一个概念叫做缓冲通道,可以用来缓冲多个输入。通过https://gobyexample.com/channel-buffering去浏览 –

回答

4

您需要等待goroutines完成一个单独的线程,因为queue需要读取。当你做到以下几点:

queue := make(chan Activity) 
for i, activityURL := range urls { 
    wg.Add(1) 
    go func(i int, url string) { 
     defer wg.Done() 
     activity, err := extractDetail(url) 
     if err != nil { 
      log.Println(err) 
      return 
     } 
     queue <- activity // nothing is reading data from queue. 
    }(i, activityURL) 
} 

wg.Wait() 
close(queue) 

for a := range queue { 
    activities = append(activities, a) 
} 

queue <- activity每个够程块,因为queue是无缓冲,并没有什么从中读取数据。这是因为queue上的范围循环在wg.Wait之后的主线程中。

wg.Wait只会在所有goroutine返回时解锁。但是如前所述,所有的goroutines在频道发送时都被阻止。

当您使用单独的goroutine等待时,代码执行实际上到达queue上的范围循环。

// wg.Wait does not block the main thread. 
go func() { 
    wg.Wait() 
    close(queue) 
}() 

这导致了够程在queue <- activity声明解封(主线程开始读取关闭queue)和运行直到完成。而后者又调用每个人wg.Done

一旦等待的例程通过wg.Wait,关闭queue,主线程退出范围循环。

+0

鉴于你的解释很有意义,但是只有在阅读之后,这让我很想知道,我写它的方式是写它的最好方法吗?我不打算按照这种方式工作,所以我担心代码有点复杂,而且不太易读。 –

+0

这几乎是如何处理这样的问题。有些人更喜欢完全依赖渠道,但等待组织没有任何问题。至于可读性,这个问题可能与理解代码流程有关。这是您在使用该语言时更熟悉的事情。 – abhink

1

queue通道没有缓冲,所以每个试图写入它的goroutine都会被阻塞,因为读取器进程尚未启动。所以没有goroutinte可以写,他们都挂起 - 结果wg.Wait永远等待。 尝试在一个单独的goroutine推出读者:

go func() { 
    for a := range queue { 
     // block channel until valid url is added to queue 
     // once all are added, close it 
     activities = append(activities, a) 
    } 
}() 

,然后开始服务员:

wg.Wait() 
close(queue) 

这样你就不能积累在通道中的所有数据和过载,但得到的数据,因为它来到目标切片。

+0

你的意思是先打电话给读者,然后在没有门厅的情况下给服务员打电话?我试过这个,它在我的slice中返回了3个结果,而不是预期的4.我怀疑它是在到达所有wg.Done()之后过早地关闭队列并且之前返回slice最后一项可以追加。 –

+0

是的。在这种情况下,您需要等到阅读器完成,否则最后的记录可能会丢失。 –

相关问题