2012-11-21 52 views
10

我有一个3D阵列,我需要插入一个轴(最后一维)。比方说y.shape = (nx, ny, nz),我想每(nx, ny)插入nz。不过,我想在每个[i, j]内插一个不同的值。三维阵列快速插值

这里有一些代码来举例说明。如果我想,内插一个值,说new_z,我会用scipy.interpolate.interp1d这样

# y is a 3D ndarray 
# x is a 1D ndarray with the abcissa values 
# new_z is a number 
f = scipy.interpolate.interp1d(x, y, axis=-1, kind='linear') 
result = f(new_z) 

然而,对于这个问题是什么其实我想要的是插值到不同new_z每个y[i, j]。所以我这样做:

# y is a 3D ndarray 
# x is a 1D ndarray with the abcissa values 
# new_z is a 2D array 
result = numpy.empty(y.shape[:-1]) 
for i in range(nx): 
    for j in range(ny): 
     f = scipy.interpolate.interp1d(x, y[i, j], axis=-1, kind='linear') 
     result[i, j] = f(new_z[i, j]) 

不幸的是,由于多个循环,这变得低效率和慢。有没有更好的方法来做这种插值?线性插值就足够了。一种可能性是在Cython中实现这一点,但我试图避免这种情况,因为我想要更改为三次插值的灵活性,并且不想在Cython中手动完成。

回答

6

要加速高阶内插,您只能拨打interp1d()一次,然后在_fitpack模块中使用_spline属性和低级功能_bspleval()。下面是代码:

from scipy.interpolate import interp1d 
import numpy as np 

nx, ny, nz = 30, 40, 50 
x = np.arange(0, nz, 1.0) 
y = np.random.randn(nx, ny, nz) 
new_x = np.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0 

def original_interpolation(x, y, new_x): 
    result = np.empty(y.shape[:-1]) 
    for i in xrange(nx): 
     for j in xrange(ny): 
      f = interp1d(x, y[i, j], axis=-1, kind=3) 
      result[i, j] = f(new_x[i, j]) 
    return result 

def fast_interpolation(x, y, new_x): 
    from scipy.interpolate._fitpack import _bspleval 
    f = interp1d(x, y, axis=-1, kind=3) 
    xj,cvals,k = f._spline 
    result = np.empty_like(new_x) 
    for (i, j), value in np.ndenumerate(new_x): 
     result[i, j] = _bspleval(value, x, cvals[:, i, j], k, 0) 
    return result 

r1 = original_interpolation(x, y, new_x) 
r2 = fast_interpolation(x, y, new_x) 

>>> np.allclose(r1, r2) 
True 

%timeit original_interpolation(x, y, new_x) 
%timeit fast_interpolation(x, y, new_x) 
1 loops, best of 3: 3.78 s per loop 
100 loops, best of 3: 15.4 ms per loop 
+0

谢谢。你的解决方案也很有趣。我对这么多好的答案感到惊讶。不幸的是,我只能接受一个。尽管您的解决方案没有加速Cython或@pv的解决方案,但它更适合构建问题。而且在插值方面最为灵活。所以我接受它。 – tiago

+0

我想运行这段代码,但我得到这个错误'BSpline'对象不可迭代 – Delosari

3

我不认为interp1d有一个快速的方法,所以你不能避免这里的循环。

用Cython你可能仍然编码了使用np.searchsorted,像这样(未测试)的线性插值避免:

def interp3d(x, y, new_x): 
    assert x.ndim == 1 and y.ndim == 3 and new_x.ndim == 2 
    assert y.shape[:2] == new_x.shape and x.shape == y.shape[2:] 

    nx, ny = y.shape[:2] 
    new_x = new_x.ravel() 
    j = np.arange(len(new_x)) 
    k = np.searchsorted(x, new_x).clip(1, len(x) - 1) 
    y = y.reshape(-1, x.shape[0]) 
    p = (new_x - x[k-1])/(x[k] - x[k-1]) 
    result = (1 - p) * y[j,k-1] + p * y[j,k] 
    return result.reshape(nx, ny) 

不立方插值帮助,虽然。

编辑:使其成为函数并修正了错误的一个错误。一些与Cython比较(500x500x500网格):

In [58]: %timeit interp3d(x, y, new_x) 
10 loops, best of 3: 82.7 ms per loop 

In [59]: %timeit cyfile.interp3d(x, y, new_x) 
10 loops, best of 3: 86.3 ms per loop 

In [60]: abs(interp3d(x, y, new_x) - cyfile.interp3d(x, y, new_x)).max() 
Out[60]: 2.2204460492503131e-16 

虽然,可以争辩说,Cython代码更容易阅读。

+0

谢谢,它肯定是与numpy做它的一个优雅的方式。我结束了一个快速的Cython解决方案(请参阅我的答案)。编写Cython比等待python版本完成运行速度要快。 – tiago

+0

@pv。你可以通过[作为矢量操作执行插值]来避免循环(http://stackoverflow.com/a/13495570/709852) –

+0

@HenryGomersall:是的(我在上面的代码中试过),但我的意思是这是不可能的,如果你想坚持interp1d。 –

3

由于上面的numpy建议时间太长,我可以等待,这里是cython版本,以备将来参考。从一些松散的基准测试中,它的速度大约快了3000倍(当然,这只是线性插值,并不如interp1d,但对于这个目的来说没关系)。

import numpy as N 
cimport numpy as N 
cimport cython 

DTYPEf = N.float64 
ctypedef N.float64_t DTYPEf_t 

@cython.boundscheck(False) # turn of bounds-checking for entire function 
@cython.wraparound(False) # turn of bounds-checking for entire function 
cpdef interp3d(N.ndarray[DTYPEf_t, ndim=1] x, N.ndarray[DTYPEf_t, ndim=3] y, 
       N.ndarray[DTYPEf_t, ndim=2] new_x): 
    """ 
    interp3d(x, y, new_x) 

    Performs linear interpolation over the last dimension of a 3D array, 
    according to new values from a 2D array new_x. Thus, interpolate 
    y[i, j, :] for new_x[i, j]. 

    Parameters 
    ---------- 
    x : 1-D ndarray (double type) 
     Array containg the x (abcissa) values. Must be monotonically 
     increasing. 
    y : 3-D ndarray (double type) 
     Array containing the y values to interpolate. 
    x_new: 2-D ndarray (double type) 
     Array with new abcissas to interpolate. 

    Returns 
    ------- 
    new_y : 3-D ndarray 
     Interpolated values. 
    """ 
    cdef int nx = y.shape[0] 
    cdef int ny = y.shape[1] 
    cdef int nz = y.shape[2] 
    cdef int i, j, k 
    cdef N.ndarray[DTYPEf_t, ndim=2] new_y = N.zeros((nx, ny), dtype=DTYPEf) 

    for i in range(nx): 
     for j in range(ny): 
      for k in range(1, nz): 
       if x[k] > new_x[i, j]: 
        new_y[i, j] = (y[i, j, k] - y[i, j, k - 1]) * \ 
        (new_x[i, j] - x[k-1])/(x[k] - x[k - 1]) + y[i, j, k - 1] 
        break 
    return new_y 
1

你可以使用map_coordinates为:

from numpy import random, meshgrid, arange 
from scipy.ndimage import map_coordinates 

(nx, ny, nz) = (4, 5, 6) 
# some random array 
A = random.rand(nx, ny, nz) 

# random floating-point indices in [0, nz-1] 
Z = random.rand(nx, ny)*(nz-1) 

# regular integer indices of shape (nx,ny) 
X, Y = meshgrid(arange(nx), arange(ny), indexing='ij') 

coords = (X, Y, Z) # X, Y, and Z are of shape (nx, ny) 

print map_coordinates(A, coords, order=1, cval=-999.) 
+0

我曾想过'map_coordinates',感谢您的建议。在我的情况下,'nx,ny,nz'每个接近500,所以我认为'map_coordinates'可能会对RAM有点贪婪。我将运行一些基准和报告。 – tiago

2

大厦@pv.'s answer,并且向量化内部循环,下面给出大幅加速(编辑:改变了昂贵numpy.tile使用numpy.lib.stride_tricks.as_strided):

import numpy 
from scipy import interpolate 

nx = 30 
ny = 40 
nz = 50 

y = numpy.random.randn(nx, ny, nz) 
x = numpy.float64(numpy.arange(0, nz)) 

# We select some locations in the range [0.1, nz-0.1] 
new_z = numpy.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0 

# y is a 3D ndarray 
# x is a 1D ndarray with the abcissa values 
# new_z is a 2D array 

def original_interpolation(): 
    result = numpy.empty(y.shape[:-1]) 
    for i in range(nx): 
     for j in range(ny): 
      f = interpolate.interp1d(x, y[i, j], axis=-1, kind='linear') 
      result[i, j] = f(new_z[i, j]) 

    return result 

grid_x, grid_y = numpy.mgrid[0:nx, 0:ny] 
def faster_interpolation(): 
    flat_new_z = new_z.ravel() 
    k = numpy.searchsorted(x, flat_new_z) 
    k = k.reshape(nx, ny) 

    lower_index = [grid_x, grid_y, k-1] 
    upper_index = [grid_x, grid_y, k] 

    tiled_x = numpy.lib.stride_tricks.as_strided(x, shape=(nx, ny, nz), 
     strides=(0, 0, x.itemsize)) 

    z_upper = tiled_x[upper_index] 
    z_lower = tiled_x[lower_index] 

    z_step = z_upper - z_lower 
    z_delta = new_z - z_lower 

    y_lower = y[lower_index] 
    result = y_lower + z_delta * (y[upper_index] - y_lower)/z_step 

    return result 

# both should be the same (giving a small difference) 
print numpy.max(
     numpy.abs(original_interpolation() - faster_interpolation())) 

在我的机器上给出以下时间:

In [8]: timeit foo.original_interpolation() 
10 loops, best of 3: 102 ms per loop 

In [9]: timeit foo.faster_interpolation() 
1000 loops, best of 3: 564 us per loop 

nx = 300ny = 300nz = 500,给人以130X加速:

In [2]: timeit original_interpolation() 
1 loops, best of 3: 8.27 s per loop 

In [3]: timeit faster_interpolation() 
10 loops, best of 3: 60.1 ms per loop 

你需要一个写自己的算法立方插值,但它不应该是这么难。

2

虽然有几个不错的答案, 他们还在做在一个固定的500多头排列250K插值:

j250k = np.searchsorted(X500, X250k) # indices in [0, 500) 

这可以用LUT有待加快,查找表有说5K插槽:

lut = np.interp(np.arange(5000), X500, np.arange(500)).round().astype(int) 
xscale = (X - X.min()) * (5000 - 1) \ 
     /(X.max() - X.min()) 
j = lut.take(xscale.astype(int), mode="clip") # take(floats) in numpy 1.7 ? 

#--------------------------------------------------------------------------- 
# X  | |  | |    | 
# j  0 1  2 3    4 ... 
# LUT |....|.......|.|.............|.... -> int j (+ offset in [0, 1)) 
#--------------------------------------------------------------------------- 

searchsorted是蛮快的,时间〜LN2 500, 所以这可能是快不了多少。
但LUTs很快在C,非常一个简单的速度/内存折衷。