1
我使用async queue来处理庞大的数据量。该队列工作得很好,直到我尝试使用MongoDB findOneAndUpdate方法在数据库中执行更新。为什么我的回调没有使用异步队列进行触发?
我先建立队列,并开始从节点流推送对象到它:
//Create queue to process all items
let q = async.queue(processLink, 2);
// Create Node Stream
let createStream = function() {
let stream = fs.createReadStream(LinkData, {encoding: 'utf8'});
let parser = JSONStream.parse('RECDATA.*');
return stream.pipe(parser);
};
//Listen to 'data' event on stream and add object to queue
createStream().on('data', function(link){
q.push(link)
});
这是我的任务功能,“processLink”。正是在这里,我无法解决问题。每当findOneAndUpdate回调被触发,它进入条件块之一,我得到消息记录到控制台,但是当我打电话异步回调processComplete(),如预期的任务没有完成。
正如标题所暗示的,为什么我的异步回调没有完成每个任务?
function processLink(link, processComplete){
if(_.includes(link.URL, 'www.usda.gov') && _.includes(link.URL, '?recid=')){
let url_items = _.split(link.URL, '=',2);
let facilityOrgID = url_items[1];
let update = {$push: {"links": link}};
if(_.isNumber(parseInt(facilityOrgID)) && facilityOrgID.length > 4){
Facility.findOneAndUpdate({facilityOrgID: parseInt(facilityOrgID)}, update, (err, result) => {
if(err !== null){
console.log("Error:",err);
return processComplete(err); /** NOT FIRING **/
} else if(err === null && result !== null){
console.log("Link added to:", result.name);
return processComplete(); /** NOT FIRING **/
}else if(err === null && result === null){
console.log('Facility not in database');
processComplete(); /** NOT FIRING **/
}else{
console.log('Something has gone terrible wrong');
}
});
}else{
console.log("Invalid facilityID");
return processComplete();
}
}else{
console.log('Link Discarded:', link.URL);
processComplete(); /** Fires normally **/
}
}