2013-06-19 39 views
0

我试图设置一个简单的文档进纸器来对ElasticSearch进行基准测试,并且我选择了NodeJS,因为我认为使用简单的JSON结构是最简单的。不幸的是,我似乎在向自己射击。构造NodeJS异步代码,以提高内存效率

下面是相关位:

var logResults = function (results) { 
    docsIndexed++; 
    var now = +new Date(); 
    if (docsIndexed % 10000 === 0) { 
     log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms"); 
    } 
    } 

    var submitDocument = function (source, index, type) { 
    var doc = ejs.Document(index, type).source(source); 
    doc.doIndex(logResults); 
    } 

    var schemas = {}; 
    _(10).times(function (idx) { 
    schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10); 
    }); 

    var docCount = 0, docsIndexed = 0, start = +new Date(); 
    Object.keys(schemas).forEach(function (schemaName, idx) { 
    var register = function() { 
     submitDocument(generateRandomDocument(schemas[schemaName]), 
     'documents', schemaName); 
     docCount++; 
    }; 
    _((idx + 1) * 1000).times(register); 
    log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema " 
     + schemaName + ". Total: " + docCount); 
    }); 

这工作正常的高达10万条记录的数据集,但如果我要对数百万在我身上吹了一个内存不足的错误。

中的doIndex函数是异步的,我怀疑许多对象在实际执行之前正在排队。当这个数字变得重要时,这个过程就会消失。我不明白为什么在循环结束之前没有执行回调。我想要的是一种使其同步的方法,或者为其设置某种类型的池,以便在发送其他对象之前不排队更多的对象。

有人可以请建议一个图书馆,可以帮助这个或更好的方式来构建代码?谢谢。


更新

我已经试过彼得的建议,使用async.queue,我想出了这一点:

/** Submit QUANT * (idx + 1) documents for each schema into the index */ 
    var QUANT = 100 
    , docCount = 0 
    , docsIndexed = 0 
    , queue = async.queue(submitDocument, 1000) 
    , paused = false 
    , start = +new Date(); 

    queue.saturated = function() { 
    log("queue is full"); 
    paused = true; 
    }; 
    queue.empty = function() { 
    log("All items were given to workers"); 
    paused = false; 
    }; 

    Object.keys(schemas).forEach(function (schemaName, idx) { 
    var count = 0; 
    while (count < (idx + 1) * QUANT) { 
     if (!paused) { 
     queue.push({ 
      source: generateRandomDocument(schemas[schemaName]), 
      index: 'documents', 
      type: schemaName 
     }); 
     count++; docCount++; 
     } 
    }; 
    log("Registered " + count + " documents for indexing for schema " 
     + schemaName + ". Total: " + docCount); 
    }); 

如果它得到循环暂停,它挂起永远(即调用queue.saturated,暂停设置为true,然后程序停留在while循环中)。 queue.empty callback永远不会被调用。如果我的队列并发限制高于我想要处理的数字,则这可以正常工作 - 所有消息都按预期记录。我应该在这里改变什么?

更新#2

我已经改变了使用异步循环的代码,现在它的工作原理。我得到了一个RangeError: Maximum call stack size exceeded错误,我一直在努力。

Object.keys(schemas).forEach(function (schemaName, idx) { 
    var count = 0, executions = 0; 
    async.whilst(
     function() { 
     var test = count < (idx + 1) * QUANT; 
     if (!test) log("Registered " + count + " documents for indexing for schema " 
      + schemaName + ". Executions: " + executions + ". Total: " + docCount); 
     return test; 
     }, 

     function (callback) { 
     executions++; 
     if (!paused) { 
      queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName }); 
      count++; docCount++; 
     } 
     setTimeout(callback, 0); 
     // also tried with "return process.nextTick(callback)" 
     // and return callback(); 
     // this blows up nicely with an out of memory error 
     }, 
     function (err) {} 
    ); 
    }); 

我开始感到沮丧,因为我不认为这个用例真的很复杂,我希望我对语言的工作原理有一个公正的理解。

回答

1

最好的选择是async.queue有一个很大的限制。只要确保你不只是在队列已经饱和之后就将其添加到队列中。使用队列的饱和度作为背压,等待一些工作完成,然后开始排队更多任务。该队列具有挂钩以支持这些关键事件。

+0

谢谢,我会试试看。 –