2017-08-02 44 views
1

有没有办法把我从SQS收到的消息传输到DynamoDB?我尝试使用CloudWatch每分钟触发一次Lambda函数。我也愿意使用AWS中的任何其他服务来完成这个任务。我确信有一个简单的解释,只是我没有注意到。 *编辑:我的代码不起作用,我希望能修复我的代码,或者找到其他解决方案来完成此操作。(标题:将数据从SQS传输到DynamoDB)

**编辑:已经可以正常运行了。

'use strict'; 
const AWS = require('aws-sdk'); 

// AWS service clients, each pinned to an explicit API version.
const SQS = new AWS.SQS({ apiVersion: '2012-11-05' }); 
const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' }); 

// Queue URL and table name used by the functions below.
// NOTE(review): the values look like placeholders - replace before deploying.
const QUEUE_URL = 'SQS_URL'; 
// NOTE(review): PROCESS_MESSAGE is not referenced anywhere in the visible code.
const PROCESS_MESSAGE = 'process-message'; 
const DYNAMO_TABLE = 'TABLE_NAME'; 

/**
 * Receive one batch of messages from the queue, hand each message to
 * parseSQSMessage, and re-invoke this Lambda so the next batch gets drained.
 *
 * Only the callback style of the SDK is used: supplying a callback and also
 * calling .promise() on the same request mixes two completion styles.
 *
 * @param {string} functionName - name of the Lambda function to re-invoke.
 * @param {function(?Error)} callback - called exactly once: with the error
 *     when receiving or re-invoking failed, with null otherwise.
 */
function poll(functionName, callback) {
    const params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 10
    };

    // batch request messages
    SQS.receiveMessage(params, function (err, data) {
        if (err) {
            return callback(err);
        }

        // An empty queue yields a response without a Messages array.
        const messages = (data && data.Messages) || [];
        if (messages.length === 0) {
            // Nothing left to drain: stop here instead of re-invoking forever.
            return callback(null);
        }

        // parse each message
        messages.forEach(parseSQSMessage);

        // More messages may be waiting, so queue another run of this function.
        const invokeParams = {
            FunctionName: functionName,
            InvokeArgs: '{}'
        };
        Lambda.invokeAsync(invokeParams, function (invokeErr) {
            if (invokeErr) {
                return callback(invokeErr);
            }
            console.log('Recursion');
            return callback(null);
        });
    });
}

/**
 * Store every event carried by one SQS message in DynamoDB and, only after
 * all of them were stored, remove the message from the queue.
 *
 * The message body is parsed as JSON; an array is treated as a list of events
 * and any other value as a single event.
 * NOTE(review): the payload shape is assumed from the comments in this file -
 * confirm against the producer.
 *
 * When the body cannot be parsed or a write fails, the message is left on the
 * queue so it becomes visible again for a later attempt.
 *
 * @param {Object} msg - SQS message (reads Body and ReceiptHandle).
 * @param {number} index - unused; present so the function fits Array#forEach.
 * @param {Array} array - unused; present so the function fits Array#forEach.
 */
function parseSQSMessage(msg, index, array) {
    let events;
    try {
        const body = JSON.parse(msg.Body);
        events = Array.isArray(body) ? body : [body];
    } catch (err) {
        console.log(err, err.stack);
        return;
    }

    // delete SQS message
    const removeFromQueue = function () {
        const params = {
            QueueUrl: QUEUE_URL,
            ReceiptHandle: msg.ReceiptHandle
        };

        SQS.deleteMessage(params, function (err, data) {
            if (err) console.log(err, err.stack); // an error occurred
            else console.log(data);               // successful response
        });
    };

    let pending = events.length;
    let failed = false;

    if (pending === 0) {
        removeFromQueue();
        return;
    }

    events.forEach(function (event) {
        storeEvent(event, function (err) {
            if (err) {
                failed = true;
            }
            pending -= 1;
            // Delete only once every write has finished and none of them failed.
            if (pending === 0 && !failed) {
                removeFromQueue();
            }
        });
    });
}

/**
 * Store one event JSON object directly in DynamoDB.
 *
 * @param {Object} event - item to write to DYNAMO_TABLE.
 * @param {function(?Error, Object=)} [done] - optional completion callback,
 *     called with the result of the put after it has been logged.
 */
function storeEvent(event, done) {
    const params = {
        TableName: DYNAMO_TABLE,
        Item: event
    };

    const docClient = new AWS.DynamoDB.DocumentClient();

    docClient.put(params, function (err, data) {
        if (err) console.log(err);
        else console.log(data);

        if (typeof done === 'function') {
            done(err, data);
        }
    });
}

exports.handler = (event, context, callback) => { 
    try { 
     // invoked by schedule 
     poll(context.functionName, callback); 
    } catch (err) { 
     callback(err); 
    } 
}; 
+0

相关的GitHub项目:一个用Node编写的AWS Lambda函数,从SQS拉取消息并保存到DynamoDB:https://github.com/leaflevellabs/aws-lambda-sqs-dynamodb。 – jarmod

+0

你的代码不工作吗?你是否收到错误信息?你的问题是什么 - 你问是否可能,或者你的代码为什么/不工作?随意编辑你的问题来澄清。 –

回答

0
var aws = require("aws-sdk"); 


// Default table name and queue URL, hard-coded here. updateConfig() may
// overwrite both at runtime with values it reads from DynamoDB.
var tableName = 'Table_Name'; 
var queueUrl = 'SQS_URL'; 

// Shared AWS clients used by the functions below.
var dbClient = new aws.DynamoDB.DocumentClient(); 
var sqsClient = new aws.SQS(); 



/**
 * Read configuration overrides from DynamoDB and apply them to the
 * module-level queueUrl and tableName.
 *
 * Runs on every execution of the Lambda, which allows configuration changes
 * without a redeploy. A default is replaced only when the stored config
 * actually carries a value for it; a missing item, a missing config attribute
 * or a missing key leaves the current value untouched.
 *
 * @param {*} invokedFunction - not used by this function.
 * @param {function(?Error)} cb - called with the error of the DynamoDB read,
 *     if any.
 */
var updateConfig = function updateConfigValues(invokedFunction, cb) {

    var params = {
        TableName: "Table_NAME",
        Key: {
            "KEY": "KEY"
        }
    };

    dbClient.get(params, function(err, data) {

        if (err) {
            console.log("ERR_DYNAMODB_GET", err, params);
        }
        else if (!data || !data.Item || !data.Item.config) {
            console.log("INFO_DYNAMODB_NOCONFIG", params);
        }
        else {
            var config = data.Item.config;

            // Never replace a working default with undefined.
            if (config.queueUrl) {
                queueUrl = config.queueUrl;
            }
            if (config.tableName) {
                tableName = config.tableName;
            }
        }

        return cb(err);
    });

};

/**
 * Write one item to the configured DynamoDB table using a conditional put:
 * the write is rejected when an item with the same clickId already exists.
 *
 * @param {Object} messageBody - item to store.
 * @param {function(?Error, Object=)} cb - receives the error and data of the put.
 */
var saveEmail = function saveEmail(messageBody, cb) {
    var putRequest = {
        TableName: tableName,
        Item: messageBody
    };
    putRequest.ConditionExpression = "attribute_not_exists(clickId)";

    dbClient.put(putRequest, (putErr, putResult) => {
        cb(putErr, putResult);
    });
};

/**
 * Delete one message from the configured SQS queue.
 *
 * @param {string} receiptHandle - value sent as ReceiptHandle in the request.
 * @param {function(?Error, Object=)} cb - receives the error and data of the
 *     delete call.
 */
var deleteMessage = function deleteMessage(receiptHandle, cb) {
    sqsClient.deleteMessage(
        { QueueUrl: queueUrl, ReceiptHandle: receiptHandle },
        (deleteErr, deleteResult) => {
            cb(deleteErr, deleteResult);
        }
    );
};

exports.handler = function(event, context) { 

    updateConfig(context.invokedFunctionArn, function(err) { 

     if(err) { 
      context.done(err); 
      return; 
     } 

     console.log("INFO_LAMBDA_EVENT", event); 
     console.log("INFO_LAMBDA_CONTEXT", context); 

     sqsClient.receiveMessage({MaxNumberOfMessages:10 , QueueUrl: queueUrl}, function(err, data) { 

      if(err) { 
       console.log("ERR_SQS_RECEIVEMESSAGE", err); 
       context.done(null); 
      } 
      else { 

       if (data && data.Messages) { 


        console.log("INFO_SQS_RESULT", " message received"); 

         var message = JSON.parse(data.Messages[0].Body); 

         var messageBody = message.Message; 

         messageBody = JSON.parse(messageBody); 




         // loops though the messages and replaces any empty strings with "N/A" 
         messageBody.forEach((item) => { 
          var item = item; 
          var custom = item.customVariables; 
          for (i = 0; i < custom.length; i++) { 
           if(custom[i] === ''){ 
            custom[i] = 'N/A'; 
           } 
           item.customVariables = custom; 
          } 
          for(variable in item) { 

           if(item[variable] === ""){ 
            item[variable] = "N/A"; 
            console.log(item); 
           } 
          } 

          var messageBody = item; 
         }); 
         var messageBody = messageBody[0]; 
         // Logs out the new messageBody 
         console.log("FIXED - ", messageBody); 

         // Checks for errors and delets from que after sent 
         saveEmail(messageBody, function(err, data) { 

          if (err && err.code && err.code === "ConditionalCheckFailedException") { 
           console.error("INFO_DYNAMODB_SAVE", messageBody + " already subscribed"); 
           deleteMessage(message.MessageId, function(err) { 
            if(!err) { 
             console.error("INFO_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, "successful"); 
            } else { 
             console.error("ERR_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, err); 
            } 
            context.done(err); 
           }); 

          } 
          else if (err) { 
           console.error("ERR_DYNAMODB_SAVE", "receipt handle: " + message.MessageId, err); 
           context.done(err); 
          } 
          else { 
           console.log("INFO_DYNAMODB_SAVE", "email_saved", "receipt handle: " + message.MessageId, messageBody.Message); 
           deleteMessage(message.MessageId, function(err) { 
            if(!err) { 
             console.error("INFO_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, "successful"); 
            } else { 
             console.error("ERR_SQS_MESSAGE_DELETE", "receipt handle: " + message.MessageId, err); 
            } 
            context.done(err); 
           }); 
          } 


         }); 
       } else { 
        console.log("INFO_SQS_RESULT", "0 messages received"); 
        context.done(null); 
       } 
      } 
     }); 
    }); 

}