2017-10-18 116 views
-1

我试图建立一个系统,工作池/ jobqueue,以尽可能多地在每个API端点上处理http requests。我看着这example,并得到它的工作很好,除了我偶然发现的问题,我不明白如何将pool/jobqueue扩大到不同的端点。Golang HTTP请求工作池

对于方案的缘故,让我们勾画有跨越不同的端点和请求类型一百万请求/ min的Golang http服务器GET & POST ETC.

我该如何扩展这个概念?我应该为每个端点创建不同的工作池和作业。或者我可以创建不同的作业并将它们输入到同一个队列中,并使用相同的池来处理这些作业?

我想保持简单性,如果我创建一个新的API端点,我不必创建新的工作池,所以我可以专注于API。但表现也非常重要。

我试图构建的代码取自之前链接的示例,here是其他人使用此代码的github'gist'。

+2

Go的http包为每个传入连接启动一个go例程。除非你在谈论后台工作处理,否则这似乎是浪费精力。 – squiguy

+0

是的,这是为了后台处理。有些人可能需要一段时间才能完成,我宁愿不让一个不受控制的goroutines宽松 –

+0

goroutines有什么问题?它们基本上是内置异步支持的jobqueue实现。 –

回答

0

不清楚为什么你需要工作人员池?会不会是足够的goroutines?

如果您受资源限制,可以考虑实施rates limiting。如果不是,为什么根本不需要跨越例程呢?

最好的学习方法是研究别人如何做好事。

看一看https://github.com/valyala/fasthttp

快速HTTP包围棋。调整为高性能。热路径中的零内存分配。比net/http快10倍。

他们都声称:

从每个物理服务器

这是相当可观的,我怀疑你可以做超过1.5M并发保持连接服务多达200K RPS用pool/jobqueue更好。

1

前面有一件事:如果您正在运行HTTP服务器(无论如何都是Go的标准服务器),则无法停止并重新启动服务器就无法控制goroutine的数量。每个请求至少启动一个goroutine,并且你无能为力。好消息是,这通常不是问题,因为goroutine非常轻巧。然而,你想保持正在努力工作的goroutines的数量是完全合理的。

您可以将任何值放入通道中,包括函数。因此,如果目标是只需要在http处理程序中编写代码,那么应该关闭这些工作 - 工作人员不知道(或关心)他们正在处理的是什么。

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan func() 
var smallPool chan func() 

func main() { 
    // Start two different sized worker pools (e.g., for different workloads). 
    // Cancelation and graceful shutdown omited for brevity. 

    largePool = make(chan func(), 100) 
    smallPool = make(chan func(), 10) 

    for i := 0; i < 100; i++ { 
      go func() { 
        for f := range largePool { 
          f() 
        } 
      }() 
    } 

    for i := 0; i < 10; i++ { 
      go func() { 
        for f := range smallPool { 
          f() 
        } 
      }() 
    } 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay? 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    // Imagine a JSON body containing a URL that we are expected to fetch. 
    // Light work that doesn't consume many of *our* resources and can be done 
    // in bulk, so we put in in the large pool. 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      largePool <- func() { 
        http.Get(job.URL) 
        // Do something with the response 
      } 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    // The request body is an image that we want to do some fancy processing 
    // on. That's hard work; we don't want to do too many of them at once, so 
    // so we put those jobs in the small pool. 

    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      smallPool <- func() { 
        processImage(b) 
      } 
    }() 
    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 

这是一个非常简单的例子来说明问题。设置工作池的方式并不重要。你只需要一个聪明的工作定义。在上面的例子中它是一个闭包,但是你也可以定义一个Job接口。现在

type Job interface { 
    Do() 
} 

var largePool chan Job 
var smallPool chan Job 

,我不会把整个工作池方法 “简单”。你说你的目标是限制goroutines(正在工作)的数量。这根本不需要工人;它只需要一个限制器。这和上面的例子是一样的,但是使用通道作为信号来限制并发。

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan struct{} 
var smallPool chan struct{} 

func main() { 
    largePool = make(chan struct{}, 100) 
    smallPool = make(chan struct{}, 10) 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(largePool) light-work 
      // goroutines running. 
      largePool <- struct{}{} 
      defer func() { <-largePool }() // Let everyone that we are done 

      http.Get(job.URL) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(smallPool) hard-work 
      // goroutines running. 
      smallPool <- struct{}{} 
      defer func() { <-smallPool }() // Let everyone that we are done 

      processImage(b) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 
0

正如以前在服务器中回答的那样,每个请求处理程序将至少在一个goroutine中运行。

但你仍然可以在必要时使用工人池后端并行任务。例如,让我们假设一些HttpHandler函数触发对其他外部API的调用并将它们的结果“汇总”在一起,因此在这种情况下调用的顺序并不重要,这是一种可以利用工作池并分发为了工作,让他们并行调度每个任务运行,以一个工人的goroutine:

的示例代码段:

// build empty response 
    capacity := config.GetIntProperty("defaultListCapacity") 
    list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0) 

    // search providers 
    providers := getProvidersByCountry(country) 

    // create a slice of jobResult outputs 
    jobOutputs := make([]<-chan job.JobResult, 0) 

    // distribute work 
    for i := 0; i < len(providers); i++ { 
     job := search(providers[i], m) 
     if job != nil { 
      jobOutputs = append(jobOutputs, job.ReturnChannel) 
      // Push each job onto the queue. 
      GetInstance().JobQueue <- *job 
     } 
    } 

    // Consume the merged output from all jobs 
    out := job.Merge(jobOutputs...) 
    for r := range out { 
     if r.Error == nil { 
      mergeSearchResponse(list, r.Value.(*model.ResponseList)) 
     } 
    } 
    return list 

。职工池中运行的“通用”异步任务完成例如:https://github.com/guilhebl/go-offer/blob/master/offer/repo.go

。使用的工作者库lib:https://github.com/guilhebl/go-worker-pool