我试图设置一个简单的文档进纸器来对ElasticSearch进行基准测试,并且我选择了NodeJS,因为我认为使用简单的JSON结构是最简单的。不幸的是,我似乎在向自己射击。构造NodeJS异步代码,以提高内存效率
下面是相关位:
var logResults = function (results) {
docsIndexed++;
var now = +new Date();
if (docsIndexed % 10000 === 0) {
log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms");
}
}
var submitDocument = function (source, index, type) {
var doc = ejs.Document(index, type).source(source);
doc.doIndex(logResults);
}
var schemas = {};
_(10).times(function (idx) {
schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10);
});
var docCount = 0, docsIndexed = 0, start = +new Date();
Object.keys(schemas).forEach(function (schemaName, idx) {
var register = function() {
submitDocument(generateRandomDocument(schemas[schemaName]),
'documents', schemaName);
docCount++;
};
_((idx + 1) * 1000).times(register);
log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema "
+ schemaName + ". Total: " + docCount);
});
这工作正常的高达10万条记录的数据集,但如果我要对数百万在我身上吹了一个内存不足的错误。
中的doIndex
函数是异步的,我怀疑许多对象在实际执行之前正在排队。当这个数字变得重要时,这个过程就会消失。我不明白为什么在循环结束之前没有执行回调。我想要的是一种使其同步的方法,或者为其设置某种类型的池,以便在发送其他对象之前不排队更多的对象。
有人可以请建议一个图书馆,可以帮助这个或更好的方式来构建代码?谢谢。
更新
我已经试过彼得的建议,使用async.queue,我想出了这一点:
/** Submit QUANT * (idx + 1) documents for each schema into the index */
var QUANT = 100
, docCount = 0
, docsIndexed = 0
, queue = async.queue(submitDocument, 1000)
, paused = false
, start = +new Date();
queue.saturated = function() {
log("queue is full");
paused = true;
};
queue.empty = function() {
log("All items were given to workers");
paused = false;
};
Object.keys(schemas).forEach(function (schemaName, idx) {
var count = 0;
while (count < (idx + 1) * QUANT) {
if (!paused) {
queue.push({
source: generateRandomDocument(schemas[schemaName]),
index: 'documents',
type: schemaName
});
count++; docCount++;
}
};
log("Registered " + count + " documents for indexing for schema "
+ schemaName + ". Total: " + docCount);
});
如果它得到循环暂停,它挂起永远(即调用queue.saturated
,暂停设置为true,然后程序停留在while循环中)。 queue.empty callback
永远不会被调用。如果我的队列并发限制高于我想要处理的数字,则这可以正常工作 - 所有消息都按预期记录。我应该在这里改变什么?
更新#2
我已经改变了使用异步循环的代码,现在它的工作原理。我得到了一个RangeError: Maximum call stack size exceeded
错误,我一直在努力。
Object.keys(schemas).forEach(function (schemaName, idx) {
var count = 0, executions = 0;
async.whilst(
function() {
var test = count < (idx + 1) * QUANT;
if (!test) log("Registered " + count + " documents for indexing for schema "
+ schemaName + ". Executions: " + executions + ". Total: " + docCount);
return test;
},
function (callback) {
executions++;
if (!paused) {
queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName });
count++; docCount++;
}
setTimeout(callback, 0);
// also tried with "return process.nextTick(callback)"
// and return callback();
// this blows up nicely with an out of memory error
},
function (err) {}
);
});
我开始感到沮丧,因为我不认为这个用例真的很复杂,我希望我对语言的工作原理有一个公正的理解。
谢谢,我会试试看。 –