2015-10-16 20 views
1

我正在使用SendGrid通过电子邮件接收文件。 SendGrid解析传入的电子邮件,并以多部分形式将文件发送到我设置的端点。如何在以多部分形式处理文件流之前等待字段

我不想让我的本地磁盘上的文件,所以我直接流到Amazon S3。这工作完美。

但是,在我可以流到S3之前,我需要掌握目标邮件地址,这样我才能制定出正确的s3文件夹。这是在表单帖子中的名为“to”的字段中发送的。不幸的是,这个字段有时会在文件到达后到达,因此我需要一种方法在我准备好接收流之前等待字段。

我以为我可以将onField封装在承诺中,并等待onFile中的to-field。但是这个概念似乎在字段到达文件后自动锁定它。

我是新来的摊位流和承诺。如果有人能告诉我如何做到这一点,我将非常感激。

这是不工作pseudoish代码:

function sendGridUpload(req, res, next) { 
    var busboy = new Busboy({ headers: req.headers }); 

    var awaitEmailAddress = new Promise(function(resolve, reject) { 
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) { 
     if(fieldname === 'to') { 
     resolve(val); 
     } else { 
     return; 
     } 
    }); 
    }); 


    busboy.on('file', function(fieldname, file, filename, encoding, mimetype) { 

    function findInbox(emailAddress) { 
     console.log('Got email address: ' + emailAddress); 

     ..find the inbox and generate an s3Key 
     return s3Key; 
    } 

    function saveFileStream(s3Key) { 
     ..pipe the file directly to S3 
    } 

    awaitEmailAddress.then(findInbox) 
    .then(saveFileStream) 
    .catch(function(err) { 
     log.error(err) 
    }); 
    }); 

    req.pipe(busboy); 
} 
+0

* facepalm *在重写的某个阶段,我设法重写onData事件。如果没有onData,则不会有数据流。重新启动onData事件后,它实际上似乎工作。但我会真正感兴趣的人更有知识的人的意见。这是一个好的解决方案吗? – Michael

+0

另外..通过这个解决方案,我正在装载流中的内部缓冲区,直到我得到电子邮件地址并将流释放到S3。经过一些测试后,似乎是100k到几MB的情侣。内部流缓冲区能够增长多少而不会让我陷入困境? – Michael

回答

0

我终于得到了这个工作。解决方案不是很漂亮,而且我实际上转向了另一个概念(在帖子结尾处描述)。

缓冲输入数据,直到“到”字段到达我用@samcday使用流缓冲区。当我掌握到现场时,我将可读流释放到排列成数据的管道。

这里是代码(有些部分省略,但基本部分在那里)。

var streamBuffers = require('stream-buffers'); 

function postInboundMail(req, res, next) { 
    var busboy = new Busboy({ headers: req.headers}); 

    //Sometimes the fields arrives after the files are streamed. 
    //We need the "to"-field before we are ready for the files 
    //Therefore the onField is wrapped in a promise which gets 
    //resolved when the to field arrives 
    var awaitEmailAddress = new Promise(function(resolve, reject) { 
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) { 
     var emailAddress; 

     if(fieldname === 'to') { 
     try { 
      emailAddress = emailRegexp.exec(val)[1] 
      resolve(emailAddress) 
     } catch(err) { 
      return reject(err);   
     } 
     } else { 
     return; 
     } 
    }); 
    }); 


    busboy.on('file', function(fieldname, file, filename, encoding, mimetype) { 
    var inbox; 

    //I'm using readableStreamBuffer to accumulate the data before 
    //I get the email field so I can send the stream through to S3 
    var readBuf = new streamBuffers.ReadableStreamBuffer(); 

    //I have to pause readBuf immediately. Otherwise stream-buffers starts 
    //sending as soon as I put data in in with put(). 
    readBuf.pause(); 

    function getInbox(emailAddress) { 
     return model.inbox.findOne({email: emailAddress}) 
     .then(function(result) { 
     if(!result) return Promise.reject(new Error(`Inbox not found for ${emailAddress}`)) 

     inbox = result; 
     return Promise.resolve(); 
     }); 
    } 

    function saveFileStream() { 
     console.log('=========== starting stream to S3 ========= ' + filename) 

     //Have to resume readBuf since we paused it before 
     readBuf.resume(); 

     //file.save will approximately do the following: 
     // readBuf.pipe(gzip).pipe(encrypt).pipe(S3) 
     return model.file.save({ 
     inbox: inbox, 
     fileStream: readBuf 
     }); 
    } 

    awaitEmailAddress.then(getInbox) 
    .then(saveFileStream) 
    .catch(function(err) { 
     log.error(err) 
    }); 


    file.on('data', function(data) { 
     //Fill readBuf with data as it arrives 
     readBuf.put(data); 
    }); 

    file.on('end', function() { 
     //This was the only way I found to get the S3 streaming finished. 
     //Destroysoon will let the pipes finish the reading bot no more writes are allowed 
     readBuf.destroySoon() 
    }); 
    }); 


    busboy.on('finish', function() { 
    res.writeHead(202, { Connection: 'close', Location: '/' }); 
    res.end(); 
    }); 

    req.pipe(busboy); 
} 

我真的很喜欢这个解决方案的反馈,即使我没有使用它。我有一种感觉,这可以做得更简单和优雅。

新的解决方案: 而不是等待到现场我直接发送流到S3。我想,在输入流和S3保存之间放置的东西越多,由于代码中的错误,丢失传入文件的风险就越高。 (SendGrid最终将重新发送该文件,如果我不跟200响应,但它需要一定的时间。)

这是我要做的事:

  1. 保存在数据库中的文件的占位符
  2. 管道流至S3
  3. 更新更多信息占位符到达

该解决方案还使我有机会,因为很容易获得成功上传的保持不成功的上传占位符将不完整。

// Michael

相关问题