2017-04-22 278 views
2

我有2个数组大小不等的:numpy的阵列比较和索引

>>> np.size(array1) 
4004001 
>>> np.size(array2) 
1000 

现在,在数组2的每个元素需要进行比较,以在ARRAY1的所有元素,以找到具有最接近的值的元素在array2中的这个元素的那个。 找到此值后,我需要将其存储在大小为1000的不同数组中 - 其中一个大小对应于array2。

这样做的单调乏味和粗糙的方式可能是使用for循环,并从数组2中取出每个元素,从数组1中减去其绝对值,然后取最小值 - 这会让我的代码非常慢。

我想使用numpy矢量化操作来做到这一点,但我有点碰壁。

+1

首先对两个数组进行排序。然后遍历大数组,保持小数组中当前最接近的元素的索引。根据需要增加索引。如果itertools中有些东西会加快速度,我不会感到惊讶。 –

+1

[在numpy数组中找到最接近的值]的可能重复(http://stackoverflow.com/questions/2566412/find-nearest-value-in-numpy-array) –

回答

1

要充分利用numpy并行性,我们需要矢量化函数。此外,使用相同的标准(最近)在相同的数组(array1)中找到所有值。因此,可以制作一个专门用于在array1中搜索的特殊功能。

但是,为了使解决方案更具可重用性,最好制作更通用的解决方案,然后将其转换为更具体的解决方案。因此,作为找到最接近的值的一般方法,我们从this find nearest solution开始。然后我们把它转换成一个更加具体和矢量化它,允许它在一次多个元素上工作:

import math 
import numpy as np 
from functools import partial 

def find_nearest_sorted(array,value): 
    idx = np.searchsorted(array, value, side="left") 
    if idx > 0 and (idx == len(array) or math.fabs(value - array[idx-1]) < math.fabs(value - array[idx])): 
     return array[idx-1] 
    else: 
     return array[idx] 

array1 = np.random.rand(4004001) 
array2 = np.random.rand(1000) 

array1_sorted = np.sort(array1) 

# Partially apply array1 to find function, to turn the general function 
# into a specific, working with array1 only. 
find_nearest_in_array1 = partial(find_nearest_sorted, array1_sorted) 

# Vectorize specific function to allow us to apply it to all elements of 
# array2, the numpy way. 
vectorized_find = np.vectorize(find_nearest_in_array1) 

output = vectorized_find(array2) 

希望这是你想要的,一个新的载体,映射数据array2到最近的值在array1

+0

而且,由于我们要查看'array1'多个次(1000次),首先对数组进行排序,从而节省一次排序成本,以加快随后的每次查找操作。 – JohanL

+0

谢谢@JohanL和大家的帮助!我以前从未使用过functools。这很棒! – sb25

0
import numpy as np 
a = np.random.random(size=4004001).astype(np.float16) 
b = np.random.random(size=1000).astype(np.float16) 
#use numpy broadcasting to compare pairwise difference and then find the min arg in a for each element in b. Finally extract elements from a using the argmin array as indexes. 
output = a[np.argmin(np.abs(b[:,None] -a),axis=1)] 

这个解决方案虽然简单,但可能会占用大量内存。如果在大型阵列上使用它,可能需要进一步优化。

+0

这个解决方案的时间和空间复杂度相当大,因为它将问题扩展到一个尺寸为4004001x1000的矩阵,然后它不对array1进行排序,从而使得find('min')操作比需要的慢成为。 – JohanL

+0

是的,我意识到这一点,并且正在考虑优化它的方法,同时保持其简单性。 – Allen

+0

请编辑您的答案以包含一些解释。仅有代码的答案对未来SO读者的教育很少。您的回答是在低质量的审核队列中。 – mickmackusa

0

最“numpythonic”的方式是使用broadcasting。这是计算距离矩阵的一种快速而简单的方法,然后您可以获取绝对值的argmin

形状的
array1 = np.random.rand(4004001) 
array2 = np.random.rand(1000) 

# Calculate distance matrix (on truncated array1 for memory reasons) 
dmat = array1[:400400] - array2[:,None] 

# Take the abs of the distance matrix and work out the argmin along the last axis 
ix = np.abs(dmat).argmin(axis=1) 

dmat

(1000, 400400) 

的形状ix和内容:

(1000,)  
array([237473, 166831, 72369, 11663, 22998, 85179, 231702, 322752, ...]) 

然而,它的内存饿了,如果你在一个去做这个手术了,居然不在我的8GB机器上处理您指定的阵列大小,这就是为什么我减小了array1的大小的原因。

要使其在内存限制内工作,只需将其中一个数组切片为块,然后依次(或平行)在每个块上应用广播。在这种情况下,我将array2分为10个区块:

# Define number of chunks and calculate chunk size 
n_chunks = 10 
chunk_len = array2.size // n_chunks 

# Preallocate output array 
out = np.zeros(1000) 

for i in range(n_chunks): 
    s = slice(i*chunk_len, (i+1)*chunk_len) 
    out[s] = np.abs(array1 - array2[s, None]).argmin(axis=1) 
+0

你的解决方案仍然相当饿死,即使是大块头。它也很慢,因为它的最小操作是O(n),对于未排序的列表。这就是为什么我觉得需要一种更复杂的方法,但时间复杂性大大提高。 – JohanL

+0

但它的工作原理很容易理解。如果速度和内存是无法通过并行解决的OP的重要问题,那么更复杂的方法是合理的。 – FuzzyDuck