在我的node.js应用程序中,我从AWS Kinesis流中读取消息,并且需要将最后一分钟的所有消息存储在缓存(Redis)中。我在一个节点上运行工人下面的代码:在node.js中并发写入到redis
var loopCallback = function(record) {
var nowMinute = moment.utc(record.Data.ts).minute();
//get all cached kinesis records
var key = "kinesis";
cache.get(key,function (err, cachedData) {
if (err) {
utils.logError(err);
} else {
if(!cachedData) {
cachedData = [];
} else {
cachedData = JSON.parse(cachedData);
}
//get records with the same minute
var filtered = _.filter(cachedData, function (item) {
return moment.utc(item.ts).minute() === nowMinute;
});
filtered.push(record.Data);
cache.set(key, JSON.stringify(filtered), function (saveErr) {
if (saveErr) {
utils.logError(saveErr);
}
//do other things with record;
});
}
});
};
大多数的记录(数十个)我收到完全相同的同一时刻。所以当我尝试保存它时,一些记录不会被存储。 我明白这是由于竞赛条件而发生的。 节点读取来自Redis的数组版本old
,并在将另一个记录写入缓存时覆盖阵列。 我已经阅读了关于redis交易的信息,但据我了解,它不会帮助我,因为只有一个交易将完成,而其他交易将被拒绝。 在我的情况下,有办法将所有记录保存到缓存中? 谢谢
你为什么要存储元素作为Redis的字符串值(键= JSON.stringify(.. ))。例如,您应该使用RPUSH将它们存储在重新列出的列表中。这将绕过你的并发问题。 –
谢谢。但我也需要删除前一分钟的数据,所以它意味着三个命令:LRANGE/DEL/RPUSH? – Ilya
如果你说“只保留N个元素”而不是“最后一分钟”,会更容易。环顾这个工具应该给你想法https://github.com/tj/node-redis-histogram –