我试图从队列(RabbitMQ)读取URL并进行有限数量的并发HTTP请求,即有10个工作人员对从队列接收的URL进行并发请求的池(永远)。限制处理来自RabbitMQ的消息时的并发性
到目前为止,我已经实现了一个以消费者为每RabbitMQ的教程: https://www.rabbitmq.com/tutorials/tutorial-one-go.html
,并尝试了一些从发现网络上的实例方法,在这里的例子结束: http://jmoiron.net/blog/limiting-concurrency-in-go/
不幸的是,我当前的代码运行了大约一分钟,然后无限期地冻结。我尝试过添加/移动例程,但似乎无法按预期工作(我对Go非常陌生)。
当前代码:
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/Xide/bloom"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
var netClient = &http.Client{
Timeout: time.Second * 10,
}
func getRequest(url string) {
//resp, err := http.Get(string(url))
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}
func main() {
bf := bloom.NewDefaultScalable(0.1)
conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"urls", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
concurrency := 10
sem := make(chan bool, concurrency)
go func() {
for d := range msgs {
sem <- true
url := string(d.Body)
if bf.Match(url) == false {
bf.Feed(url)
log.Printf("Not seen: %s", d.Body)
go func(url string) {
defer func() { <-sem }()
getRequest(url)
}(url)
} else {
log.Printf("Already seen: %s", d.Body)
}
d.Ack(false)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
可能你的日志输出增加的问题,这将有助于人们看到是怎么回事 – Lewis42
试着用'-race'标志运行程序,它可以帮助您进行调试:https://开头博客。 golang.org/race-detector – Nebril
如果将并发设置为10,它会使大约60个HTTP请求(逐渐变慢)然后冻结。用-race编译不提供任何信息。 – user3104123