2017-02-23 53 views
1

我们有一堆文件要处理后上传到远程blob存储。去队列处理失败后重试

目前,前端(PHP)创建了一个这样的文件的redis列表,并给它一个唯一的ID,称为JobID。然后它将这个唯一的ID传递给一个豆杆管,这个管通过一个Go进程接收。它使用名为Go workers的库来按照net/http所做的方式处理每个作业ID。它接收作业ID,检索redis列表并开始处理文件。

但是,目前一次只能处理一个文件。由于这里的操作是I/O绑定,而不是CPU绑定,直觉表明,使用每个文件的goroutine会有好处。

但是,我们希望在失败时重试上传,并跟踪每个作业处理的项目数量。我们无法启动一个未绑定的goroutine数量,因为一个Job可以包含大约10k个文件来处理,并且在高峰时间内每秒可以发送100个这样的作业。这将是什么正确的方法?

NB:我们可以改变技术,如果需要的堆栈中的位(如换掉的东西beanstalkd)

回答

2

您可以通过使用一个缓冲chan的大小够程的最大数量的限制够程的数量你要。如果达到最大容量,则可以阻止此chan。随着您的goroutines完成,他们将释放插槽以允许新goroutines运行。

例子:

package main 

import (
    "fmt" 
    "sync" 
) 

var (
    concurrent = 5 
    semaphoreChan = make(chan struct{}, concurrent) 
) 

func doWork(wg *sync.WaitGroup, item int) { 
    // block while full 
    semaphoreChan <- struct{}{} 

    go func() { 
     defer func() { 
      // read to release a slot 
      <-semaphoreChan 
      wg.Done() 
     }() 
     // This is where your work actually gets done 
     fmt.Println(item) 
    }() 
} 

func main() { 
    // we need this for the example so that we can block until all goroutines finish 
    var wg sync.WaitGroup 
    wg.Add(10) 

    // start the work 
    for i := 0; i < 10; i++ { 
     doWork(&wg, i) 
    } 

    // block until all work is done 
    wg.Wait() 
} 

去游乐场链接:https://play.golang.org/p/jDMYuCe7HV

本Golang英国会议的启发谈话:https://youtu.be/yeetIgNeIkc?t=1413

+0

它帮助我开始使用限制并发性。然而,现在仍然存在的问题是如何跟踪工作的成败。作业包含N个子任务,所有这些都必须成功处理,否则需要报告错误。我如何解决这个问题? – agathver

+0

创建一个您传递给goroutine的频道。该goroutine可以将操作的结果写入该通道,包括错误。调用者可以根据需要从该通道中获取信息来处理错误(例如记录错误或重试操作)。如果您需要重试该操作,请使用具有必要上下文的自定义结构类型来重试该通道(例如,goroutine需要再次尝试的输入)并出现错误。 – MahlerFive