2015-04-01 120 views
3

我试图写一个非常实用的方式。我们使用Highland.js来管理流处理,但是因为我太新了,我想我对如何处理这种独特的情况感到困惑。Highland.js CSV解析

这里的问题是文件流中的所有数据都不一致。文件中的第一行通常是头,我们希望将其存储到内存中,然后压缩流中的所有行。

这是我第一次去吧:

var _  = require('highland'); 
var fs  = require('fs'); 
var stream = fs.createReadStream('./data/gigfile.txt'); 
var output = fs.createWriteStream('output.txt'); 

var headers = []; 

var through = _.pipeline(
    _.split(), 
    _.head(), 
    _.doto(function(col) { 
     headers = col.split(','); 
     return headers; 
    }), 

    ...... 

    _.splitBy(','), 
    _.zip(headers), 
    _.wrapCallback(process) 
); 

_(stream) 
    .pipe(through) 
    .pipe(output); 

在管道中的第一个命令是由行文件分割。下一个抓取标题,doto声明它是一个全局变量。问题是流中接下来的几行不存在,所以进程被阻塞......可能是因为它上面的head()命令。

我试过其他一些变化,但我觉得这个例子给你一个我需要去的地方的感觉。

有关这方面的任何指导都会有所帮助 - 它也提出了我的每行中是否有不同值的问题,我如何在多种不同长度/复杂度的流操作之间分割流程流。

谢谢。

编辑:我产生了一个更好的结果,但我质疑它的效率 - 有没有一种方法我可以优化这个,所以在每次运行我不检查头是否被记录?这仍然感觉不稳定。

var through = _.pipeline(
    _.split(), 
    _.filter(function(row) { 
     // Filter out bogus values 
     if (! row || headers) { 
      return true; 
     } 
     headers = row.split(','); 
     return false; 
    }), 
    _.map(function(row) { 
     return row.split(',') 
    }), 
    _.batch(500), 
    _.compact(), 
    _.map(function(row) { 
     return JSON.stringify(row) + "\n"; 
    }) 
); 

_(stream) 
    .pipe(through) 

回答

3

您可以使用Stream.observe()Stream.fork()分裂流。

var _  = require('highland'); 
var fs  = require('fs'); 
var stream = fs.createReadStream('./data/gigfile.txt'); 
var output = fs.createWriteStream('output.txt'); 
var through = highland.pipeline(function(s) { 
    var headerStream, headers; 
    // setup a shared variable to store the headers 
    headers = []; 
    // setup the csv processing 
    s = s 
     // split input into lines 
     .split() 
     // remove empty lines 
     .compact() 
     // split lines into arrays 
     .map(function(row) { 
      return row.split(','); 
     }); 
    // create a new stream to grab the header 
    headerStream = s.observe(); 
    // pause the original stream 
    s.pause(); 
    // setup processing of the non-header rows 
    s = s 
     // drop the header row 
     .drop(1) 
     // convert the rest of the rows to objects 
     .map(function(row) { 
      var obj = headers.reduce(function(obj, key, i) { 
       obj[key] = row[i]; 
       return obj; 
      }, {}); 
      return JSON.stringify(obj) + "\n"; 
     }); 
    // grab the first row from the header stream 
    // save the headers and then resume the normal stream 
    headerStream.head().toArray(function(rows) { 
     headers = rows[0]; 
     s.resume(); 
    }); 
    return s; 
}); 
_(stream) 
    .pipe(through) 
    .pipe(output); 

也就是说,您的csv解析不会在您的值中转义换行符和逗号。通常情况下,这通过在双引号中包装值来在csv文件中完成。然后双引号通过将两个相邻放在一起来逃脱。这样做有点棘手,所以我建议使用一个处理它的包,比如fast-csv

那么你的代码看起来是这样的:

var _  = require('highland'); 
var fs  = require('fs'); 
var csv = require('fast-csv'); 
var stream = fs.createReadStream('./data/gigfile.txt'); 
var output = fs.createWriteStream('output.txt'); 

_(stream.pipe(csv({headers: true, ignoreEmpty: true}))) 
    .map(function(row) { 
     return JSON.stringify(row) + "\n"; 
    }) 
    .pipe(output);