我正在编写一个批处理过程来构建一些日志类型条目。它将是我的API中的一个单独的应用程序,它只会执行这一项任务。现在,我有一个Express API和一个logMsg传递到的端点。这些来自许多用户,每个用户每10秒钟发送一次。为了让事情继续下去,我对logMsg做了几个简单的操作,并将其粘贴到我的Postgres数据库中。在knex查询生成器结果上同步处理
现在我想遍历这些消息来构建一些logEntries,以便我可以更轻松地计算活动期间的时间。每个logMsg进来后,都有一个活动(或缺少一个活动),一个用户和一个时间戳。 LogEntry具有用户,活动,startTime和endTime。通过这种方式,API应用程序可以更轻松地计算用户/活动为一组用户所做的/不做活动的时间。
所以我想设置一个批处理作业,抓取n个未处理的点,将它们标记为处理中,然后处理它们,然后将它们标记为完整。我遇到麻烦的地方是保持秩序。
let activityLogJob = new CronJob('30 * * * * *', function() {
return knex
.raw() //grab a chunk of rows and mark as In Progress
.then((processingRows) => {
_.forEach(logMsgs, (msg) => {
console.log('message id:', msg.id);
return knex.select() //latest log entry for msg.user
.then((logEntry) => {
if (logEntry) {
if (logEntry.activity_id !== msg.activity_id) {
console.log('new activity started. end old one');
return knex.transaction((trx) => {
trx
.update() //logEntry.endTime = msg.startTime
.then((update) => {
trx
.insert(); //create new logEntry for new activity
});
});
}
} else {
console.log('first log entry for user');
knex
.insert(); //create new logEntry since we don't have one for the user
}
})
.then((result) => {
console.log('finished msg :', msg.id);
knex
.update(); //set msg.status = LOGGED
})
.catch((err) => {
logger.error("ERROR: ", err);
});
});
});
}, null, true);
所以我看到上面的代码是它抓住了所有的行很好,但它实际上处理它们时变得很奇怪。日志语句显示如下:
message 1
first log entry for user
message 2
first log entry for user
...
message n
first log entry for user
finished message 1
finished message 2
...
finished message n
所以我认为这里有两个问题。
1.它永远不会找到现有的LogEntry,并且每次都会创建一个新的LogEntry。这感觉就像在处理所有消息之后才发生插入。
2.我本来期望的日志报表更像
message 1
first entry for user
finished message 1
message 2
first entry for user
finished message 2
...
我敢肯定我的问题是,我仍然无法正常包裹我的头周围的承诺。