2012-06-27 48 views
10

我需要制定出实时读取正在使用node.js写入文件的数据的最佳方法。麻烦的是,Node是一种快速移动的船舶,它使寻找解决困难问题的最佳方法成为可能。使用Node.js实时读取文件

我想做
我有一个做的东西,然后写的这个东西它到一个文本文件的结果的java程序。它通常需要5分钟到5小时的任何时间来运行,数据一直写入,并且可以达到一些相当高的吞吐率(大约1000行/秒)。

我想实时读取此文件,然后使用节点聚合数据并将其写入到可在客户端上绘制图形的套接字中。

客户端,图表,套接字和聚合逻辑都已完成,但我对阅读文件的最佳方法感到困惑。

我曾尝试(或至少与饰演)
FIFO - 我可以告诉我的Java程序来写入FIFO和阅读本使用节点,其实这是我们如何有这个目前使用Perl implemted ,但是因为其他所有内容都在节点中运行,所以移植代码很有意义。

Unix Sockets - 如上所述。

fs.watchFile - 这会工作,我们需要什么?

fs.createReadStream - 这比WatchFile更好吗?

fs & tail -f - 看起来像一个黑客。

什么,其实是我的问题
我倾向于使用Unix套接字,这似乎是最快的选择。但是节点是否具有更好的实时读取fs文件的内置功能?

回答

6

如果你想保持文件作为数据的持久化存储,以防止流的损失在系统崩溃或成员之一的网络中的情况下,正在运行的进程死亡后,您仍然可以继续写入文件并从中读取数据。

如果您不需要此文件作为您的Java进程生成结果的持久性存储,那么使用Unix套接字对于易用性和性能都要好得多。

fs.watchFile()不是你所需要的,因为它在文件系统报告时对文件统计信息起作用,并且由于你想读取正在写入的文件,所以这不是你想要的。

短的更新:我很抱歉地认识到,虽然我曾指责fs.watchFile()在前面的段落使用文件统计,我做了我自己同样的事情在我下面的示例代码!尽管我已经告诫读者“保重!”因为我已经在几分钟内写完了,甚至没有经过很好的测试;但是,如果底层系统支持它,则可以通过使用fs.watch()而不是watchFilefstatSync来做得更好。

读/从文件写作,我刚才写下面的乐趣在我的休息:

测试-FS-writer.js:因为你在Java编写的文件你不会需要这个过程]

var fs = require('fs'), 
    lineno=0; 

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); 

stream.on('open', function() { 
    console.log('Stream opened, will start writing in 2 secs'); 
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); 
}); 

测试-FS-reader.js:小心,这只是演示,检查ERR对象]

var fs = require('fs'), 
    bite_size = 256, 
    readbytes = 0, 
    file; 

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); 

function readsome() { 
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense! 
    if(stats.size<readbytes+1) { 
     console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); 
     setTimeout(readsome, 3000); 
    } 
    else { 
     fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); 
    } 
} 

function processsome(err, bytecount, buff) { 
    console.log('Read', bytecount, 'and will process it now.'); 

    // Here we will process our incoming data: 
     // Do whatever you need. Just be careful about not using beyond the bytecount in buff. 
     console.log(buff.toString('utf-8', 0, bytecount)); 

    // So we continue reading from where we left: 
    readbytes+=bytecount; 
    process.nextTick(readsome); 
} 

您可以安全避免直接使用nextTick并直接致电readsome()。由于我们仍然在这里同步工作,所以在任何意义上都没有必要。我只是喜欢它。 :P

EDIT通过Oliver Lloyd

以上面的例子,但它延伸到读CSV数据给出:

var lastLineFeed, 
    lineArray; 
function processsome(err, bytecount, buff) { 
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); 

    if(lastLineFeed > -1){ 

     // Split the buffer by line 
     lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); 

     // Then split each line by comma 
     for(i=0;i<lineArray.length;i++){ 
      // Add read rows to an array for use elsewhere 
      valueArray.push(lineArray[i].split(',')); 
     } 

     // Set a new position to read from 
     readbytes+=lastLineFeed+1; 
    } else { 
     // No complete lines were read 
     readbytes+=bytecount; 
    } 
    process.nextTick(readFile); 
} 
+0

这是直接解决我的问题的一个很好的例子。这需要加强,但一次只能处理一行,但可以说这是一件好事;节点缺乏现有的fs接口意味着它是完全可定制的,所以即使我必须编写额外的代码,我也可以实现我所需要的。 –

+0

我扩展了上面的示例以使用CSV文件。 –

+0

这在绝对当作节点运行,但我怎样才能把这段代码放在app.js中并在html页面中得到结果呢? – sand

4

为什么你认为tail -f是黑客?

虽然我发现一个很好的例子,我会做类似的事情。使用的是Node.js和WebSocket的 实时在线活动监视器例如:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

只是为了让这个完整的答案,我写给你这将0.8.0下运行的示例代码 - (HTTP服务器是一个黑客可能)。

孩子进程产生与尾运行,并且因为一个子进程是三流的EventEmitter(我们在我们的例子标准输出使用),你可以只用on

文件名添加了一个监听器:tailServer。JS

用法:node tailServer /var/log/filename.log

var http = require("http"); 
var filename = process.argv[2]; 


if (!filename) 
    return console.log("Usage: node tailServer filename"); 

var spawn = require('child_process').spawn; 
var tail = spawn('tail', ['-f', filename]); 

http.createServer(function (request, response) { 
    console.log('request starting...'); 

    response.writeHead(200, {'Content-Type': 'text/plain' }); 

    tail.stdout.on('data', function (data) { 
     response.write('' + data);     
    }); 
}).listen(8088); 

console.log('Server running at http://127.0.0.1:8088/'); 
+0

我与尾-f关注的是,它需要读取过程是活动的文件写入之前,如果不是数据丢失。我的用例是这样的,读取可能会在数据写入后很长时间发生。对于更新到0.8的+1,虽然这对于从同一个源控制写入和读取的位置来说是一个很好的解决方案。 –

+0

watchFile也是事件驱动的,但根据文档不稳定。上面的例子handels通过在高级代码中进行轮询来修改文件。对我来说,这看起来像一个黑客。但只要它对你有用,这是很好的做法。否则,如果文件不存在,则可以“触摸”该文件,并且不会丢失任何数据,并且可以用“wc -l message.text | awk'{print $ 1}''并将其交给'tail -f -n' – vik

0

我从@hasanyasin处取得了答案并将其包裹变成模块化的承诺。基本的想法是,你传递一个文件和一个处理函数来处理从文件读取的字符串化缓冲区。如果处理函数返回true,那么该文件将停止读取。你也可以设置一个超时,如果处理程序没有足够快地返回真,它将会终止读取。

如果resolve()由于超时被调用,promiser将返回true,否则它将返回false。

查看底部的使用示例。

// https://stackoverflow.com/a/11233045 

var fs = require('fs'); 
var Promise = require('promise'); 

class liveReaderPromiseMe { 
    constructor(file, buffStringHandler, opts) { 
     /* 
      var opts = { 
       starting_position: 0, 
       byte_size: 256, 
       check_for_bytes_every_ms: 3000, 
       no_handler_resolution_timeout_ms: null 
      }; 
     */ 

     if (file == null) { 
      throw new Error("file arg must be present"); 
     } else { 
      this.file = file; 
     } 

     if (buffStringHandler == null) { 
      throw new Error("buffStringHandler arg must be present"); 
     } else { 
      this.buffStringHandler = buffStringHandler; 
     } 

     if (opts == null) { 
      opts = {}; 
     } 

     if (opts.starting_position == null) { 
      this.current_position = 0; 
     } else { 
      this.current_position = opts.starting_position; 
     } 

     if (opts.byte_size == null) { 
      this.byte_size = 256; 
     } else { 
      this.byte_size = opts.byte_size; 
     } 

     if (opts.check_for_bytes_every_ms == null) { 
      this.check_for_bytes_every_ms = 3000; 
     } else { 
      this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; 
     } 

     if (opts.no_handler_resolution_timeout_ms == null) { 
      this.no_handler_resolution_timeout_ms = null; 
     } else { 
      this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; 
     } 
    } 


    startHandlerTimeout() { 
     if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { 
      var that = this; 
      this._handlerTimer = setTimeout(
       function() { 
        that._is_handler_timed_out = true; 
       }, 
       this.no_handler_resolution_timeout_ms 
      ); 
     } 
    } 

    clearHandlerTimeout() { 
     if (this._handlerTimer != null) { 
      clearTimeout(this._handlerTimer); 
      this._handlerTimer = null; 
     } 
     this._is_handler_timed_out = false; 
    } 

    isHandlerTimedOut() { 
     return !!this._is_handler_timed_out; 
    } 


    fsReadCallback(err, bytecount, buff) { 
     try { 
      if (err) { 
       throw err; 
      } else { 
       this.current_position += bytecount; 
       var buff_str = buff.toString('utf-8', 0, bytecount); 

       var that = this; 

       Promise.resolve().then(function() { 
        return that.buffStringHandler(buff_str); 
       }).then(function(is_handler_resolved) { 
        if (is_handler_resolved) { 
         that.resolve(false); 
        } else { 
         process.nextTick(that.doReading.bind(that)); 
        } 
       }).catch(function(err) { 
        that.reject(err); 
       }); 
      } 
     } catch(err) { 
      this.reject(err); 
     } 
    } 

    fsRead(bytecount) { 
     fs.read(
      this.file, 
      new Buffer(bytecount), 
      0, 
      bytecount, 
      this.current_position, 
      this.fsReadCallback.bind(this) 
     ); 
    } 

    doReading() { 
     if (this.isHandlerTimedOut()) { 
      return this.resolve(true); 
     } 

     var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; 
     if (max_next_bytes) { 
      this.fsRead((this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size); 
     } else { 
      setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); 
     } 
    } 


    promiser() { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.resolve = resolve; 
      that.reject = reject; 
      that.doReading(); 
      that.startHandlerTimeout(); 
     }).then(function(was_resolved_by_timeout) { 
      that.clearHandlerTimeout(); 
      return was_resolved_by_timeout; 
     }); 
    } 
} 


module.exports = function(file, buffStringHandler, opts) { 
    try { 
     var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); 
     return live_reader.promiser(); 
    } catch(err) { 
     return Promise.reject(err); 
    } 
}; 

然后用上面这样的代码:

var fs = require('fs'); 
var path = require('path'); 
var Promise = require('promise'); 
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); 

var ending_str = '_THIS_IS_THE_END_'; 
var test_path = path.join('E:/tmp/test.txt'); 

var s_list = []; 
var buffStringHandler = function(s) { 
    s_list.push(s); 
    var tmp = s_list.join(''); 
    if (-1 !== tmp.indexOf(ending_str)) { 
     // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms 
     // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true 
     return true; 
     // you can also return a promise: 
     // return Promise.resolve().then(function() { return true; }); 
    } 
}; 

var appender = fs.openSync(test_path, 'a'); 
try { 
    var reader = fs.openSync(test_path, 'r'); 
    try { 
     var options = { 
      starting_position: 0, 
      byte_size: 256, 
      check_for_bytes_every_ms: 3000, 
      no_handler_resolution_timeout_ms: 10000, 
     }; 

     liveReadAppendingFilePromiser(reader, buffStringHandler, options) 
     .then(function(did_reader_time_out) { 
      console.log('reader timed out: ', did_reader_time_out); 
      console.log(s_list.join('')); 
     }).catch(function(err) { 
      console.error('bad stuff: ', err); 
     }).then(function() { 
      fs.closeSync(appender); 
      fs.closeSync(reader); 
     }); 

     fs.write(appender, '\ncheck it out, I am a string'); 
     fs.write(appender, '\nwho killed kenny'); 
     //fs.write(appender, ending_str); 
    } catch(err) { 
     fs.closeSync(reader); 
     console.log('err1'); 
     throw err; 
    } 
} catch(err) { 
    fs.closeSync(appender); 
     console.log('err2'); 
    throw err; 
}