2014-07-04 80 views
2

我目前正在使用MongoDB并密切监视Nodejs服务器的数据流的Nodejs项目。Nodejs异步上升问题

我的节点服务器的代码都如下:

Receive.js
1.节点服务器接收到一个JSON文本文件
2.节点服务器通知的MongoDB(UPSERT),我们收到了文件

Process.js
3.节点服务器upserts JSON文件的MongoDB
4.节点,服务器会告诉我们的MongoDB处理所述文件。

的问题是,有时#4发生之前#2尽管#1之前#3总是存在的。我的监视器程序开始显示处理的文件比收到的文件多。有没有办法解决这个问题,而不使这台服务器完全同步?

例如,如果我把我的节点服务器500 JSON文本文件的最后时间节点执行#2应该是之前的最后一次#4得到执行。

**注:App.js电话从Receive.js .receive和Receive.js调用.save从Process.js **

app.js

http.createServer(app).listen(app.get('port'), function(){ 
    mongodb.createConnection(function(db){ 
    if(db){ 
     console.log('success connecting to mongodb!'); 
    } 
    }); 
}); 
app.post('/log', logCollection.receive); 

-
Receive.js

exports.receive = function(req, res, next){ 
    var usagelog = req.body.log; 

    if(!usagelog){ 
    log4js.logger.error("Usagelog is Null!"); 
    res.end("fail"); 
    }else{  
    count++; 

    //Upsert the Receive Count for Monitor 
     mongodb.getConnection(function(db){ 

      dateFormat.initDate(); 
      //Upsert trip (usage logs) 
      var currentDateTime = moment().format('YYYY-MM-DD hh:mm:ss'); 
      db.collection("ReceiveCount").update({"date":dateFormat.currentDate(), "pid":process.pid }, 
        {"date":dateFormat.currentDate(), "IPAddress": ip.address(), "pid":process.pid, "countReceive" : count, "LastReceivedTime": currentDateTime}, 
        {upsert:true}, function(err, result) {    
      }); 
      }); 

    //End Receive count 
    var usagelogJSON = convertToJson(usagelog); 
    var usagelogWithCurrentDate = addUpdateTime(usagelogJSON); 
    usagelogDao.save(usagelogWithCurrentDate, res); 
    } 
}; 

-

Process.js

exports.save = function(usagelog, res){ 
    var selector = upsertCondition(usagelog); 

    mongodb.getConnection(function(db){ 
    //Upsert trip (usage logs) 
    db.collection(collectionName()).update(selector, usagelog, {upsert:true, fullResult:true}, function(err, result) { 
     if(err){ 
     log4js.logger.error(err); 
     res.end("fail"); 
     }else{ 

     if(result.nModified == 0) 
      countInsert++; 
     else 
      countUpdate++; 

     //Upsert Processed Count (both Updated and Inserted  
     db.collection("ReceiveCount").find({"date":dateFormat.currentDate(), "pid":process.pid },{}).toArray(function (err, docs) { 

      var receivecount = docs[0].countReceive; 
      var receivetime = docs[0].LastReceivedTime; 
      var currentDateTime = moment().format('YYYY-MM-DD hh:mm:ss'); 
      var MongoCount=0; 

      db.collection("raw_"+dateFormat.currentDate()).count(function(err, count){ 
       console.log("raw_"+dateFormat.currentDate()); 

       MongoCount = count; 
       console.log("Mongo count is :" +MongoCount); 
      }); 

      db.collection("ReceiveCount").update({"date":dateFormat.currentDate(), "pid":process.pid }, 
       {"date":dateFormat.currentDate(), "IPAddress": ip.address(), "pid":process.pid, "countUpdate" : countUpdate, "countInsert":countInsert, "countTotal":countUpdate+countInsert, "LastProcessTime": currentDateTime, 
       "countReceive":receivecount, "LastReceivedTime":receivetime, "MongoCount":MongoCount}, {upsert:true}, function(err, result) {  
      }); 
     }); 
     res.end("success"); 
     } 
    }); 
    }); 
}; 
+1

尽管这里有上调,但在SO条款中并没有一个非常明确的问题。有很多方法可以通过异步处理和库来解决这个问题。但是我们需要看到一些代码是可以回答和解决的问题。显示你的代码,以便我们可以看到你的错在哪里。 Upvotes!=答案。 –

+0

更好的更新,但我认为你在如何调用你的出口“接收”和“保存”,这是你的问题更重要的部分在这里的背景。 –

回答

0

可以使用异步模块(https://github.com/caolan/async)。下面是伪代码:

async.parallel([ 
    fun1(cb) { 
    tellDBYouReceivedFile(..., onFinished() { 
     cb(); 
    }); 
    }, 
    fun2(cb) { 
    saveYourFileToDB(..., onFinished() { 
     cb(); 
    }); 
    }, 
], function() { 
    tellDBYouProcessedFile(); 
}); 

你所想达到不能完全异步的,因为最后一步(#4)必须等待#2,#3首先要完成。上面的例子可以使#2和#3异步,并且保证#4最后被执行。