2017-08-24 55 views
0

我试图从队列(RabbitMQ)读取URL并进行有限数量的并发HTTP请求,即有10个工作人员对从队列接收的URL进行并发请求的池(永远)。限制处理来自RabbitMQ的消息时的并发性

到目前为止,我已经实现了一个以消费者为每RabbitMQ的教程: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

,并尝试了一些从发现网络上的实例方法,在这里的例子结束: http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我当前的代码运行了大约一分钟,然后无限期地冻结。我尝试过添加/移动例程,但似乎无法按预期工作(我对Go非常陌生)。

当前代码:

package main 

import (
    "fmt" 
    "log" 
    "net/http" 
    "time" 

    "github.com/Xide/bloom" 
    "github.com/streadway/amqp" 
) 

func failOnError(err error, msg string) { 
    if err != nil { 
     log.Fatalf("%s: %s", msg, err) 
     panic(fmt.Sprintf("%s: %s", msg, err)) 
    } 
} 

var netClient = &http.Client{ 
    Timeout: time.Second * 10, 
} 

func getRequest(url string) { 
    //resp, err := http.Get(string(url)) 
    resp, err := netClient.Get(string(url)) 
    if err != nil { 
     log.Printf("HTTP request error: %s", err) 
     return 
    } 
    fmt.Println("StatusCode:", resp.StatusCode) 
    fmt.Println(resp.Request.URL) 
} 

func main() { 
    bf := bloom.NewDefaultScalable(0.1) 

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/") 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    q, err := ch.QueueDeclare(
     "urls",   // name 
     true,    // durable 
     false,    // delete when unused 
     false,    // exclusive 
     false,    // no-wait 
     nil,    // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.Qos(
     1,  // prefetch count 
     0,  // prefetch size 
     false, //global 
    ) 
    failOnError(err, "Failed to set Qos") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     false, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    forever := make(chan bool) 

    concurrency := 10 
    sem := make(chan bool, concurrency) 
    go func() { 
     for d := range msgs { 
      sem <- true 
      url := string(d.Body) 
      if bf.Match(url) == false { 
       bf.Feed(url) 
       log.Printf("Not seen: %s", d.Body) 
       go func(url string) { 
        defer func() { <-sem }() 
        getRequest(url) 
       }(url) 
      } else { 
       log.Printf("Already seen: %s", d.Body) 
      } 
      d.Ack(false) 
     } 
     for i := 0; i < cap(sem); i++ { 
      sem <- true 
     } 
    }() 

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C") 
    <-forever 
} 
+0

可能你的日志输出增加的问题,这将有助于人们看到是怎么回事 – Lewis42

+0

试着用'-race'标志运行程序,它可以帮助您进行调试:https://开头博客。 golang.org/race-detector – Nebril

+0

如果将并发设置为10,它会使大约60个HTTP请求(逐渐变慢)然后冻结。用-race编译不提供任何信息。 – user3104123

回答

2

你没有正确地处理你的HTTP响应,导致一组开放连接的增长。试试这个:

func getRequest(url string) { 
    resp, err := netClient.Get(string(url)) 
    if err != nil { 
     log.Printf("HTTP request error: %s", err) 
     return 
    } 
    // Add this bit: 
    defer func() { 
     io.Copy(ioutil.Discard, resp.Body) 
     resp.Body.Close() 
    }() 
    fmt.Println("StatusCode:", resp.StatusCode) 
    fmt.Println(resp.Request.URL) 
} 

这一点,你从通道看完信息后,似乎是不必要的和潜在问题:

for i := 0; i < cap(sem); i++ { 
     sem <- true 
    } 

为什么填补sem通道你从队列中读取所有的消息后, ?您已将尽可能多的消息添加到您希望从中读取的消息中,因此最好是,如果您对其余代码进行了错误更改,则可能会导致问题。

完全无关的问题,但是这是多余的:

if err != nil { 
    log.Fatalf("%s: %s", msg, err) 
    panic(fmt.Sprintf("%s: %s", msg, err)) 
} 

the documentationFatalf已经存在,所以panic将永远不会被调用。如果你想记录和panic,请尝试log.Panicf,这是专门为此目的而设计的。

0

当您收到一条消息时,您将添加到sem,但只有在您没有看到url时从sem中删除。

因此,一旦你已经“看到”10个网址,你的应用就会锁定。 因此,您需要将<-sem添加到记录“已见过”的else语句中。

无论哪种方式,这是一种相当奇怪的方式来做这种并发。 这是一个更加习惯于on Play的版本。

注意,在这个版本中,我们只产生了10个听兔子频道的goroutines。

包主

进口( “FMT” “日志” “净/ HTTP” “时间”

"github.com/Xide/bloom" 
"github.com/streadway/amqp" 

FUNC failOnError(ERR出错,味精串){ 若ERR =零{ log.Fatalf(! “%S%S”,味精,ERR) } }

变种netClient = & http.Client { 超时:time.Second * 10, }

FUNC调用getRequest(URL串){// RESP,ERR:= http.Get(字符串(URL) ) RESP,ERR:!= netClient.Get(字符串(URL)) 如果ERR =零{ log.Printf( “HTTP请求错误:%s”,ERR) 返回 } resp.Body.Close( ) fmt.Println( “的StatusCode:”,resp.StatusCode) fmt.Println(resp.Request.URL) }

func main() { 
    bf := bloom.NewDefaultScalable(0.1) 

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/") 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    q, err := ch.QueueDeclare(
     "urls", // name 
     true, // durable 
     false, // delete when unused 
     false, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.Qos(
     1,  // prefetch count 
     0,  // prefetch size 
     false, //global 
    ) 
    failOnError(err, "Failed to set Qos") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     false, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    concurrency := 10 
    var wg sync.Waitgroup    // used to coordinate when they are done, ie: if rabbit conn was closed 
    for x := 0; x < concurrency; x++ { // spawn 10 goroutines, all reading from the rabbit channel 
     go func() { 
      defer wg.Done() // signal that this goroutine is done 
      for d := range msgs { 
       url := string(d.Body) 
       if bf.Match(url) == false { 
        bf.Feed(url) 
        log.Printf("Not seen: %s", d.Body) 
        getRequest(url) 
       } else { 
        log.Printf("Already seen: %s", d.Body) 
       } 
       d.Ack(false) 
      } 
      log.Println("msgs channel closed") 
     }() 
    } 

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C") 
    wg.Wait() // when all goroutine's exit, the app exits 
} 
+0

上面的例子与退出:'恐慌:同步:负WaitGroup counter' @更新的例子大卫-budworth – user3104123

+0

,我忽略与工人的数量(并发性)来初始化Waitgroup。我实际上无法运行应用程序,因为我没有提交任何项目,所以您可能需要调整一下。重点更多的是展示另一种方式,并解释为什么您的解决方案挂起。 –