2017-03-07 35 views
0

是否可以从fs.readStream处理数据流中的大块数据?示例:读取一个文件并计算一个字符出现的次数,而不将整个文件存储在内存中。简单地解析流中的每个块并汇总一个数字..我尝试创建一个函数传递给.pipe(),但是fn需要定义.on()并且失败。是否有可能从fs.readStream拦截和定制过程流块?

我在流(stream)方面还是个初学者。我做了大量的谷歌搜索、阅读和试验,但都没有什么帮助。我也找不到关于管道(pipe)工作原理的资料,也不清楚是否可以自己编写定制的管道接收函数。

感谢

+0

你看看https://github.com/substack/stream-handbook? –

+0

我还没有看过——我现在就去看,谢谢 – KMongiello

回答

1

最简单的做法是直接订阅流的 'data' 事件,然后逐块统计出现次数,如下所示:

'use strict'; 
 

 
const fs = require('fs'); 
 

 
/**
 * Counts the non-overlapping occurrences of `char` in `st`.
 *
 * The needle is matched literally, so regex metacharacters such as '.', '*'
 * or '(' are counted as ordinary characters instead of being interpreted
 * as a pattern (or throwing a SyntaxError).
 *
 * @param {string} st - Text to search.
 * @param {string} char - Character (or short literal string) to count.
 * @returns {number} Number of occurrences; 0 when `char` is empty or not a string.
 */
const countCharsInString = function (st, char) {
  // An empty needle has no meaningful count; bail out instead of matching everywhere.
  if (typeof char !== 'string' || char.length === 0) {
    return 0;
  }
  // Splitting on a literal string yields one more piece than there are separators.
  return st.split(char).length - 1;
};
 

 
const CHAR_TO_COUNT = '1';
let total_count = 0;

// Let the stream decode UTF-8 itself. With an encoding set, chunks arrive as
// strings and a multi-byte character that straddles two chunks is held back
// until it is complete, instead of being corrupted by a per-chunk toString().
let fileStream = fs.createReadStream('./log.txt', { encoding: 'utf8' });

// Tally the occurrences as the stream emits each 'data' event.
// No reference to `chunk` is kept and the data is not collected anywhere,
// so each chunk can be garbage-collected and memory use stays bounded.
fileStream.on('data', chunk => {
  total_count += countCharsInString(chunk, CHAR_TO_COUNT);
});

fileStream.on('error', err => console.error(err.stack));

// Log the count when everything is done.
fileStream.on('end', () => console.log(`All done. There are ${total_count} occurrences of character '${CHAR_TO_COUNT}'.`));

但是,如果你想在数据流经时对其进行处理、即时汇总,然后把数据继续传输到其他地方,可以这样做:

'use strict'; 
 

 
const fs = require('fs'); 
 
const Transform = require('stream').Transform; 
 

 
//We inherit from the Transform stream class 
 
/**
 * Pass-through Transform stream that counts how many times a character
 * occurs in the data flowing through it. Every chunk is forwarded unchanged;
 * the running total is exposed as `totalCount`.
 */
class OcurrenceCountingStream extends Transform {
  /**
   * @param {object} [options] - Standard Transform options plus `charToCount`.
   * @param {string} [options.charToCount] - Character to count in the stream.
   */
  constructor(options = {}) {
    super(options);

    // Custom option consumed by this class.
    this.charToCount = options.charToCount;
    this.totalCount = 0;

    // Streaming decoder: keeps the bytes of a multi-byte UTF-8 character that
    // is split across two chunks until the rest arrives. `ignoreBOM: true`
    // keeps a leading BOM in the decoded text, as Buffer#toString() does.
    this._decoder = new TextDecoder('utf-8', { ignoreBOM: true });
  }

  /**
   * Counts the non-overlapping, literal occurrences of `char` in `st`.
   * Static because it is a pure function that does not depend on instance state.
   *
   * @param {string} st - Text to search.
   * @param {string} char - Character (or short literal string) to count.
   * @returns {number} Number of occurrences; 0 when `char` is empty or not a string.
   */
  static countCharsInString(st, char) {
    if (typeof char !== 'string' || char.length === 0) {
      return 0;
    }
    // Literal split instead of a RegExp built from unescaped input, so that
    // characters like '.' or '(' are counted as themselves.
    return st.split(char).length - 1;
  }

  /**
   * Transform hook required by the piping machinery: updates the running
   * count and passes the chunk on unchanged.
   *
   * @param {Buffer|string|*} chunk - Incoming data.
   * @param {string} encoding - Encoding hint supplied by the stream (unused).
   * @param {Function} callback - Called with the chunk to push downstream.
   */
  _transform(chunk, encoding, callback) {
    // Binary chunks go through the streaming decoder; anything else
    // (e.g. strings when decodeStrings is disabled) is stringified directly.
    const text = chunk instanceof Uint8Array
      ? this._decoder.decode(chunk, { stream: true })
      : String(chunk);
    this.totalCount += OcurrenceCountingStream.countCharsInString(text, this.charToCount);

    // Pass the data further.
    callback(null, chunk);
  }

  /**
   * Flush hook: decodes any bytes still buffered by the decoder once the
   * input has ended and includes them in the count.
   *
   * @param {Function} callback - Signals that flushing is complete.
   */
  _flush(callback) {
    const tail = this._decoder.decode();
    this.totalCount += OcurrenceCountingStream.countCharsInString(tail, this.charToCount);
    callback();
  }
}
 

 
let fileStream = fs.createReadStream('./log.txt');
let countingStream = new OcurrenceCountingStream({charToCount: '1'});

// pipe() does not forward errors from the source to the destination, so a
// read failure (e.g. a missing file) would otherwise be an unhandled 'error'
// event. Destroying the counting stream with the error routes it to the
// single error handler registered below.
fileStream.on('error', err => countingStream.destroy(err));

// So now we can pipe.
fileStream.pipe(countingStream);

/*
One subtlety here: the counting stream must be in flowing mode, i.e. something
has to consume the data coming out of its readable side. Merely calling
fileStream.pipe(countingStream) is not enough, because nothing reads
countingStream's output and the 'end' event is never reached.

There are several ways to make countingStream flow:
1) Subscribe to its 'data' event.
2) Pipe it somewhere.
3) Call .resume() if we don't care about the data that comes out.
*/
countingStream.resume();

countingStream.on('error', err => console.error(err.stack));

// Log the count once all data has passed through.
countingStream.on('end', () => console.log(`All done. There are ${countingStream.totalCount} occurrences of character '${countingStream.charToCount}'.`));

相关问题