2013-07-18 110 views
9

我想写一个nodejs sqs队列处理器。Nodejs sqs队列处理器

"use strict"; 
var appConf = require('./config/appConf'); 
var AWS = require('aws-sdk'); 
AWS.config.loadFromPath('./config/aws_config.json'); 
var sqs = new AWS.SQS(); 
var exec = require('child_process').exec; 
function readMessage() { 
    sqs.receiveMessage({ 
    "QueueUrl": appConf.sqs_distribution_url, 
    "MaxNumberOfMessages": 1, 
    "VisibilityTimeout": 30, 
    "WaitTimeSeconds": 20 
    }, function (err, data) { 
    var sqs_message_body; 
    if (data.Messages) { 
     if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') { 
     //sqs msg body 
     sqs_message_body = JSON.parse(data.Messages[0].Body); 
     //make call to nodejs handler in codeigniter 
     exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', 
      function (error, stdout, stderr) { 
      if (error) { 
       throw error; 
      } 
      console.log('stdout: ' + stdout); 
      if(stdout == 'Success'){ 
       //delete message from queue 
       sqs.deleteMessage({ 
       "QueueUrl" : appConf.sqs_distribution_url, 
       "ReceiptHandle" :data.Messages[0].ReceiptHandle 
       }); 
      } 
      }); 
     } 
    } 
    }); 
} 
readMessage(); 

上述代码适用于队列中的单个消息。我应该如何编写此脚本,以便它可以继续轮询队列中的邮件,直到处理完所有邮件?我应该使用设置超时吗?所有的

回答

15

首先,你应该definetely使用长轮询Amazon提供技术,据我所知,你已经在使用它,因为你必须在sqs.receiveMessage通话"WaitTimeSeconds": 20说法。我希望你不要忘记在AWS Web interface中配置它。

关于轮询消息 - 你可以使用不同的技术,包括定时器,但我认为最简单的将只是叫你readMessage()功能在receiveMessage的(甚至exec的)回调函数结束。因此,处理(或等待)队列中的下一个消息将在队列中的前一个消息的处理结束之后立即开始。

UPDATE:

至于我在新的代码版本有许多readMessage()电话。我认为最好将其最小化以使代码更清晰易于维护。但是,如果你离开了,例如,在主回调结束时唯一的一个调用,你将会收到大量并行运行的PHP工作者脚本 - 从性能的角度来看可能并不是那么糟 - 但是你将不得不添加一些复杂的脚本来控制并行工作者的数量。我认为您可以在exec回拨中减少一些呼叫,尝试加入if s并在主回叫中加入呼叫。

"use strict"; 
var appConf = require('./config/appConf'); 
var AWS = require('aws-sdk'); 
AWS.config.loadFromPath('./config/aws_config.json'); 
var delay = 20 * 1000; 
var sqs = new AWS.SQS(); 
var exec = require('child_process').exec; 
function readMessage() { 
    sqs.receiveMessage({ 
    "QueueUrl": appConf.sqs_distribution_url, 
    "MaxNumberOfMessages": 1, 
    "VisibilityTimeout": 30, 
    "WaitTimeSeconds": 20 
    }, function (err, data) { 
    var sqs_message_body; 
    if (data.Messages) 
     && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) { 
     //sqs msg body 
     sqs_message_body = JSON.parse(data.Messages[0].Body); 
     //make call to nodejs handler in codeigniter 
     exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', 
      function (error, stdout, stderr) { 
      if (error) { 
       // error handling 
      } 
      if(stdout == 'Success'){ 
       //delete message from queue 
       sqs.deleteMessage({ 
       "QueueUrl" : appConf.sqs_distribution_url, 
       "ReceiptHandle" :data.Messages[0].ReceiptHandle 
       }, function(err, data){     
       }); 
      } 
      readMessage();     
      }); 
     }   
    }   
    readMessage();   
    }); 
} 
readMessage(); 

关于内存泄漏:我认为你不应该担心,因为readMessage()下一次调用回调函数发生了 - 所以不能递归调用,并递归调用的函数只调用函数receiveMessage()后返回值父功能。

+0

您好,请查看我的这个要点。 https://gist.github.com/yalamber/374add88e887e688d818 – Yalamber

+0

另外我应该担心运行此脚本时有任何内存泄漏? – Yalamber

+0

@askkirati更新! – zavg

1

如果您正在使用节点,请使用https://www.npmjs.com/package/sqs-worker模块。它会为你做这项工作。

var SQSWorker = require('sqs-worker') 

var options = 
{ url: 'https://sqs.eu-west-1.amazonaws.com/001123456789/my-queue' 
} 

var queue = new SQSWorker(options, worker) 

function worker(notifi, done) { 
    var message; 
    try { 
    message = JSON.parse(notifi.Data) 
    } catch (err) { 
    throw err 
    } 

    // Do something with `message` 

    var success = true 

    // Call `done` when you are done processing a message. 
    // If everything went successfully and you don't want to see it any more, 
    // set the second parameter to `true`. 
    done(null, success) 
} 
+1

为什么倒票? https://github.com/BBC/sqs-consumer也是一种选择 – Cmag