2017-01-11 44 views
0

我想在NodeJS中使用promisified函数从MongoDB中聚合数据。 我的转储脚本在这里https://github.com/network-spy/lego使用promisified函数在NodeJS中进行数据聚合

小描述:数据库中有2个集合:“zip”和“restaurants”。 “zip”包含位置的邮政编码,“餐馆”包含有关邮政编码的餐馆的信息。因此,脚本应创建新的收集“统计”,并填写文件如: {“zip_code”:“01002”,“餐馆”:[餐馆清单]}

问题是,在“zip”集合那里是29353个文档,但是在脚本处理之后,我得到了29026个文档的“stat”集合(有时可以改变文档数目)。

我想这是因为在我的JS代码的某处失效。请你看看我的代码和建议如何修复它?

const MongoClient = require('mongodb').MongoClient; 
const mongoDbUrl = 'mongodb://127.0.0.1:27017/world'; 

MongoClient.connect(mongoDbUrl, function(err, db) { 
    if (err) { 
     console.log(err); 
     return; 
    } 
    console.log("Connected to server."); 
    clearStat(db).then(
     result => { 
      console.log(result); 
      processZips(db).then(
       result => { 
        console.log(result); 
        closeMongoDBConnection(db); 
       }, 
       error => { 
        console.log(error); 
        closeMongoDBConnection(db); 
       } 
      ); 
     }, 
     error => { 
      console.log(error); 
      closeMongoDBConnection(db); 
     } 
    ); 
}); 

let closeMongoDBConnection = (db) => { 
    db.close(); 
    console.log("Disconnected from server."); 
}; 

let clearStat = (db) => { 
    return new Promise((resolve, reject) => { 
     db.collection('stat').deleteMany({}, function(err, results) { 
      if (err) { 
       reject(err); 
      } 
      resolve('Stat data cleared'); 
     }); 
    }); 
}; 


let processZips = (db) => { 
    return new Promise((resolve, reject) => { 
     db.collection('zip').find({}, {"_id":1}).each((err, zipCode) => { 
      if (zipCode == null) { 
       resolve('Zips precessed'); 
      } else if (err) { 
       reject(err); 
      } else { 
       findRestaurantsByZip(db, zipCode._id).then(
        result => { 
         insertToStat(db, zipCode._id, result).then(
          result => { 
           console.log('Inserted: '); 
           console.dir(result); 
          }, 
          error => { 
           reject(error); 
          } 
         ); 
        }, 
        error => { 
         reject(error); 
        } 
       ); 
      } 
     }); 
    }); 
}; 

let findRestaurantsByZip = (db, zipCode) => { 
    return new Promise((resolve, reject) => { 
     db.collection('restaurant').find({"address.zipcode": zipCode}).toArray((err, restaurants) => { 
      if (err) { 
       reject(err); 
      } 
      resolve(restaurants); 
     }); 
    }); 
}; 

let insertToStat = (db, zip, restaurants) => { 
    return new Promise((resolve, reject) => { 
     let statDocument = {}; 
     statDocument.zip_code = zip; 
     statDocument.restaurants = restaurants; 
     db.collection('stat').insertOne(statDocument).then(
      result => { 
       resolve(statDocument); 
      }, 
      error => { 
       reject(error); 
      } 
     ); 
    }); 
}; 
+1

我在你的git的代码中编辑过,我认为这是问题 - 它只有100行。我在问题中看到了更长,未格式化的代码,因此您应该在可以 –

+0

好时添加代码。感谢您的帮助 –

+0

最新版本的node.js mongodb驱动程序具有对promise的本地支持,因此在您的函数中不需要明确的promise。代码应该显着简化并且更容易调试。 –

回答

1

首先,简化您的processZips函数。这是功能上等同于你的代码,但使用无极链接,而不是您解决在.each处理结束的processZips承诺嵌套承诺

let processZips = (db) => new Promise((resolve, reject) => 
    db.collection('zip').find({}, {"_id":1}).each((err, zipCode) => { 
     if (zipCode == null) { 
      resolve('Zips precessed'); 
     } else if (err) { 
      reject(err); 
     } else { 
      findRestaurantsByZip(db, zipCode._id) 
      .then(result => insertToStat(db, zipCode._id, result)) 
      .then(result => console.log('Inserted: ', result)) 
      .catch(error => reject(error)); 
     } 
    }) 
); 

问题可以是(我不能测试任何东西)。这会“触发”关闭数据库的.then。但是,由于异步查找/插入代码,当时可能有一些“正在进行中”。我不知道什么是mongodb,所以我不知道什么关闭分贝,而处理仍然活跃会做 - 看起来很可能这就是为什么你的输出数据是“短”

所以,有两种方式可以解决这个

1 - 工艺系列中的每个邮政编码,即每找到/插入等待以前完成,然后在最后一个邮政编码做

let processZips = (db) => { 
    // set p to a resolved Promise so the first find/insert will kick off 
    let p = Promise.resolve(); 
    return new Promise((resolve, reject) => 
     db.collection('zip').find({}, {"_id":1}).each((err, zipCode) => { 
      if (zipCode == null) { 
       // wait for last insert to complete before resolving the Promise 
       resolve(p.then(() => resolve('Zips precessed'))); // see note 1, 2 
      } else if (err) { 
       reject(err); 
      } else { 
       // wait for previous insert to complete before starting new find/insert 
       p = p 
       .then(() => findRestaurantsByZip(db, zipCode._id)) 
       .then(result => insertToStat(db, zipCode._id, result)) 
       .then(result => console.log('Inserted: ', result)); // see note 1 
      } 
     }) 
    ); 
}; 

有了这个代码解决,尽快作为查找/插入拒绝,实际上将不再执行查找/插入

2 - 过程中的“平行”,即揭开序幕所有的查找/插入,然后解决在所有邮编完成

let processZips = (db) => { 
    // create an array for all the find/insert Promises 
    let p = []; 
    return new Promise((resolve, reject) => 
     db.collection('zip').find({}, {"_id":1}).each((err, zipCode) => { 
      if (zipCode == null) { 
       // wait for all find/insert to complete before resolving this Promise 
       resolve(Promise.all(p).then(() => 'Zips precessed')); // see note 1, 2 
      } else if (err) { 
       reject(err); 
      } else { 
       p.push(findRestaurantsByZip(db, zipCode._id) 
        .then(result => insertToStat(db, zipCode._id, result)) 
        .then(result => console.log('Inserted: ', result)) 
       ); // see note 1 
      } 
     }) 
    ); 
}; 

一个需要注意用第二种方法是,像你原来的每个码代码,如果其中一个查找/插入失败,不会停止后续查找/插入处理。

您会注意到,与原始代码相比,似乎缺少错误处理。此代码使用承诺的2个“功能”。

  1. 拒绝将“流过”的承诺链,
  2. 如果解决了一个承诺,一个拒绝承诺,它等同于拒绝承诺。
+0

谢谢你的详细解答。第二种方法 - 就是我想要实现的。 –