2016-06-28 24 views
0

我正在编写一个批处理过程来构建一些日志类型条目。它将是我的API中的一个单独的应用程序,它只会执行这一项任务。现在,我有一个Express API和一个logMsg传递到的端点。这些来自许多用户,每个用户每10秒钟发送一次。为了让事情继续下去,我对logMsg做了几个简单的操作,并将其粘贴到我的Postgres数据库中。在knex查询生成器结果上同步处理

现在我想遍历这些消息来构建一些logEntries,以便我可以更轻松地计算活动期间的时间。每个logMsg进来后,都有一个活动(或缺少一个活动),一个用户和一个时间戳。 LogEntry具有用户,活动,startTime和endTime。通过这种方式,API应用程序可以更轻松地计算用户/活动为一组用户所做的/不做活动的时间。

所以我想设置一个批处理作业,抓取n个未处理的点,将它们标记为处理中,然后处理它们,然后将它们标记为完整。我遇到麻烦的地方是保持秩序。

let activityLogJob = new CronJob('30 * * * * *', function() { 

    return knex 
    .raw() //grab a chunk of rows and mark as In Progress 
    .then((processingRows) => { 
     _.forEach(logMsgs, (msg) => { 
      console.log('message id:', msg.id); 
      return knex.select() //latest log entry for msg.user 
       .then((logEntry) => { 
        if (logEntry) { 
         if (logEntry.activity_id !== msg.activity_id) { 
          console.log('new activity started. end old one'); 
          return knex.transaction((trx) => { 
           trx 
           .update() //logEntry.endTime = msg.startTime 
           .then((update) => { 
            trx 
            .insert(); //create new logEntry for new activity 
           }); 
          }); 
         } 
        } else { 
         console.log('first log entry for user'); 
         knex 
         .insert(); //create new logEntry since we don't have one for the user 
        } 
       }) 
       .then((result) => { 
        console.log('finished msg :', msg.id); 
        knex 
        .update(); //set msg.status = LOGGED 
       }) 
       .catch((err) => { 
        logger.error("ERROR: ", err); 
       }); 
     });  
    }); 

}, null, true); 

所以我看到上面的代码是它抓住了所有的行很好,但它实际上处理它们时变得很奇怪。日志语句显示如下:

message 1 
first log entry for user 
message 2 
first log entry for user 
... 
message n 
first log entry for user 

finished message 1 
finished message 2 
... 
finished message n 

所以我认为这里有两个问题。
1.它永远不会找到现有的LogEntry,并且每次都会创建一个新的LogEntry。这感觉就像在处理所有消息之后才发生插入。
2.我本来期望的日志报表更像

message 1 
first entry for user 
finished message 1 
message 2 
first entry for user 
finished message 2 
... 

我敢肯定我的问题是,我仍然无法正常包裹我的头周围的承诺。

回答

0

您需要返回嵌套在另一个承诺的回调中的每个承诺。如果你不这样做,那么嵌套诺言的结果将不会通过链条。这应该可以帮助你的信息同步。

return knex 
.raw() //grab a chunk of rows and mark as In Progress 
.then((processingRows) => { 
    _.forEach(logMsgs, (msg) => { 
     console.log('message id:', msg.id); 
     return knex.select() //latest log entry for msg.user 
      .then((logEntry) => { 
       if (logEntry) { 
        if (logEntry.activity_id !== msg.activity_id) { 
         console.log('new activity started. end old one'); 
         return knex.transaction((trx) => { 
          return trx ##### 
          .update() //logEntry.endTime = msg.startTime 
          .then((update) => { 
           return trx ##### 
           .insert(); //create new logEntry for new activity 
          }); 
         }); 
        } 
       } else { 
        console.log('first log entry for user'); 
        return knex ##### 
        .insert(); //create new logEntry since we don't have one for the user 
       } 
      }) 
      .then((result) => { 
       console.log('finished msg :', msg.id); 
       return knex ##### 
       .update(); //set msg.status = LOGGED 
      }) 
      .catch((err) => { 
       logger.error("ERROR: ", err); 
      }); 
    });  
});