2015-05-01 27 views
0

我有一个Node.js项目将消息从AWS SQS(消息队列)中消费。我想在同一个项目中启动多个消费者,以跟上放在队列中的消息,而不必创建另一个Node.js实例。有没有可能像Fibers或其他框架那样做?我基本上试图让接收消息多线程的行为(竞争消费者模式)。Node.js在一个项目中产生多个SQS监听器线程

下面是一个例子:

var AWS = require('aws-sdk'), 
    nconf = require('nconf'), 
    SQS_URL = process.env.SQS_SERVICES_EVENTS; 

AWS.config.update({accessKeyId: nconf.get("accessKeyId"), secretAccessKey: nconf.get("secretAccessKey")}); 
AWS.config.update({region: nconf.get("region")}); 

// Initialize SQS 
var sqs = new AWS.SQS(); 

// Params for SQS 
var MAX_NUM_MSGS = nconf.get("sqs.max.messages"); 
var params = { 
    QueueUrl: SQS_URL, 
    MaxNumberOfMessages: MAX_NUM_MSGS, 
    VisibilityTimeout: 30, 
    WaitTimeSeconds: 20 
}; 

exports.startSqsListener = setInterval(sqsListener, nconf.get("sqs.interval")); 

// Make multi-threaded 
function sqsListener() { 
    sqs.receiveMessage(params, function(err, data) { 
     if (err) { 
      logger.error(err, err.stack); 
     } 
     if (data.Messages) { 
      // do something with each message 
     } 
    }); 
} 

回答

0

如果你要处理更多的消息同时你可以写一个异步函数来实现的逻辑“做每个消息的东西”。只要消息不需要按顺序处理,节点就会自动排队每个异步执行并同时运行它们。

sqs.receiveMessage(params, function(err, data) { 
    if (err) { 
     logger.error(err, err.stack); 
    } 
    if (data.Messages) { 
     data.Messages.forEach(function(message) { 
      processMessage(message, function(err) { 
       //check for error, do something with it 
      }); 
     }); 
    } 
}); 

function processMessage(messageData, callback) { 
    //move your processing here 

    callback(); //or send error back, if needed 
} 
+0

不,我需要在比这更高的水平上做到这一点。到此为止,我已经收到了处理该部分的消息。我基本上想要设置多个接收器,因此调用导出'startSqsListener'的调用函数可以调用它'n'多次来启动将单独接收消息的工作人员。这是我试图实施的竞争消费者EAI模式。 – occasl

+1

如果你想运行更多的监听器函数,难道你不能简单地多次调用'startSqsListener'函数吗?每次调用都会执行'setTimeout'函数,生成另一个间隔执行的侦听器。 SQS将隐藏基于'VisibilityTimeout'的消息,因此您将在每个侦听器中收到不同批次的消息。 –

+0

我最终做的是让调用函数调用导出的函数'n'次。由于我在'setInterval'上有这个输出,它会持续运行寻找SQS消息。感谢您对此有所了解。 – occasl