2010-11-11 200 views
72

我想在python中创建一个高效的circular buffer(目标是获取缓冲区中整数值的平均值)。高效循环缓冲区?

这是使用列表来收集值的有效方法吗?

def add_to_buffer(self, num): 
    self.mylist.pop(0) 
    self.mylist.append(num) 

什么会更有效率(以及为什么)?

回答

137

我会用collections.dequemaxlen ARG

>>> import collections 
>>> d = collections.deque(maxlen=10) 
>>> d 
deque([], maxlen=10) 
>>> for i in xrange(20): 
...  d.append(i) 
... 
>>> d 
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10) 

有一个在文档的deque类似于你想要什么recipe。我认为最有效率的说法完全取决于它是由一个令人难以置信的熟练的工作人员以C语言实现的,这个工作人员习惯于开发出顶尖的代码。

+5

+1是的,这是包括很好的电池方式。循环缓冲区的操作是O(1),正如你所说的额外的开销是在C中,所以应该仍然是相当快的 – 2010-11-11 09:38:57

+1

我不喜欢这个解决方案,因为文档不保证O(1) maxlen'被定义。当'deque'可以增长到无限时,O(n)是可以理解的,但是如果给出'maxlen',索引一个元素应该是不变的时间。 – lvella 2015-11-13 21:57:38

+0

我的猜测是它的实现是一个链表而不是一个数组。 – 2017-01-24 14:56:51

9

从列表的头部弹出导致要复制的整个列表,因此是低效

您应该使用固定的大小,并且通过在添加缓冲移动的索引的列表/阵列/删除项目

+2

同意。不管它看起来多么优雅或不雅,或者使用什么语言。实际上,你对垃圾收集器(或堆管理器或分页/映射机制或其他任何实际内存魔法)的影响越小越好。 – 2010-11-11 04:56:58

+0

@RocketSurgeon这不是魔术,它只是它的第一个元素被删除的数组。因此,对于大小为n的数组,这意味着n-1个复制操作。这里不涉及垃圾收集器或类似的设备。 – Christian 2012-09-26 11:49:59

+2

我同意。这样做也比一些人想象的容易。只需使用不断增加的计数器,并在访问该项目时使用模运算符(%arraylen)。 – 2012-12-06 17:41:44

5

确定与使用双端队列类的,但对于问题(平均)的requeriments这是我的解决方案:

>>> from collections import deque 
>>> class CircularBuffer(deque): 
...  def __init__(self, size=0): 
...    super(CircularBuffer, self).__init__(maxlen=size) 
...  @property 
...  def average(self): # TODO: Make type check for integer or floats 
...    return sum(self)/len(self) 
... 
>>> 
>>> cb = CircularBuffer(size=10) 
>>> for i in range(20): 
...  cb.append(i) 
...  print "@%s, Average: %s" % (cb, cb.average) 
... 
@deque([0], maxlen=10), Average: 0 
@deque([0, 1], maxlen=10), Average: 0 
@deque([0, 1, 2], maxlen=10), Average: 1 
@deque([0, 1, 2, 3], maxlen=10), Average: 1 
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2 
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2 
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3 
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3 
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4 
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4 
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5 
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6 
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7 
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8 
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9 
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10 
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11 
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12 
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13 
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14 
+0

当我试图调用'average'方法时,得到'TypeError:'numpy.float64'对象不可调用' – scls 2013-10-03 13:03:58

+4

对于我的文章?,代码不能在任何地方使用numpy。 ¿? – SmartElectron 2013-10-04 16:01:29

+0

是的......事实上,我猜Deque在内部使用numpy数组(在删除@property后它工作正常) – scls 2013-10-07 07:21:16

3

你也可以看到这个很老Python recipe

这是我自己的版本与NumPy的数组:

#!/usr/bin/env python 

import numpy as np 

class RingBuffer(object): 
    def __init__(self, size_max, default_value=0.0, dtype=float): 
     """initialization""" 
     self.size_max = size_max 

     self._data = np.empty(size_max, dtype=dtype) 
     self._data.fill(default_value) 

     self.size = 0 

    def append(self, value): 
     """append an element""" 
     self._data = np.roll(self._data, 1) 
     self._data[0] = value 

     self.size += 1 

     if self.size == self.size_max: 
      self.__class__ = RingBufferFull 

    def get_all(self): 
     """return a list of elements from the oldest to the newest""" 
     return(self._data) 

    def get_partial(self): 
     return(self.get_all()[0:self.size]) 

    def __getitem__(self, key): 
     """get element""" 
     return(self._data[key]) 

    def __repr__(self): 
     """return string representation""" 
     s = self._data.__repr__() 
     s = s + '\t' + str(self.size) 
     s = s + '\t' + self.get_all()[::-1].__repr__() 
     s = s + '\t' + self.get_partial()[::-1].__repr__() 
     return(s) 

class RingBufferFull(RingBuffer): 
    def append(self, value): 
     """append an element when buffer is full""" 
     self._data = np.roll(self._data, 1) 
     self._data[0] = value 
+0

+1使用numpy – shx2 2014-03-14 21:01:33

+0

+1使用numpy,但-1不使用循环缓冲区。你实现它的方式,你每次添加一个元素都会移动所有的数据,这会花费'O(n)'的时间。要实现一个合适的[循环缓冲区](https://en.wikipedia.org/wiki/Circular_buffer),你应该同时拥有一个索引和一个大小变量,并且你需要正确处理数据'绕过'缓冲区的结尾。在检索数据时,您可能必须在缓冲区的开始和结尾连接两个部分。 – 2017-06-07 09:35:50

1

这一个不需要任何库。它增长一个列表,然后通过索引循环。

占地面积非常小(没有库),运行速度至少比出队快两倍。这对计算移动的平均值很有帮助,但请注意,这些项目并不像上面那样按年龄分类。

class CircularBuffer(object): 
    def __init__(self, size): 
     """initialization""" 
     self.index= 0 
     self.size= size 
     self._data = [] 

    def record(self, value): 
     """append an element""" 
     if len(self._data) == self.size: 
      self._data[self.index]= value 
     else: 
      self._data.append(value) 
     self.index= (self.index + 1) % self.size 

    def __getitem__(self, key): 
     """get element by index like a regular array""" 
     return(self._data[key]) 

    def __repr__(self): 
     """return string representation""" 
     return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 

    def get_all(self): 
     """return a list of all the elements""" 
     return(self._data) 

要得到的平均值,例如:

q= CircularBuffer(1000000); 
for i in range(40000): 
    q.record(i); 
print "capacity=", q.size 
print "stored=", len(q.get_all()) 
print "average=", sum(q.get_all())/len(q.get_all()) 

结果:

capacity= 1000000 
stored= 40000 
average= 19999 

real 0m0.024s 
user 0m0.020s 
sys 0m0.000s 

这是出队相当于约1/3的时间。

0

原来的问题是: “高效” 循环缓冲器。 根据这个效率的要求,aaronasterling的答案似乎是绝对正确的。 使用在Python中编程的专用类,并将时间处理与collections.deque进行比较,显示使用deque的x5.2倍加速! 这是非常简单的代码来测试这个:

class cb: 
    def __init__(self, size): 
     self.b = [0]*size 
     self.i = 0 
     self.sz = size 
    def append(self, v): 
     self.b[self.i] = v 
     self.i = (self.i + 1) % self.sz 

b = cb(1000) 
for i in range(10000): 
    b.append(i) 
# called 200 times, this lasts 1.097 second on my laptop 

from collections import deque 
b = deque([], 1000) 
for i in range(10000): 
    b.append(i) 
# called 200 times, this lasts 0.211 second on my laptop 

要变换双端队列到一个列表,只需使用:

my_list = [v for v in my_deque] 

然后,您将得到双端队列项目O(1)随机访问。当然,如果您在设置一次后需要对deque进行许多随机访问,这才是有价值的。

4

根据MoonCactus's answer,这里是一个circularlist类。与他的版本不同的是,这里c[0]将始终给最老的元素,c[-1]附加最新元素,c[-2]倒数第二...这对于应用程序更自然。

c = circularlist(4) 
c.append(1); print c, c[0], c[-1] #[1] (1 items)    first 1, last 1 
c.append(2); print c, c[0], c[-1] #[1, 2] (2 items)   first 1, last 2 
c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items)  first 1, last 3 
c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items)  first 1, last 8 
c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10 
c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11 

类:

class circularlist(object): 
    def __init__(self, size): 
     """Initialization""" 
     self.index = 0 
     self.size = size 
     self._data = [] 

    def append(self, value): 
     """Append an element""" 
     if len(self._data) == self.size: 
      self._data[self.index] = value 
     else: 
      self._data.append(value) 
     self.index = (self.index + 1) % self.size 

    def __getitem__(self, key): 
     """Get element by index, relative to the current index""" 
     if len(self._data) == self.size: 
      return(self._data[(key + self.index) % self.size]) 
     else: 
      return(self._data[key]) 

    def __repr__(self): 
     """Return string representation""" 
     return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 
1

我已经做串行编程之前这个问题。在一年前的时候,我找不到任何有效的实现,所以我最终编写了one as a C extension,并且在MIT许可证下它也可用on pypi。它是超级基础的,只处理8位有符号字符的缓冲区,但是长度很灵活,所以如果你需要字符以外的其他东西,你可以使用Struct或其他东西。我现在看到谷歌搜索,尽管现在有几个选项,所以你也可以看看这些。

0

你的回答是不正确的。 循环缓冲区主要有两个坚持的原则(https://en.wikipedia.org/wiki/Circular_buffer

  1. 缓冲器的lenth被设置好的;
  2. 先进先出;
  3. 当您添加或删除一个项目,其他项目不应该把他们的位置

您的代码如下:

def add_to_buffer(self, num): 
    self.mylist.pop(0) 
    self.mylist.append(num) 

让我们考虑的情况下,该列表已满,通过使用代码:

self.mylist = [1, 2, 3, 4, 5] 

现在我们追加6,列表改为

self.mylist = [2, 3, 4, 5, 6] 

项目预计1名单已经改变了他们的位置

你的代码是一个队列,而不是一个圈子的缓冲区。

Basj的答案,我认为是最有效的一个。

顺便说一下,圆形缓冲区可以提高 操作的性能以添加项目。

0

这是将相同的主体应用于一些旨在容纳最新文本消息的缓冲区。

import time 
import datetime 
import sys, getopt 

class textbffr(object): 
    def __init__(self, size_max): 
     #initialization 
     self.posn_max = size_max-1 
     self._data = [""]*(size_max) 
     self.posn = self.posn_max 

    def append(self, value): 
     #append an element 
     if self.posn == self.posn_max: 
      self.posn = 0 
      self._data[self.posn] = value 
     else: 
      self.posn += 1 
      self._data[self.posn] = value 

    def __getitem__(self, key): 
     #return stored element 
     if (key + self.posn+1) > self.posn_max: 
      return(self._data[key - (self.posn_max-self.posn)]) 
     else: 
      return(self._data[key + self.posn+1]) 


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max): 
     stored = bffr[ind] 
     if stored != "": 
      print(stored) 
    print ('\n') 

def make_time_text(time_value): 
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2) 
     + str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2) 
     + str(time_value.second).zfill(2)) 


def main(argv): 
    #Set things up 
    starttime = datetime.datetime.now() 
    log_max = 5 
    status_max = 7 
    log_bffr = textbffr(log_max) 
    status_bffr = textbffr(status_max) 
    scan_count = 1 

    #Main Loop 
    # every 10 secounds write a line with the time and the scan count. 
    while True: 

     time_text = make_time_text(datetime.datetime.now()) 
     #create next messages and store in buffers 
     status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text) 
     log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ") 

     #print whole buffers so far 
     print_bffr(log_bffr,log_max) 
     print_bffr(status_bffr,status_max) 

     time.sleep(2) 
     scan_count += 1 

if __name__ == '__main__': 
    main(sys.argv[1:])