2015-11-09 45 views
3

我正在尝试使用async.queue进行一系列调用,并且每次调用都有自己的回调,我正在使用mocha来测试返回预期结果。如何避免使用async.queue“回调已被调用”?

当我使用并发值1(我的NUMBER_OF_THREADS变量)时,这一切都很好。但是,当我使用大于1的任何值时,我会收到错误消息,指出“错误:回调已被调用。”例如,如果我发送了10封邮件,并将NUMBER_OF_THREADS设置为5,则前5封邮件将顺利执行,但随后我开始在邮件6或7(见下文)周围看到重复回拨错误。你知道我怎么能避免这个错误?

我的测试文件(其中,异步队列定义):

var myQueue = async.queue(function(options, callback){ 
    var counter = options.counter; 

    myService.sendMyMessage(options.text, counter, function(result) { 
     var myText = result.content.text; 
     console.log("Response " + myService.counter + ": " + myText); 

     responses.push(myText); 
     callback(); 
    }); 
}, NUMBER_OF_THREADS); 

myQueue.drain = function(){ 
    console.log("sends completed"); 
    for (var i = 0; i < numberOfSends; i++) { 
     assert.equal(myExpectedResponse,responses[i],"text doesn't match"); 
    } 
    done(); 
}; 

for (var j = 1; j <= numberOfSends; j++) { 
    var options = { 
     counter: j, 
     text: "Hello_" + j 
    }; 
    myQueue.push(options); 
} 

我的服务文件(其中发送和响应发生):

myService.callback = function() {}; 

myService.sendMyMessage = function(message, counter, myCallback) { 
    console.log("Sending message " + counter + ": " + message); 
    var myContent = JSON.stringify(myModel.makeMessage(message)); 

    myModel.post(content) 
     .then(function(res) { 
      myService.callback = myCallback; 
     }); 
}; 

myService.otherService = function(done) { 
    app = express(); 
    app.use(express.bodyParser()); 

app.post('/myRoute/events', function(req, res, next) { 
    var response = { 
     "myId": "1234567890", 
     "myVersion": 1 
    }; 

    res.set('Content-Type', 'application/json;charset=UTF-8'); 
    res.send(JSON.stringify(response)); 
    if (myService.callback) 
    { 
     myService.counter ++; 

     myService.callback(req.body); 
     //myService.callback = null; 
    } 
    else 
    { 
     console.log('the callback is NULL'); 
    } 
}); 

我的成绩在控制台:

Sending message 1: Hello_1 
Sending message 2: Hello_2 
Sending message 3: Hello_3 
Sending message 4: Hello_4 
Sending message 5: Hello_5 
Response 1: myResponse 
Sending message 6: Hello_6 
Response 2: myResponse 
Sending message 7: Hello_7 
Response 3: myResponse 
Sending message 8: Hello_8 
Response 4: myResponse 
Sending message 9: Hello_9 
Response 5: myResponse 
Sending message 10: Hello_10 
Response 6: myResponse 
Response 7: myResponse 
Error: Callback was already called. 
    at myFile.js:12:34 

如果我对myService.callback = null行取消注释,则最后一批的第一次发送会导致myService.ca反馈太早。例如,如果我发送10个NUMBER_OF_THREADS = 5的请求,则1到5的请求将会很好。但是,一旦我发送请求1到10,请求#10将过早地使myService.callback无效。响应示例:

Sending message 1: Hello_1 
Sending message 2: Hello_2 
Sending message 3: Hello_3 
Sending message 4: Hello_4 
Sending message 5: Hello_5 
Response 1: myResponse 
Sending message 6: Hello_6 
Response 2: myResponse 
Sending message 7: Hello_7 
Response 3: myResponse 
Sending message 8: Hello_8 
Response 4: myResponse 
Sending message 9: Hello_9 
Response 5: myResponse 
Sending message 10: Hello_10 
Response 6: myResponse 
the callback is NULL 
the callback is NULL 
the callback is NULL 
the callback is NULL 

回答

1

我已经修复了这个问题。

在我的测试文件中,我现在简单地调用sendMyMessage;我不再期待sendMyMessage的回调。在流失中,我会等待响应的总数被击中,然后循环处理这些响应。

var myQueue = async.queue(function(options, callback){ 
    var counter = options.counter; 

    myService.sendMyMessage(options.text, counter); 
    callback(); 
}, NUMBER_OF_THREADS); 

myQueue.drain = function(){ 
    var myInterval = setInterval(function() { 

    if (myService.responseCounter == myNumberOfMessages) { 

     clearInterval(myInterval); 
     for (var i = 0; i < myNumberOfMessages; i++) { 
     assert.equal(myExpectedResponse,myService.responses[i],"error"); 
     } 
     done(); 
    } 
    }, 5000); 

}; 

for (var j = 1; j <= myNumberOfMessages; j++) { 
    var options = { 
    counter: j, 
    text: "Hello" 
    }; 

    myQueue.push(options); 
} 

然后,在我的服务文件,我现在使用回调的阵列;我不再依赖单个回调进行设置或取消。每次调用myModel.post都会在此回调数组中定义一个新元素。

myService.sendLineMessage = function(message, counter) { 

myModel.post(content, sign, config, request, log, run) 
    .then(function(res) { 
     myService.callbacks[counter] = function(result) { 
      var myText = result.content.text; 
      myService.responses.push(resultText); 
     }; 
    }); 
}; 

myService.otherService = function(done) { 
    app = express(); 
    app.use(express.bodyParser()); 

    app.post('/myRoute/events', function(req, res, next) { 

    var response = { 
     "myId": "1234567890", 
     "myVersion": 1 
    }; 

    res.set('Content-Type', 'application/json;charset=UTF-8'); 
    res.send(JSON.stringify(response)); 

    myService.responseCounter++; 

    if (myService.callbacks[myService.responseCounter]) 
    { 
     myService.callbacks[myService.responseCounter](req.body); 
     myService.callbacks[myService.responseCounter] = null; 
    } 
    else 
    { 
     console.log('the callback is NULL'); 
    } 
});