2017-08-16 50 views
0

我正在尝试读取并解析一个大的csv文件,并且对于每一行,我必须执行一些异步计算并在完成操作后增加计数器。所以我创建了一个Promise p并尝试链接很多.then(xxx),并在csv读取结束时它是最后的.then(yyy)以输出计数。承诺中的某些可执行文件未被执行

但是这个数字没有加起来。但如果我做p = p.then(xxx)p = p.then(yyy)这个数字会加起来(对于较小的csv文件),但有时我会面对内存泄漏(对于大型csv文件)。

有什么我做错了吗?

var fs = require('fs') 
const csv = require('fast-csv'); 
var Promise = require('bluebird') 
var count = 0; 
var actual = 0; 
let p = Promise.resolve(); 
const stream = fs.createReadStream(`/Users/ssmlee/Desktop/KingKong_Sims_5M.txt`); 
const csvStream = csv({ 
    delimiter: ';' 
}) 
.on('data', (row) => { 
    count++ 
    if (count % 10000 === 0) { 
     console.log(count) 
     console.log(process.memoryUsage().heapUsed) 
    } 
    p.then(() => { // instead if we do p = p.then(() => it will work correctly 
    return Promise.resolve().delay(5) 
    .then(function() { 
     actual++ 
    }) 
    }); 
}) 
.on('end',() => { 
    p.then(() => { // instead if we do p = p.then(() => it will work correctly 
    console.log(actual); // 4999977 or something like this 
    console.log(count); // 5000000 
    }); 
}); 
stream.pipe(csvStream); 
+1

你为什么认为这种方案会泄漏内存? “p = p.then(...)'没有任何问题。该结构本身不会导致内存泄漏。 – jfriend00

+0

我在做'process.memoryUsage()。heapUsed'来检查我的内存使用情况,结果我的内存没有被垃圾回收。你可能会生成一个5m行的随机文件,看到这种情况,我想不知道为什么。 –

+0

由于您选择实施此方法,因此您正在使用大量内存。您可以通过适当的排序来减少内存,而不是在同一时间将数十万个承诺放在飞行中。例如,您可以读取第一行,暂停CSV流,执行异步操作,然后完成释放CSV流。如果你真的想要并行执行一些操作来加速端到端的时间,但想要合理的内存使用,那么你需要一次保留一些适度的并行操作数(如10),而不是500,000。 – jfriend00

回答

1

如果增加actual计数和延迟,但从来没有等待的承诺(投掷then远的结果),流可能与并非所有的增量已经发生了结束。在你的例子中,23个回调仍然在等待5ms的延迟。顺便说一句,链接所有这些在相同的p = Promise.resolve()没有多大意义,你可以立即执行一切。

如果你正在做p = p.then(…)你建立一个非常长的诺言链。这不应该泄漏任何内存,但会占用大量内存 - 所有5ms延迟都会按顺序链接在一起,并且脚本将至少需要25000秒才能运行。该文件在开始时读入,生成数百万个承诺,然后依次解析(并且可以被垃圾收集)。

要执行此顺序方法执行,您可能应该使用流的背压系统。

但你也可以等待并行的延迟,同时具有不太多活着的承诺:

p = Promise.all([p, Promise.delay(5)]).then(() => { 
    actual++; 
}); 
+0

我的理解是,对于这两种情况,您都有一个很长的承诺链,承诺在所有5m承诺解决后都会解决。在第一种情况下,'.then()'内的'('end')'是最后一个链接的承诺。这不是两回事吗? –

+1

@ Shih-MinLee不,如果你没有改变'p',那么所有的回调函数都附加到相同的承诺中('Promise.resolve()')。这不是一条长链,而是一棵树。所有回调将立即运行(只要解析了Promise.resolve()),*不等待其他*。 – Bergi

+0

Gotcha。谢谢。 –

1

好你想要的承诺并行运行,所以不能把它们连。

allp = []; 
.... 
.on('data', (row) => { 
    ... 
    allp.push(p.then(() => {...})); 
} 
... 
.on('end',() => { 
Promise.all(allp).then(() => {}) 

当然,您正在为每个事件创建一个Promise。

如果你需要在结束之前释放承诺,那么你需要自己做。

既然你似乎不感兴趣的承诺的返回值,但只有在它们的副作用(增加计数),你可以做

.on('data', (row) => { 
    ... 
    if (allp.length > 50) allp = [Promise.all(allp).then(()=>null)]; 
    allp.push(p.then(() => {...})); 
} 

这样的50个承诺将进行分组,并且一旦他们解决,他们被单个承诺取代(这将进入下一个50 ...)

.then(()=>null)确保Promise.all的结果数组也被丢弃。 (而不是一个承诺,为空将在allp)

这不取决于Promise.all的实施。如果Promise.all解决了每个承诺(并且结果可用),那么这是完美的。

如果Promise.all等待所有50个承诺,然后释放他们,那么这仍然有效,除非每个组50具有一个非常长时间运行的承诺。


您可以使用延期承诺的反模式。

在开始时创建一个延期承诺。

var resolve; 
var asyncRunningCount = 1; // start with 1 
var p2 = new Promise(function() { 
    resolve = arguments[0]; 
}); 

在上数据

.on('data', (row) => { 
    ... 
    asyncRunningCount++; 
    p.then(() => {work})) 
    .then(() { 
     asyncRunningCount--; 
     if (asyncRunningCount == 0) resolve(); // no more tasks running 
    }); 
} 

.on('end',() => { 
    asyncRunningCount--; 
    // remove the 1 that was set on start. No more new tasks will be added 
    if (asyncRunningCount == 0) resolve(); // no more tasks running 
    p2.then(() => { all done }) 

在启动时的值1防止得到解决P2,如果正在运行的任务的计数暂时下降到0。

在上(端部)的1递减。如果所有任务完成,asyncRunningCount将为0.这可以通过on(end)中的递减或on(data)中的递减来实现。

p2.then,将在所有任务完成时运行。

所有其他承诺完成后将被释放。事实上在(数据)你不需要承诺。刚开始你的任务异步,而当异步任务完成递减asyncRunningCount,以及0


检查,如果数据来自于非常快,是许多承诺的并行启动这仍然意味着。 但是,如果你没有开始承诺,那么你需要存储传入的数据,所以内存将被用于任何方式。

+0

是的,两种解决方案都可以工作,但为什么这么复杂?在第一种方法中 - 为什么等待50个承诺积累在数组中,而不是每次都执行'Promise.all'?在第二种方法中 - 没有理由使用延迟反模式,只需将整个事件发射器事件移到'new Promise'回调中。 (当然,即使那样,它仍然是重塑'Promise.all'反模式:D) – Bergi

+0

如果你为* every *事件做了Promise.all(除了第一个可能,因为那时只有一个),那么你创建两次(一次不到两次)的许诺,就像你没有使用Promise.all一样。是的,只要他们完成,他们都会被释放,所以记忆不是问题。但执行时间可能成为一个问题... – Martin

+0

只要它具有线性时间和内存的复杂性,我通常不会在乎:-)执行时间受到异步工作的限制(OP的示例中的'delay(5)'),额外的承诺并不重要。 – Bergi