2017-09-24 53 views
0

我在Node.js中遇到了RabbitMQ问题。 我试图实现一个Pub/Sub连接器,其中每个用户都有自己的队列轮询以获取消息。 当我通过Postma发布邮件并且用户使用邮件时,我没有任何问题(我正确地获取邮件),而如果用户试图使用队列中的邮件(但不存在邮件),它会一直卡住,直到发布了新的发布,但我无法获得该消息。 我想要做的是放弃消耗,然后重试。 你能帮我解决吗?工人阻止消费 - RabbitMQ - Node.js

get_queue工作正常,并获取用户的个人队列。

路线

app.post('/consume', (req, res) => { 
     project.get_queue(req, res) 
      .then(result1 => {return project.consume(req, res, result1.message);}) 
      .then(result2 => {res.status(result2.status).json({ message: result2.message });}) 
      .catch(err => res.status(err.status >= 100 && err.status < 600 ? err.status : 500).send({message: err.message})) 
}); 

控制器

exports.consume = function(req, res, user) { 
     return new Promise((resolve,reject) => { 
      const queue = user.queueName; 
      amqpConn.createChannel(function(err, ch) { 
       if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' }); 
       ch.on("error", function(err){ 
        console.error("[AMQP] channel error", err.message); 
       }); 
       ch.on("close", function() { 
        console.log("[AMQP] channel closed"); 
       }); 

       ch.assertQueue(queue, {durable: true},function(err, _ok){ 
        if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' }); 
        console.log("#Msg: ", _ok.messageCount) 
        if(_ok.messageCount === 0) { 
         reject({ status: 400, message: 'No consuming' }); 
        } 
        else { 
        var json = []; 

        var topic = ch.consume(queue, function(msg, err) { 
         work(msg, function(ok) { 
        try { 
         if (ok) { 
         ch.ack(msg); 
         json.push(JSON.parse(msg.content.toString())); 
         resolve({ status: 200, message: json}); 
          } 
         else { 
         ch.reject(msg, true); 
           resolve({ status: 200, message: json}); 
          } 
        } catch (e) { 
          reject({ status: 404, message: "niente da consumare"}); 
         closeOnErr(e); 
        }}); 
      }, {noAck: false}); 
     }) 
    }) 
}; 

这是我做的那么远。

exports.consume = function(req, res, user) { 
     return new Promise((resolve,reject) => { 
      const queue = user.queueName; 
      amqpConn.createChannel(function(err, ch) { 
       if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });; 
       ch.on("error", function(err){ 
        console.error("[AMQP] channel error", err.message); 
       }); 
       ch.on("close", function() { 
        console.log("[AMQP] channel closed"); 
       }); 

       var json = []; 
       iterate(ch, queue, json); 
      }) 
     }) 
    }; 

    function iterate(ch, queue, json) { 
    ch.get(queue, { 
     noAck: false 
    }, function (err, msg) { 
     if (!msg) return resolve({ 
      status: 200, 
      message: json 
     }); 
     work(msg, function (ok) { 
         console.log("Errore?"); 
        console.log("MSG consumed: ", msg.content.toString()); 
      try { 
       if (ok) { 
        ch.ack(msg); 
        json.push(JSON.parse(msg.content.toString())); 
        return iterate(ch, queue, json); 
       } 
           ch.reject(msg, true); 
       return resolve({ 
        status: 200, 
        message: json 
       }); 
      } catch (e) { 
       reject({ 
        status: 404, 
        message: "niente da consumare" 
       }); 
       closeOnErr(e); 
      } 
    }); 
})} 
+0

你忘了通过决定并拒绝迭代。因为,与我的代码相比,你从Promise的范围中删除了迭代,但忘记了通过决定并拒绝。这就是为什么解决未定义。 –

+0

新承诺:你能提供一个例子吗? – JackLametta

+0

当然,Promise中的解析函数基本上说明了它的成功。我将用您的代码的更正版本更新我的答案。 –

回答

1

使用ch.get于迭代:

exports.consume = function(req, res, user) { 
    return new Promise((resolve, reject) => { 
     const queue = user.queueName; 
     amqpConn.createChannel(function(err, ch) { 
      if(closeOnErr(err)) reject({ status: 500, message: 'Internal Server Error of Signup!' });; 
      ch.on("error", function(err){ 
       console.error("[AMQP] channel error", err.message); 
      }); 
      ch.on("close", function() { 
       console.log("[AMQP] channel closed"); 
      }); 

      var json = []; 
      return iterate(ch, queue, json, resolve, reject); 
     }); 
    }); 
}; 

function iterate(ch, queue, json, resolve, reject) { 
    ch.get(queue, { 
    noAck: false 
    }, function (err, msg) { 
    if (!msg) return resolve({ 
     status: 200, 
     message: json 
    }); 
    work(msg, function (ok) { 
     console.log("Errore?"); 
     console.log("MSG consumed: ", msg.content.toString()); 
     try { 
      if (ok) { 
       ch.ack(msg); 
       json.push(JSON.parse(msg.content.toString())); 
       return iterate(ch, queue, json, resolve, reject); 
      } 
      ch.reject(msg, true); 
      return resolve({ 
       status: 200, 
       message: json 
      }); 
     } catch (e) { 
      closeOnErr(e); 
      return reject({ 
       status: 404, 
       message: "niente da consumare" 
      }); 
     } 
    }); 
    }); 
} 

它将运行功能iterate(),如果msgfalse,它会解决,否则将处理消息,如果从work结果不好,它会拒绝并停止迭代,如果结果没问题,它会再次迭代;