2016-01-08 53 views
0

在我的node.js应用程序中,我从AWS Kinesis流中读取消息,并且需要将最后一分钟的所有消息存储在缓存(Redis)中。我在一个节点上运行工人下面的代码:在node.js中并发写入到redis

var loopCallback = function(record) { 
    var nowMinute = moment.utc(record.Data.ts).minute(); 
    //get all cached kinesis records 
    var key = "kinesis"; 
    cache.get(key,function (err, cachedData) { 
     if (err) { 
      utils.logError(err); 
     } else { 

      if(!cachedData) { 
       cachedData = []; 
      } else { 
       cachedData = JSON.parse(cachedData); 
      } 

      //get records with the same minute 
      var filtered = _.filter(cachedData, function (item) { 
       return moment.utc(item.ts).minute() === nowMinute; 
      }); 

      filtered.push(record.Data); 

      cache.set(key, JSON.stringify(filtered), function (saveErr) { 
       if (saveErr) { 
        utils.logError(saveErr); 
       } 

       //do other things with record; 
      }); 
     } 
    }); 
}; 

大多数的记录(数十个)我收到完全相同的同一时刻。所以当我尝试保存它时,一些记录不会被存储。 我明白这是由于竞赛条件而发生的。 节点读取来自Redis的数组版本old,并在将另一个记录写入缓存时覆盖阵列。 我已经阅读了关于redis交易的信息,但据我了解,它不会帮助我,因为只有一个交易将完成,而其他交易将被拒绝。 在我的情况下,有办法将所有记录保存到缓存中? 谢谢

+0

你为什么要存储元素作为Redis的字符串值(键= JSON.stringify(.. ))。例如,您应该使用RPUSH将它们存储在重新列出的列表中。这将绕过你的并发问题。 –

+0

谢谢。但我也需要删除前一分钟的数据,所以它意味着三个命令:LRANGE/DEL/RPUSH? – Ilya

+0

如果你说“只保留N个元素”而不是“最后一分钟”,会更容易。环顾这个工具应该给你想法https://github.com/tj/node-redis-histogram –

回答

0

你可以使用一个有序集合,比分是Unix时间戳 ZADD kinesis <unixtimestamp> "some data to be cached"

要获得元素的加入不到一分钟前,创建(现在 - 60秒)的时间戳,然后使用ZRANGEBYSCORE得到最古老的元素第一: ZRANGEBYSCORE myzset -inf (timestamp

ZREVRANGEBYSCORE如果你想要最新的元素第一: ZRANGEBYSCORE myzset -inf (timestamp

要删除的元素年长超过一分钟,创建(现在 - 60秒)时间戳然后用ZREMRANGEBYSCORE ZREMRANGEBYSCORE myzset -inf (timestamp