2016-10-27 78 views
0

我有一个奇怪的问题,我的回调从未发布,并且消息进入超时,即使该方法在队列中运行。这发生在一些特定的队列中,并且在发生一次之后,我无法从客户端发出任何其他请求,甚至之前的工作都会超时。必须重新启动客户端并重新启动,以使其再次运行。 这是代码,它发生的地方,我似乎不明白什么是错的。RabbitMQ请求,总是超时

Server.js文件,我在创建队列。我有几个这样的队列,这是其中之一。

var amqp = require('amqp'); 
var util = require('util'); 
var cnn = amqp.createConnection({host:'127.0.0.1'}); 
var getCart = require('./services/getCart'); 

cnn.on('ready', function() { 

    cnn.queue('getCart_queue', function(q){ 
      q.subscribe(function(message, headers, deliveryInfo, m){ 

       // util.log(util.format(deliveryInfo.routingKey, message)); 
       // util.log("Message: "+JSON.stringify(message)); 
       // util.log("DeliveryInfo: "+JSON.stringify(deliveryInfo)); 

       getCart.handle_request(message, function(err,res){ 
        cnn.publish(m.replyTo, res, { 
         contentType:'application/json', 
         contentEncoding:'utf-8', 
         correlationId:m.correlationId 
        }); 
       }); 
      }); 
     }); 
}); 

这里,手柄请求功能成功完成,但回调从来没有经历过去和它总是在另一端

var cart = require('../models/cart'); 

function handle_request(msg, callback) { 

    var user_id = msg.id; 
    cart 
     .find({id:user_id}) 
     .populate('users ads') 
     .exec(function(err, results){ 

         // This works, just the callback doesnt 

      if(!err){ 
       console.log(results); 
       callback(null, results); 
      } else { 
       console.log(err); 
       callback(err, null); 
      } 

     }); 

} 

exports.handle_request = handle_request; 

超时我这是怎么调用请求

var msg_payload = {"id":id}; 
    mq_client.make_request('getCart_queue', msg_payload, function(err, results){ 
     console.log(results); // never prints 
     //stuff that is never reached 
}); 

These are my rpc files,我不认为这些应该有什么问题,因为其他一些队列工作正常。 这是在客户端显示

GET /getCart - - ms - - 
Error: timeout 6ee0bd2a4b2ba1d8286e068b0f674d8f 
    at Timeout.<anonymous> (E:\Ebay_client\rpc\amqprpc.js:32:18) 
    at Timeout.ontimeout [as _onTimeout] (timers.js:341:34) 
    at tryOnTimeout (timers.js:232:11) 
    at Timer.listOnTimeout (timers.js:202:5) 

希望的信息一点也不含糊,如果你需要更多的,请让我知道错误。谢谢!

我认为错误出现在这个文件中,因为我尝试了调试,并且从rabbitmq服务器调用回调函数,并且它具有相关ID以及对变量的回复,所以请求没有被拾取这里。

var amqp = require('amqp') 
    , crypto = require('crypto'); 

var TIMEOUT=8000; 
var CONTENT_TYPE='application/json'; 
var CONTENT_ENCODING='utf-8'; 
var self; 

exports = module.exports = AmqpRpc; 

function AmqpRpc(connection){ 
    self = this; 
    this.connection = connection; 
    this.requests = {}; 
    this.response_queue = false; 
} 


AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){ 

    self = this; 
    var correlationId = crypto.randomBytes(16).toString('hex'); 


    var tId = setTimeout(function(corr_id){ 

    callback(new Error("timeout " + corr_id)); 
    delete self.requests[corr_id]; 
    }, TIMEOUT, correlationId); 

    var entry = { 
    callback:callback, 
    timeout: tId 
    }; 

    self.requests[correlationId]=entry; 


    self.setupResponseQueue(function(){ 


    self.connection.publish(queue_name, content, { 
     correlationId:correlationId, 
     contentType:CONTENT_TYPE, 
     contentEncoding:CONTENT_ENCODING, 
     replyTo:self.response_queue}); 
    }); 
}; 


AmqpRpc.prototype.setupResponseQueue = function(next){ 

    if(this.response_queue) return next(); 

    self = this; 

    self.connection.queue('', {exclusive:true}, function(q){ 

    self.response_queue = q.name; 

    q.subscribe(function(message, headers, deliveryInfo, m){ 
     var correlationId = m.correlationId; 

     if(correlationId in self.requests){ 

     var entry = self.requests[correlationId]; 
     clearTimeout(entry.timeout); 
     delete self.requests[correlationId]; 

     entry.callback(null, message); 
     } 
    }); 
    return next();  
    }); 
}; 
+0

你应该表现出'make_request'方法的源代码。如果在问题中详细说明了与问题相关的所有代码,那么我们应该回答并提出历史建议。 – lao

+0

嘿,我发布了make请求文件,在rpc文件的github链接中。没有显示整个代码,因为它会变得混乱。 https://github.com/vansh007/Twitter-Clone-MEAN-stack/blob/master/Tw_Client/rpc/amqprpc.js –

回答

0

rabbitMQ没有问题,但我的查询处理请求和响应请求后。

对于其他人来说这个问题来了,检查,并仔细检查每一个语句,误差不会在控制台中显示,但将只显示超时

0

这是您的make_request()client.js文件中的代码:

var amqp = require('amqp'); 

var connection = amqp.createConnection({host:'127.0.0.1'}); 
var rpc = new (require('./amqprpc'))(connection); 


function make_request(queue_name, msg_payload, callback){ 

    rpc.makeRequest(queue_name, msg_payload, function(err, response){ 
     if(err) 
      console.error(err); 
     else{ 
      console.log("response", response); 
      callback(null, response); 
     } 

    }); 
} 

exports.make_request = make_request; 

看,当你有上rpc.makeRequest()的ERR会发生什么:

rpc.makeRequest(queue_name, msg_payload, function(err, response){ 
    if(err) 
     console.error(err); 
     // 
     //HERE: should be a callback call here. 
     // 
    else{ 
     console.log("response", response); 
     callback(null, response); 
    } 

}); 

这可能是为什么你超时。我希望它有帮助。

+0

超时错误在这里发布,添加回调处理错误,但情况是相同的。对我而言,问题在于找出错误来自哪里。任何想法在哪里寻找? –