2016-07-25 121 views
0

我正在处理一个政治竞选捐款的数据集,该数据集最终成为一个大约500MB的JSON文件(最初是一个124MB CSV)。在Firebase网络界面导入(尝试崩溃Google Chrome上的标签之前)太大了。我试图手动上传对象,因为它们是从CSV制作的(使用CSVtoJSON转换器,每一行都成为JSON对象,然后我会将该对象上传到Firebase)。将大量数据导入Firebase数据库的正确方法是什么?

这是我使用的代码。

var firebase = require('firebase');
var Converter = require("csvtojson").Converter;
firebase.initializeApp({
    serviceAccount: "./credentials.json",
    databaseURL: "url went here"
});
var converter = new Converter({
    constructResult:false,
    workerNum:4
});
var db = firebase.database();
var ref = db.ref("/");

// Rows per "reportsN" section; the section is derived from rowIndex instead
// of separate count/section counters, which removes the off-by-one interplay
// between the two resets in the original.
var ROWS_PER_SECTION = 1000;
// Pause the CSV read stream once this many writes are awaiting acknowledgement.
// Fire-and-forget set() calls queue pending writes without bound, which is
// what exhausts memory on a 500MB dataset.
var HIGH_WATER = 500;
var outstanding = 0;
var parseDone = false;
var lastindex = 0;

var readStream = require("fs").createReadStream("./vUPLOAD_MASTER.csv");

// Exit only after parsing has finished AND every queued write has been
// acknowledged; calling process.exit() earlier drops in-flight writes,
// which is why data was missing from the database.
function maybeFinish() {
    if (parseDone && outstanding === 0) {
        console.log("last completed index: " + lastindex);
        process.exit(0);
    }
}

converter.on("record_parsed", function(resultRow, rawRow, rowIndex) {
    var section = Math.floor(rowIndex / ROWS_PER_SECTION);
    var reportRef = ref.child("reports" + section).child(resultRow.Report_ID);
    outstanding += 1;
    if (outstanding >= HIGH_WATER) {
        readStream.pause(); // backpressure: stop parsing until writes drain
    }
    reportRef.set(resultRow).then(function() {
        outstanding -= 1;
        lastindex = rowIndex;
        if (outstanding < HIGH_WATER) {
            readStream.resume();
        }
        maybeFinish();
    }, function(err) {
        console.error("Write failed at row " + rowIndex + ": " + err);
        process.exit(1);
    });
});

converter.on("end_parsed", function() {
    parseDone = true;
    maybeFinish();
});

readStream.pipe(converter);

但是,这个脚本会遇到内存问题,无法处理完整个数据集。由于Firebase没有显示所有已上传的数据,分块上传也不可行,而且我不确定应该从哪一行继续。(把Firebase数据库页面在Chrome中保持打开时,我能看到数据陆续进来,但标签页最终会崩溃,重新加载后发现后面的很多数据都丢失了。)

然后我用Firebase Streaming Import试过,但抛出这个错误:

started at 1469471482.77 
Traceback (most recent call last): 
    File "import.py", line 90, in <module> 
    main(argParser.parse_args()) 
    File "import.py", line 20, in main 
    for prefix, event, value in parser: 
    File "R:\Python27\lib\site-packages\ijson\common.py", line 65, in parse 
    for event, value in basic_events: 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 185, in basic_parse 
    for value in parse_value(lexer): 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 127, in parse_value 
    raise UnexpectedSymbol(symbol, pos) 
ijson.backends.python.UnexpectedSymbol: Unexpected symbol u'\ufeff' at 0 

针对最后一行的ijson错误,我搜索后找到了this SO thread,但我不清楚应该如何利用它来让Firebase Streaming Import正常工作。

我从要上传的JSON文件中删除了字节顺序标记(BOM),现在运行导入器大约一分钟后得到这个错误:

Traceback (most recent call last): 
    File "import.py", line 90, in <module> 
    main(argParser.parse_args()) 
    File "import.py", line 20, in main 
    for prefix, event, value in parser: 
    File "R:\Python27\lib\site-packages\ijson\common.py", line 65, in parse 
    for event, value in basic_events: 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 185, in basic_parse 
    for value in parse_value(lexer): 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 116, in parse_value 
    for event in parse_array(lexer): 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 138, in parse_array 
    for event in parse_value(lexer, symbol, pos): 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 119, in parse_value 
    for event in parse_object(lexer): 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 170, in parse_object 
    pos, symbol = next(lexer) 
    File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 51, in Lexer 
    buf += data 
MemoryError 

Firebase Streaming Import应该能够处理250MB以上的文件,而且我相当肯定我的内存足以处理这个文件。有谁知道为什么会出现这个错误吗?

如果能看到我尝试用Firebase Streaming Import上传的实际JSON文件,应该会有帮助:here it is

回答

0

我放弃了Firebase流导入并编写了我自己的工具,该工具使用csvtojson转换CSV,然后使用Firebase Node API一次上传每个对象。

这里的脚本:

var firebase = require("firebase");
firebase.initializeApp({
    serviceAccount: "./credentials.json",
    databaseURL: "https://necir-hackathon.firebaseio.com/"
});

var db = firebase.database();
var ref = db.ref("/reports");
var fs = require('fs');
var Converter = require("csvtojson").Converter;
// CSV header prepended to every data line so csvtojson can name the fields.
var header = "Report_ID,Status,CPF_ID,Filing_ID,Report_Type_ID,Report_Type_Description,Amendment,Amendment_Reason,Amendment_To_Report_ID,Amended_By_Report_ID,Filing_Date,Reporting_Period,Report_Year,Beginning_Date,Ending_Date,Beginning_Balance,Receipts,Subtotal,Expenditures,Ending_Balance,Inkinds,Receipts_Unitemized,Receipts_Itemized,Expenditures_Unitemized,Expenditures_Itemized,Inkinds_Unitemized,Inkinds_Itemized,Liabilities,Savings_Total,Report_Month,UI,Reimbursee,Candidate_First_Name,Candidate_Last_Name,Full_Name,Full_Name_Reverse,Bank_Name,District_Code,Office,District,Comm_Name,Report_Candidate_First_Name,Report_Candidate_Last_Name,Report_Office_District,Report_Comm_Name,Report_Bank_Name,Report_Candidate_Address,Report_Candidate_City,Report_Candidate_State,Report_Candidate_Zip,Report_Treasurer_First_Name,Report_Treasurer_Last_Name,Report_Comm_Address,Report_Comm_City,Report_Comm_State,Report_Comm_Zip,Category,Candidate_Clarification,Rec_Count,Exp_Count,Inkind_Count,Liab_Count,R1_Count,CPF9_Count,SV1_Count,Asset_Count,Savings_Account_Count,R1_Item_Count,CPF9_Item_Count,SV1_Item_Count,Filing_Mechanism,Also_Dissolution,Segregated_Account_Type,Municipality_Code,Current_Report_ID,Location,Individual_Or_Organization,Notable_Contributor,Currently_Accessed"
var count = 0;
// Writes sent to Firebase but not yet acknowledged. The process must stay
// alive until this drains, otherwise objects silently never reach the
// database (the caveat observed with the original script).
var outstanding = 0;
var readingDone = false;
var lineReader = require('readline').createInterface({
    input: fs.createReadStream('test.csv')
});

// Exit cleanly only once the whole file has been read AND every set() has
// been confirmed by the server.
function finishIfDone() {
    if (readingDone && outstanding === 0) {
        console.log("All " + count + " reports saved.");
        process.exit(0);
    }
}

lineReader.on('line', function (line) {
    // Escape single quotes; use a new name rather than redeclaring the
    // `line` parameter with `var`.
    var escaped = line.replace(/'/g, "\\'");
    var csvString = header + '\n' + escaped;
    var converter = new Converter({});
    converter.fromString(csvString, function (err, result) {
        if (err) {
            var errstring = err + "\n";
            fs.appendFile('converter_error_log.txt', errstring, function (appendErr) {
                if (appendErr) {
                    console.log("Converter: Append Log File Error Below:");
                    console.error(appendErr);
                    process.exit(1);
                } else {
                    console.log("Converter Error Saved");
                }
            });
            return;
        }
        var report = result[0];
        // Placeholder fields not present in the source CSV.
        report.Location = "";
        report.Individual_Or_Organization = "";
        report.Notable_Contributor = "";
        report.Currently_Accessed = "";
        count += 1;
        outstanding += 1;
        ref.child(report.Report_ID).set(report).then(function () {
            outstanding -= 1;
            finishIfDone();
        }, function (setErr) {
            console.error("Firebase write failed for " + report.Report_ID + ": " + setErr);
            process.exit(1);
        });
        console.log("Sent #" + count);
    });
});

lineReader.on('close', function () {
    readingDone = true;
    finishIfDone();
});

唯一需要注意的是:尽管脚本可以很快地把所有对象发送出去,但Firebase显然需要保持连接直到这些对象真正保存完毕——在全部对象发送完后就立刻关闭脚本,会导致很多对象没有出现在数据库里。(我等了20分钟才关闭,不过更短的时间也许就够了。)

相关问题