2013-02-14 18 views
1

给定一个numpy的阵列,这样,包含任意的数据:迭代一个numpy的阵列,选择性地收集一个或两个值,给定的判据

>>> data 
array([ 1, 172, 32, ..., 42, 189, 29], dtype=int8) # SIGNED int8 

...我需要构造一个numpy的阵列“结果'如下:

(请原谅伪代码的实现,如果我知道该怎么做,我不会问,如果我有一个工作的numpy实现,我会直接把我的问题改为CodeReview。

for value in data, check: 
    if value & 0x01: 
     result.append((value >> 1 << 8) + next(value).astype(numpy.uint8)) 
     # that is: take TWO values from 'data', one signed, the next un-signed, glue them together, appending ONE int16 to result 
    else: 
     result.append(value >> 1) 
     # that is: take ONE value from 'data', appending ONE int8 to result 

我已经在“普通”的Python中实现了这个。它工作得很好,但可以使用numpy和非常高效的数组操作进行优化。我想摆脱名单和追加。可悲的是,我不知道如何完成它:

# data is a string of 'bytes' received from a device 
def unpack(data): 
    l = len(data) 
    p = 0 
    result = [] 

    while p < l: 
     i1 = (((ord(data[p]) + 128) % 256) - 128) 
     p += 1 
     if i1 & 0x01: 
      # read next 'char' as an uint8 
      # 
      # due to the nature of the protocol, 
      # we will always have sufficient data 
      # available to avoid reading past the end 
      i2 = ord(data[p]) 
      p += 1 
      result.append((i1 >> 1 << 8) + i2) 
     else: 
      result.append(i1 >> 1) 

    return result 

更新:感谢@Jaime我已经设法实现高效的解压缩功能。这与他的非常相似,虽然速度更快。 while循环当然是关键部分。我在这里发布它,以防万一有兴趣:

def new_np_unpack(data): 
    mask = (data & 0x01).astype(numpy.bool) 

    true_positives = None 

    while True: 
     # check for 'true positives' in the tentative mask 
     # the next item must by definition be a false one 
     true_positives = numpy.nonzero(numpy.logical_and(mask, numpy.invert(numpy.concatenate(([False], mask[:-1])))))[0] 

     # loop until no more 'false positives' 
     if not numpy.any(mask[true_positives+1]): 
      break 

     mask[true_positives+1] = False 

    result = numpy.empty(data.shape, dtype='int16') 
    result[:] = data.astype('int8') >> 1 
    result[true_positives] = (result[true_positives] << 8) + data[true_positives + 1] 
    mask = numpy.ones(data.shape, dtype=bool) 
    mask[true_positives + 1] = False 
    return result[mask] 

回答

1

我得到了一些矢量化的工作。为了便于比较,我把ord(...)出你的代码,并馈送它像数据:

data = np.random.randint(256, size=(1000000,)).astype('uint8') 
data[-1] = 0 # to avoid errors with last element 

我的版本的功能:

def np_unpack(data) : 
    # find where condition is met 
    mask = (data & 0x01).astype(bool) 
    # Keep only 1st, 3rd, 5th... consecutive occurrences of True in mask 
    new_mask = mask[:] 
    mult = -1 
    while new_mask.sum() : 
     new_mask = np.logical_and(new_mask, 
            np.concatenate(([False], new_mask[:-1]))) 
     mask += new_mask * mult 
     mult *= -1 
    del new_mask 
    cond = np.nonzero(mask)[0] 
    result = np.empty(data.shape, dtype='int16') 
    result[:] = data.astype('int8') >> 1 
    result[cond] <<= 8 
    result[cond] += data[cond + 1] 
    mask = np.ones(data.shape, dtype=bool) 
    mask[cond + 1] = False 
    return result[mask] 

而且一些测试用1M元素的列表:

In [4]: np.all(unpack(data) == np_unpack(data)) 
Out[4]: True 

In [5]: %timeit unpack(data) 
1 loops, best of 3: 7.11 s per loop 

In [6]: %timeit np_unpack(data) 
1 loops, best of 3: 294 ms per loop 
+0

谢谢!我已验证您的功能,并按预期工作。现在我只需要弄清楚你在做什么。 :-) – Micke 2013-02-15 10:42:32

+1

@Micke唯一棘手的部分是'while'循环。在每次迭代之后添加一些'mask'和'new_mask'的打印,并且给它像'[1,0,1,1,0,1,1,1,0,1,1,1,1]',它应该让事情很清楚。 – Jaime 2013-02-15 14:43:48

+0

我终于明白了。把你的代码逐个分开,我现在明白了。事实上,我做了一些修改,将执行时间缩短了30%。再次感谢您花时间帮助! – Micke 2013-02-15 16:13:34