2014-01-24 48 views
5

考虑下面的代码...我想读第5行后暂停流:暂停的readline

var fs   = require('fs'); 
var readline = require('readline'); 
var stream  = require('stream'); 
var numlines = 0; 
var instream = fs.createReadStream("myfile.json"); 
var outstream = new stream; 
var readStream = readline.createInterface(instream, outstream); 
readStream.on('line', function(line){ 
    numlines++; 
    console.log("Read " + numlines + " lines"); 
    if (numlines >= 5) { 
    console.log("Pausing stream"); 
    readStream.pause(); 
    } 
}); 

输出(下一个复制)表明,它使阅读后线暂停。也许readline已经在缓冲区中排队了几行,并将它们反馈给我......如果它继续在后台异步读取,这将是有意义的,但基于文档,我不知道适当的行为应该是。有关如何达到预期效果的任何建议?

Read 1 lines 
Read 2 lines 
Read 3 lines 
Read 4 lines 
Read 5 lines 
Pausing stream 
Read 6 lines 
Pausing stream 
Read 7 lines 

回答

6

因此,事实证明,即使在暂停()之后,readline流也倾向于“滴漏”(即,泄漏一些额外的行)。文件没有说清楚,但这是事实。

如果您希望暂停()切换立即出现,您必须创建自己的行缓冲区并自己累积剩余行。

8

有点unintuitively,the pause methods does not stop queued up line events

调用rl.pause()不会立即从由readline.Interface实例被发射暂停其它事件(包括'line')。

然而,有一个名为line-by-line其中pause确实暂停line事件,直到它恢复第三方模块。

var LineByLineReader = require('line-by-line'), 
    lr = new LineByLineReader('big_file.txt'); 

lr.on('error', function (err) { 
    // 'err' contains error object 
}); 

lr.on('line', function (line) { 
    // pause emitting of lines... 
    lr.pause(); 

    // ...do your asynchronous line processing.. 
    setTimeout(function() { 

     // ...and continue emitting lines. 
     lr.resume(); 
    }, 100); 
}); 

lr.on('end', function() { 
    // All lines are read, file is closed now. 
}); 

(我有一个模块没有隶属关系,只是觉得处理这个问题非常有用。)

+0

感谢您的回答。出于兴趣,这种要求有多普遍?我正在解析需要流式传输到服务器的80GB CSV。还有什么其他用例? –

+1

@ZachSmith我发现在回调不能或不应该同步完成时(比如说,将行插入到数据库中),可以暂停和恢复。如果您正在读取的行比处理它们的速度快,则可能会排队过多请求并耗尽内存。 –

0

加点分:

.on('pause', function() { 
    console.log(numlines) 
}) 

您将得到5.它在node.js document中提到:

  • 输入流未暂停收到 SIGCONT事件。 (参见事件SIGTSTP和SIGCONT)

所以,我在行事件中创建了一个tmp缓冲区。使用标志来确定它是否被触发暂停。

.on('line', function(line) { 
    if (paused) { 
     putLineInBulkTmp(line); 
    } else { 
     putLineInBulk(line); 
    } 
} 

然后在上暂停和恢复:

.on('pause', function() { 
    paused = true; 
    doSomething(bulk, function(resp) { 
     // clean up bulk for the next. 
     bulk = []; 
     // clone tmp buffer. 
     bulk = clone(bulktmp); 
     bulktmp = []; 
     lr.resume(); 
    }); 
}) 
.on('resume',() => { 
    paused = false; 
}) 

用这种方式来处理这种情况。