2016-01-07 103 views
5

我试图通过将它们放入队列以稍后访问来限制函数的速率限制。在下面,我创建了一小部分请求,并且requestHandler函数以一定的速率处理每个请求。转:通过通道传递函数

我希望它接受具有不同类型参数的各种函数,因此接口{}类型。

我将如何通过通道传递函数并成功调用它们?

type request struct { 
    function interface{} 
    channel chan interface{} 
} 

var requestQueue []request 

func pushQueue(f interface{}, ch chan interface{}) { 
    req := request{ 
     f, 
     ch, 
    } 

    //push 
    requestQueue = append(requestQueue, req) 
} 

func requestHandler() { 
    for { 
     if len(requestQueue) > 0 { 
      //pop 
      req := requestQueue[len(requestQueue)-1] 
      requestQueue = requestQueue[:len(requestQueue)-1] 

      req.channel <- req.function 
     } 

     <-time.After(1200 * time.Millisecond) 
    } 
} 

这里是我想要实现(GetLeagueEntries(字符串,字符串)和GetSummonerName(INT,INT)是函数)的例子:

ch := make(chan interface{}) 
    pushQueue(l.GetLeagueEntries, ch) 
    pushQueue(l.GetSummonerName, ch) 

    leagues, _ := <-ch(string1, string2) 
    summoners, _ := <-ch(int1, int2) 
+1

你为什么要限制的函数调用的速度?限速访问实际有限的资源(如网络,磁盘等)可能更容易实现。另请参见https://godoc.org/golang.org/x/time/rate – kostya

+0

因为这些函数调用是与API绑定的外部库调用。我必须限制自己的功能。 – tyuo9980

回答

0

好吧,这里是codez:https://play.golang.org/p/XZvb_4BaJF

注意,它并不是完美的。你有一个每秒执行的队列。如果队列为空且添加了新项目,则新项目在执行前可能会等待几秒钟。

但是,这应该让你非常接近,你需要什么呢:)

该代码可以分成3部分:

  1. 速率的限制队列执行者,我称之为服务器(我在命名事物时很可怕) - 服务器对功能一无所知。它所做的只是启动一个永不停息的goroutine,它会弹出队列中最老的函数,每秒一次,并调用它。我上面提到的问题在代码的这一部分中,如果您愿意,我可以帮助您解决它。
  2. 按钮点击功能 - 这将显示每个按钮点击如何使用服务器调用3个diff功能(显然可以进行更多/更少的函数调用),并确保它们彼此相距1秒。你甚至可以给任何函数添加一个超时(假延迟),它们仍然会被调用1秒。这是唯一需要通道的地方,因为您希望尽可能快地完成所有函数调用(如果第一个函数需要5秒钟,您只需要等待1秒钟来调用第二个函数),然后等待它们完成,所以你需要知道他们什么时候完成。
  3. 按钮点击模拟(主要功能) - 这只是表明3按钮点击将按预期工作。你也可以把它们放在一个goroutine中来模拟3个用户同时点击这个按钮,它仍然可以工作。

    package main 
    
    import (
        "fmt" 
        "sync" 
        "time" 
    ) 
    
    const (
        requestFreq = time.Second 
    ) 
    
    type (
        // A single request 
        request func() 
    
        // The server that will hold a queue of requests and make them once a requestFreq 
        server struct { 
         // This will tick once per requestFreq 
         ticker  *time.Ticker 
    
         requests []request 
         // Mutex for working with the request slice 
         sync.RWMutex 
        } 
    ) 
    
    var (
        createServerOnce sync.Once 
        s *server 
    ) 
    
    func main() { 
        // Multiple button clicks: 
        ButtonClick() 
        ButtonClick() 
        ButtonClick() 
    
        fmt.Println("Done!") 
    } 
    
    
    
    
    
    
    // BUTTON LOGIC: 
    
    // Calls 3 functions and returns 3 diff values. 
    // Each function is called at least 1 second appart. 
    func ButtonClick() (val1 int, val2 string, val3 bool) { 
        iCh := make(chan int) 
        sCh := make(chan string) 
        bCh := make(chan bool) 
    
        go func(){ 
         Server().AppendRequest(func() { 
          t := time.Now() 
          fmt.Println("Calling func1 (time: " + t.Format("15:04:05") + ")") 
          // do some stuff 
          iCh <- 1 
         }) 
        }() 
        go func(){ 
         Server().AppendRequest(func() { 
          t := time.Now() 
          fmt.Println("Calling func2 (time: " + t.Format("15:04:05") + ")") 
          // do some stuff 
          sCh <- "Yo" 
         }) 
        }() 
        go func(){ 
         Server().AppendRequest(func() { 
          t := time.Now() 
          fmt.Println("Calling func3 (time: " + t.Format("15:04:05") + ")") 
          // do some stuff 
          bCh <- true 
         }) 
        }() 
    
        // Wait for all 3 calls to come back 
        for count := 0; count < 3; count++ { 
         select { 
         case val1 = <-iCh: 
         case val2 = <-sCh: 
         case val3 = <-bCh: 
         } 
        } 
    
        return 
    } 
    
    
    
    
    
    // SERVER LOGIC 
    
    // Factory function that will only create a single server 
    func Server() *server { 
        // Only one server for the entire application 
        createServerOnce.Do(func() { 
         s = &server{ticker: time.NewTicker(requestFreq), requests: []request{}} 
    
         // Start a thread to make requests. 
         go s.makeRequests() 
        }) 
        return s 
    } 
    func (s *server) makeRequests() { 
        if s == nil || s.ticker == nil { 
         return 
        } 
    
        // This will keep going once per each requestFreq 
        for _ = range s.ticker.C { 
    
         var r request 
    
         // You can't just access s.requests because you are in a goroutine 
         // here while someone could be adding new requests outside of the 
         // goroutine so you have to use locks. 
         s.Lock() 
         if len(s.requests) > 0 { 
          // We have a lock here, which blocks all other operations 
          // so just shift the first request out, save it and give 
          // the lock back before doing any work. 
          r = s.requests[0] 
          s.requests = s.requests[1:] 
         } 
         s.Unlock() 
    
         if r != nil { 
          // make the request! 
          r() 
         } 
        } 
    } 
    func (s *server) AppendRequest(r request) { 
        if s == nil { 
         return 
        } 
        s.Lock() 
        s.requests = append(s.requests, r) 
        s.Unlock() 
    } 
    
0

我本来以为很容易使用某种信号量或工作池。这样你就可以做任何事情的工人数量有限。也可能有多个工作人员池。

您是否需要这些调用中的任何一个是并发/异步的?如果没有,他们可以被调用,以便你可以配置睡眠(一个讨厌的黑客头脑)。

尝试一个工作者池或信号量,而不是一个功能。

+0

主要问题是找到一种方法来限制我的api调用,但我发现有一个办法来延迟这些并发调用的麻烦。 – tyuo9980

2

首先,我把它写成:

leagues := server.GetLeagueEntries() 
summoners := server.GetSummoners() 

而且,把速率限制到服务器。有一个限速库。

然而,也可以使用接口来统一请求,并使用FUNC类型,以允许封闭件(如在http.HandleFunc):

type Command interface { 
    Execute(server *Server) 
} 

type CommandFunc func(server *Server) 
func (fn CommandFunc) Execute(server *Server) { fn(server) } 

type GetLeagueEntries struct { Leagues []League } 

func (entries *GetLeagueEntries) Execute(server *Server) { 
    // ... 
} 

func GetSummonerName(id int, result *string) CommandFunc { 
    return CommandFunc(func(server *Server){ 
     *result = "hello" 
    }) 
} 

get := GetLeagueEnties{} 
requests <- &get 

requests <- CommandFunc(func(server *Server){ 
    // ... handle struff here 
}) 

当然,这需要一些同步。