2016-09-06 51 views
1

我正在使用 NodeJS 和名为 line-by-line 的模块读取大文件(100 万条以上的记录,每条记录有许多属性)。(问题标题:在 NodeJS 中用流读取大文件并插入到数据库)

文件是以流的方式读取的,以避免内存不足错误。每读满 50000 条记录,我就想把它们插入数据库。我使用的是 node-mssql 模块。

由于我对这种异步编程还比较陌生,所以我很难弄清楚是什么导致某些批次没有被导入数据库。通常会丢失 1 到 4 个批次。

读文件代码:

/**
 * Reads `fileName` line by line with LineByLineReader, skips the first line
 * (lineNr 0), turns every other line into an object via
 * toObject(line.split(";")) and hands the collected objects to
 * dbHandler.insertToDb: once per 50000 records and once more on 'end'.
 *
 * @param {string} fileName - file passed to the LineByLineReader constructor.
 * @param {Function} callback - called with an error string if the reader
 *     emits 'error'; also forwarded to dbHandler.insertToDb.
 *
 * NOTE(review): the reader is never paused, so 'line' events keep arriving
 * while an insert is still running and several inserts can be in flight at
 * once (the log that accompanies this code shows batch #3 starting before
 * batch #2 reports completion).
 */
function readAndInsertToDb(fileName, callback){ 
    // NOTE(review): no var/let/const here, so `lr` is an implicit global.
    lr = new LineByLineReader(fileName); 

    lr.on('error', function (err) { 
     callback("Error while reading file:" + err); 
    }); 
    var contacts = []; 
    var batchNr = 0; 
    var lineNr = 0; 
    lr.on('line', function (line) { 
     // 'line' contains the current line without the trailing newline character. 
     // The first line (lineNr 0) is not converted - presumably a header row; confirm.
     if (lineNr>0){ 
      //util.log("Line number: "+lineNr); 
      contacts.push(toObject(line.split(";"))); 
      // `contacts` is reset after each hand-off, so this fires when it holds exactly 50000 items.
      if ((contacts.length%50000)==0){ 
       batchNr++; 
       util.log("Inserting file data to DB. Batch #"+batchNr); 
       // NOTE(review): 0 lands in the third slot and `callback` in the fourth; the
       // insertToDb shown with this code declares (tableName, contactData, callback,
       // lastBatch, batchNr), so these arguments look misaligned - confirm.
       dbHandler.insertToDb("DSContact", contacts, 0, callback); 
       // Rebinds the local to a fresh array; the array passed above is not modified.
       contacts = []; 
       // Logged right after the call returns, not after the insert has completed.
       util.log("Array emptied after insertion to DB."); 
      } 
     } 
     lineNr++; 
    }); 

    lr.on('end', function() { 
     batchNr++; 
     // All lines are read, file is closed now. 
     util.log("Inserting file data to DB. Last batch #"+batchNr); 
     var lastBatch = true; 
     // Hands over whatever is left in `contacts` (may be fewer than 50000 records).
     // NOTE(review): same argument order concern as in the 'line' handler - confirm.
     dbHandler.insertToDb("DSContact", contacts, 0, callback, lastBatch); 
     return; 
    }); 
} 

DB插入:

/**
 * Connects with this.mssqlConfig, builds an sql.Table with the columns
 * EmailAddress and DataSourceId from `contactData`, and sends it with
 * request.bulk.
 *
 * @param {string} tableName - name given to the sql.Table.
 * @param {Array} contactData - objects whose EmailAddress and DataSourceId
 *     properties become one row each.
 * @param {Function} callback - called with an error string when the bulk
 *     request fails, or with null after a successful insert if `lastBatch`
 *     is truthy.
 * @param {*} lastBatch - truthy when callback(null) should be called on success.
 * @param {*} batchNr - only used in log messages.
 *
 * NOTE(review): sql.connect / sql.close are called without any connection
 * handle; presumably they act on one shared connection, in which case a
 * close issued by one batch could hit another batch that is still running -
 * verify against the node-mssql documentation.
 */
dbHandler.prototype.insertToDb = function (tableName, contactData, callback, lastBatch, batchNr){ 
    // NOTE(review): `self` is assigned but never read in this function.
    var self = this; 
    util.log("Going to insert data to DB: "+contactData.length); 
    sql.connect(this.mssqlConfig).then(function() { 
     // DO the actual bulk insert 
     var table = new sql.Table(tableName); 
     table.create = false; 
     table.columns.add('EmailAddress', sql.NVarChar(255), {nullable: false, primary: true}); 
     table.columns.add('DataSourceId', sql.NVarChar(50), {nullable: true}); 
     util.log("Array size: "+contactData.length); 
     // NOTE(review): `counter` is incremented but never read.
     var counter = 0; 
     for (var i=0; i<contactData.length; i++){ 
      table.rows.add(contactData[i].EmailAddress, contactData[i].DataSourceId); 
      counter++; 
     } 

     var request = new sql.Request(); 
     util.log("Making SQL request for batch: #"+batchNr); 
     request.bulk(table, function(err, rowCount) { 
      if (err){ 
       sql.close(); 
       return callback("Error during executing a query: " + err); 
      } 
      util.log("Insertion of a batch done. #"+batchNr); 
      sql.close(); 
      // Only the last batch reports success to the caller.
      if (lastBatch){ 
       callback(null); 
      } 
      return; 
     }); 
    }).catch(function(err) { 
      // NOTE(review): the error is only logged; `callback` is not invoked on this path.
      util.log("Error during DB connection: "+err); 
      sql.close(); 
    }); 
} 

输出日志(可以看到有些批次,例如 #5、#18……,没有输出“插入完成”的日志):

6 Sep 11:25:22 - Going to blank out table: DSContact 
6 Sep 11:25:23 - Deleting from table: DSContact 
6 Sep 11:25:36 - Inserting file data to DB. Batch #1 
6 Sep 11:25:36 - Going to insert data to DB: 50000 
6 Sep 11:25:36 - Array emptied after insertion to DB. 
6 Sep 11:25:36 - Array size: 50000 
6 Sep 11:25:36 - Making SQL request for batch: #1 
6 Sep 11:25:38 - Insertion of a batch done. #1 
6 Sep 11:25:38 - Inserting file data to DB. Batch #2 
6 Sep 11:25:38 - Going to insert data to DB: 50000 
6 Sep 11:25:38 - Array emptied after insertion to DB. 
6 Sep 11:25:39 - Array size: 50000 
6 Sep 11:25:39 - Making SQL request for batch: #2 
6 Sep 11:25:40 - Inserting file data to DB. Batch #3 
6 Sep 11:25:40 - Going to insert data to DB: 50000 
6 Sep 11:25:40 - Array emptied after insertion to DB. 
6 Sep 11:25:40 - Array size: 50000 
6 Sep 11:25:40 - Making SQL request for batch: #3 
6 Sep 11:25:41 - Insertion of a batch done. #2 
6 Sep 11:25:42 - Inserting file data to DB. Batch #4 
6 Sep 11:25:42 - Going to insert data to DB: 50000 
6 Sep 11:25:42 - Array emptied after insertion to DB. 
6 Sep 11:25:42 - Array size: 50000 
6 Sep 11:25:42 - Making SQL request for batch: #4 
6 Sep 11:25:42 - Insertion of a batch done. #3 
6 Sep 11:25:44 - Inserting file data to DB. Batch #5 
6 Sep 11:25:44 - Going to insert data to DB: 50000 
6 Sep 11:25:44 - Array emptied after insertion to DB. 
6 Sep 11:25:44 - Insertion of a batch done. #4 
6 Sep 11:25:45 - Inserting file data to DB. Batch #6 
6 Sep 11:25:45 - Going to insert data to DB: 50000 
6 Sep 11:25:45 - Array emptied after insertion to DB. 
6 Sep 11:25:45 - Array size: 50000 
6 Sep 11:25:45 - Making SQL request for batch: #6 
6 Sep 11:25:46 - Insertion of a batch done. #6 
6 Sep 11:25:47 - Inserting file data to DB. Batch #7 
6 Sep 11:25:47 - Going to insert data to DB: 50000 
6 Sep 11:25:47 - Array emptied after insertion to DB. 
6 Sep 11:25:47 - Array size: 50000 
6 Sep 11:25:47 - Making SQL request for batch: #7 
6 Sep 11:25:49 - Insertion of a batch done. #7 
6 Sep 11:25:49 - Inserting file data to DB. Batch #8 
6 Sep 11:25:49 - Going to insert data to DB: 50000 
6 Sep 11:25:49 - Array emptied after insertion to DB. 
6 Sep 11:25:49 - Array size: 50000 
6 Sep 11:25:49 - Making SQL request for batch: #8 
6 Sep 11:25:51 - Insertion of a batch done. #8 
6 Sep 11:25:51 - Inserting file data to DB. Batch #9 
6 Sep 11:25:51 - Going to insert data to DB: 50000 
6 Sep 11:25:51 - Array emptied after insertion to DB. 
6 Sep 11:25:51 - Array size: 50000 
6 Sep 11:25:51 - Making SQL request for batch: #9 
6 Sep 11:25:52 - Insertion of a batch done. #9 
6 Sep 11:25:53 - Inserting file data to DB. Batch #10 
6 Sep 11:25:53 - Going to insert data to DB: 50000 
6 Sep 11:25:53 - Array emptied after insertion to DB. 
6 Sep 11:25:53 - Array size: 50000 
6 Sep 11:25:53 - Making SQL request for batch: #10 
6 Sep 11:25:54 - Insertion of a batch done. #10 
6 Sep 11:25:54 - Inserting file data to DB. Batch #11 
6 Sep 11:25:54 - Going to insert data to DB: 50000 
6 Sep 11:25:54 - Array emptied after insertion to DB. 
6 Sep 11:25:54 - Array size: 50000 
6 Sep 11:25:55 - Making SQL request for batch: #11 
6 Sep 11:25:56 - Insertion of a batch done. #11 
6 Sep 11:25:56 - Inserting file data to DB. Batch #12 
6 Sep 11:25:56 - Going to insert data to DB: 50000 
6 Sep 11:25:56 - Array emptied after insertion to DB. 
6 Sep 11:25:56 - Array size: 50000 
6 Sep 11:25:56 - Making SQL request for batch: #12 
6 Sep 11:25:58 - Inserting file data to DB. Batch #13 
6 Sep 11:25:58 - Going to insert data to DB: 50000 
6 Sep 11:25:58 - Array emptied after insertion to DB. 
6 Sep 11:25:58 - Array size: 50000 
6 Sep 11:25:58 - Making SQL request for batch: #13 
6 Sep 11:25:58 - Insertion of a batch done. #12 
6 Sep 11:26:00 - Insertion of a batch done. #13 
6 Sep 11:26:00 - Inserting file data to DB. Batch #14 
6 Sep 11:26:00 - Going to insert data to DB: 50000 
6 Sep 11:26:00 - Array emptied after insertion to DB. 
6 Sep 11:26:00 - Array size: 50000 
6 Sep 11:26:00 - Making SQL request for batch: #14 
6 Sep 11:26:01 - Insertion of a batch done. #14 
6 Sep 11:26:02 - Inserting file data to DB. Batch #15 
6 Sep 11:26:02 - Going to insert data to DB: 50000 
6 Sep 11:26:02 - Array emptied after insertion to DB. 
6 Sep 11:26:02 - Array size: 50000 
6 Sep 11:26:02 - Making SQL request for batch: #15 
6 Sep 11:26:04 - Inserting file data to DB. Batch #16 
6 Sep 11:26:04 - Going to insert data to DB: 50000 
6 Sep 11:26:04 - Array emptied after insertion to DB. 
6 Sep 11:26:04 - Array size: 50000 
6 Sep 11:26:04 - Making SQL request for batch: #16 
6 Sep 11:26:04 - Insertion of a batch done. #15 
6 Sep 11:26:05 - Insertion of a batch done. #16 
6 Sep 11:26:05 - Inserting file data to DB. Batch #17 
6 Sep 11:26:05 - Going to insert data to DB: 50000 
6 Sep 11:26:05 - Array emptied after insertion to DB. 
6 Sep 11:26:06 - Array size: 50000 
6 Sep 11:26:06 - Making SQL request for batch: #17 
6 Sep 11:26:07 - Inserting file data to DB. Batch #18 
6 Sep 11:26:07 - Going to insert data to DB: 50000 
6 Sep 11:26:07 - Array emptied after insertion to DB. 
6 Sep 11:26:07 - Insertion of a batch done. #17 
6 Sep 11:26:08 - Inserting file data to DB. Batch #19 
6 Sep 11:26:08 - Going to insert data to DB: 50000 
6 Sep 11:26:08 - Array emptied after insertion to DB. 
6 Sep 11:26:08 - Array size: 50000 
6 Sep 11:26:09 - Making SQL request for batch: #19 
6 Sep 11:26:10 - Insertion of a batch done. #19 
6 Sep 11:26:10 - Inserting file data to DB. Batch #20 
6 Sep 11:26:10 - Going to insert data to DB: 50000 
6 Sep 11:26:10 - Array emptied after insertion to DB. 
6 Sep 11:26:10 - Array size: 50000 
6 Sep 11:26:10 - Making SQL request for batch: #20 
6 Sep 11:26:11 - Insertion of a batch done. #20 
6 Sep 11:26:11 - Inserting file data to DB. Batch #21 
6 Sep 11:26:11 - Going to insert data to DB: 50000 
6 Sep 11:26:11 - Array emptied after insertion to DB. 
6 Sep 11:26:12 - Array size: 50000 
6 Sep 11:26:12 - Making SQL request for batch: #21 
6 Sep 11:26:13 - Inserting file data to DB. Batch #22 
6 Sep 11:26:13 - Going to insert data to DB: 50000 
6 Sep 11:26:13 - Array emptied after insertion to DB. 
6 Sep 11:26:13 - Array size: 50000 
6 Sep 11:26:13 - Making SQL request for batch: #22 
6 Sep 11:26:14 - Insertion of a batch done. #21 
6 Sep 11:26:14 - Insertion of a batch done. #22 
6 Sep 11:26:15 - Inserting file data to DB. Batch #23 
6 Sep 11:26:15 - Going to insert data to DB: 50000 
6 Sep 11:26:15 - Array emptied after insertion to DB. 
6 Sep 11:26:15 - Array size: 50000 
6 Sep 11:26:15 - Making SQL request for batch: #23 
6 Sep 11:26:16 - Inserting file data to DB. Last batch #24 
6 Sep 11:26:16 - Going to insert data to DB: 37919 
6 Sep 11:26:16 - Insertion of a batch done. #23 

回答

0

我算是自己想明白了:解决办法是让处理过程更加顺序化。不过我仍然想知道之前的方案为什么不起作用。

我在 line-by-line 的文档中发现了这一点:

行的异步处理:

// Example: process each line asynchronously by pausing the reader when a
// line arrives and resuming it once the asynchronous work has finished.
var LineByLineReader = require('line-by-line'), 
    lr = new LineByLineReader('big_file.txt'); 

lr.on('error', function (err) { 
    // 'err' contains error object 
}); 

lr.on('line', function (line) { 
    // pause emitting of lines... 
    lr.pause(); 

    // ...do your asynchronous line processing.. 
    // (the 100 ms timer below stands in for the asynchronous work)
    setTimeout(function() { 

     // ...and continue emitting lines. 
     lr.resume(); 
    }, 100); 
}); 

lr.on('end', function() { 
    // All lines are read, file is closed now. 
}); 

所以我的新的解决方案是这样的:

读文件代码:

/**
 * Streams `fileName` line by line and stores the parsed records in the
 * "DSContact" table, one batch at a time.
 *
 * Each line is split on ";" and converted with toObject(). When a batch is
 * full the reader is paused, the batch is handed to dbHandler.insertToDb and
 * the reader is resumed only from the continuation passed as the fifth
 * argument, so at most one insert is in flight at any time.
 *
 * @param {string} fileName - path handed to LineByLineReader.
 * @param {Function} callback - completion callback: receives an error string
 *     when reading fails; it is also forwarded to dbHandler.insertToDb as its
 *     completion/error callback. Called with null when the file ends and no
 *     records are left to insert.
 * @param {number} [batchSize=50000] - number of records per insert.
 */
function readAndInsertToDb(fileName, callback, batchSize){
    var limit = (typeof batchSize === 'number' && batchSize > 0) ? batchSize : 50000;
    // Declared locally: an undeclared assignment would create a global and
    // throws a ReferenceError in strict mode / ES modules.
    var reader = new LineByLineReader(fileName);
    var contacts = [];
    var batchNr = 0;

    reader.on('error', function (err) {
        callback("Error while reading file:" + err);
    });

    reader.on('line', function (line) {
        // 'line' contains the current line without the trailing newline character.
        contacts.push(toObject(line.split(";")));
        if (contacts.length < limit){
            return;
        }

        // Stop emitting lines until the current batch has been stored.
        reader.pause();
        batchNr++;
        util.log("Inserting file data to DB. Batch #"+batchNr);

        // Detach the full batch before starting the insert so that lines
        // read after resume() go into a fresh array.
        var batch = contacts;
        contacts = [];
        dbHandler.insertToDb("DSContact", batch, callback, false, function(){
            reader.resume();
        });
        util.log("Array emptied after insertion to DB.");
    });

    reader.on('end', function() {
        // All lines are read, file is closed now.
        if (contacts.length === 0){
            // Nothing is left over (empty file, or the record count is an
            // exact multiple of the batch size): finish without a DB call.
            return callback(null);
        }
        batchNr++;
        util.log("Inserting file data to DB. Last batch #"+batchNr);
        // lastBatch = true: insertToDb reports completion through `callback`.
        dbHandler.insertToDb("DSContact", contacts, callback, true);
    });
}

DB 插入代码:

/**
 * Bulk-inserts `contactData` into `tableName` through node-mssql.
 *
 * Connects with this.mssqlConfig, builds an sql.Table with the columns
 * EmailAddress and DataSourceId, sends it with request.bulk and closes the
 * connection afterwards. Every outcome is reported to the caller, so a
 * caller that paused its input while waiting is never left hanging.
 *
 * @param {string} tableName - target table name.
 * @param {Array<{EmailAddress: string, DataSourceId: string}>} contactData -
 *     records to insert, one row each.
 * @param {Function} callback - called with an error string on failure
 *     (connection or bulk request), or with null after a successful insert
 *     when `lastBatch` is truthy.
 * @param {boolean} lastBatch - true when this is the final batch.
 * @param {Function} [innerCallback] - called without arguments after a
 *     successful insert of a non-final batch; ignored if not a function.
 */
dbHandler.prototype.insertToDb = function (tableName, contactData, callback, lastBatch, innerCallback){
    util.log("Going to insert data to DB: "+contactData.length);
    sql.connect(this.mssqlConfig).then(function() {
        // Describe the existing table; create = false means it is not created here.
        var table = new sql.Table(tableName);
        table.create = false;
        table.columns.add('EmailAddress', sql.NVarChar(255), {nullable: false, primary: true});
        table.columns.add('DataSourceId', sql.NVarChar(50), {nullable: true});
        util.log("Array size: "+contactData.length);
        contactData.forEach(function (contact) {
            table.rows.add(contact.EmailAddress, contact.DataSourceId);
        });

        var request = new sql.Request();
        util.log("Making SQL request.");
        request.bulk(table, function(err) {
            // The connection is closed on both the success and the error path.
            sql.close();
            if (err){
                return callback("Error during executing a query: " + err);
            }
            util.log("Insertion of a batch done.");
            if (lastBatch){
                return callback(null);
            }
            // Guard: callers may omit the continuation or pass a non-function.
            if (typeof innerCallback === 'function'){
                return innerCallback();
            }
        });
    }).catch(function(err) {
        // Reached when connecting fails or when preparing the request throws.
        // Previously this was only logged, so the caller never learned about
        // the failure; now the error is forwarded through `callback`.
        util.log("Error during DB connection: "+err);
        sql.close();
        return callback("Error during DB connection: " + err);
    });
};
相关问题