2011-09-27 44 views
13

我有非常大的数据集存储在硬盘上的二进制文件。这里是文件结构的一个例子:从二进制文件创建Numpy数组的有效方法

文件头

149 Byte ASCII Header 

记录开始

4 Byte Int - Record Timestamp 

样品开始

2 Byte Int - Data Stream 1 Sample 
2 Byte Int - Data Stream 2 Sample 
2 Byte Int - Data Stream 3 Sample 
2 Byte Int - Data Stream 4 Sample 

样品结束

每个记录有122,880个样本和每个文件有713个记录。这产生了700,910,521字节的总大小。采样率和记录数量有时会有所不同,所以我必须编码以检测每个文件的每个数量。

目前我使用这个数据导入到阵列中的代码是这样的:

from time import clock 
from numpy import zeros , int16 , int32 , hstack , array , savez 
from struct import unpack 
from os.path import getsize 

start_time = clock() 
file_size = getsize(input_file) 

with open(input_file,'rb') as openfile: 
    input_data = openfile.read() 

header = input_data[:149] 
record_size = int(header[23:31]) 
number_of_records = (file_size - 149)/record_size 
sample_rate = ((record_size - 4)/4)/2 

time_series = zeros(0,dtype=int32) 
t_series = zeros(0,dtype=int16) 
x_series = zeros(0,dtype=int16) 
y_series = zeros(0,dtype=int16) 
z_series = zeros(0,dtype=int16) 

for record in xrange(number_of_records): 

    time_stamp = array(unpack('<l' , input_data[ 149 + (record * record_size) : 149 + (record * record_size) + 4 ]) , dtype = int32) 
    unpacked_record = unpack('<' + str(sample_rate * 4) + 'h' , input_data[ 149 + (record * record_size) + 4 : 149 + ((record + 1) * record_size) ]) 

    record_t = zeros(sample_rate , dtype=int16) 
    record_x = zeros(sample_rate , dtype=int16) 
    record_y = zeros(sample_rate , dtype=int16) 
    record_z = zeros(sample_rate , dtype=int16) 

    for sample in xrange(sample_rate): 

    record_t[sample] = unpacked_record[ (sample * 4) + 0 ] 
    record_x[sample] = unpacked_record[ (sample * 4) + 1 ] 
    record_y[sample] = unpacked_record[ (sample * 4) + 2 ] 
    record_z[sample] = unpacked_record[ (sample * 4) + 3 ] 

    time_series = hstack ((time_series , time_stamp)) 
    t_series = hstack ((t_series , record_t)) 
    x_series = hstack ((x_series , record_x)) 
    y_series = hstack ((y_series , record_y)) 
    z_series = hstack ((z_series , record_z)) 

savez(output_file, t=t_series , x=x_series ,y=y_series, z=z_series, time=time_series) 
end_time = clock() 
print 'Total Time',end_time - start_time,'seconds' 

目前这需要每个700 MB的文件约250秒,这对我来说似乎是非常高的。有没有更有效的方法可以做到这一点?

最终解决

使用与自定义的numpy的FROMFILE方法D型运行时切断至9秒,比上述原始代码27倍快。最终的代码如下。

from numpy import savez, dtype , fromfile 
from os.path import getsize 
from time import clock 

start_time = clock() 
file_size = getsize(input_file) 

openfile = open(input_file,'rb') 
header = openfile.read(149) 
record_size = int(header[23:31]) 
number_of_records = (file_size - 149)/record_size 
sample_rate = ((record_size - 4)/4)/2 

record_dtype = dtype([ ('timestamp' , '<i4') , ('samples' , '<i2' , (sample_rate , 4)) ]) 

data = fromfile(openfile , dtype = record_dtype , count = number_of_records) 
time_series = data['timestamp'] 
t_series = data['samples'][:,:,0].ravel() 
x_series = data['samples'][:,:,1].ravel() 
y_series = data['samples'][:,:,2].ravel() 
z_series = data['samples'][:,:,3].ravel() 

savez(output_file, t=t_series , x=x_series ,y=y_series, z=z_series, fid=time_series) 

end_time = clock() 

print 'It took',end_time - start_time,'seconds' 
+0

它是医疗数据? EDF?如果你不知道我在说什么,不要介意......; o)无论如何,看看我的答案,根据这个问题,我用它来打开医疗数据二进制文件:http://stackoverflow.com/q/5804052/401828。那里有一个有趣的讨论。 – heltonbiker

+0

不是地球物理数据。在发布之前我在研究过程中看到了您的问题。您的数据只包含简短的整数,其中不幸的是整个流中分散了4个字节的整数时间戳。 – Stu

+0

对于它的价值,numpy结构化数组上的许多操作比常规numpy数组慢得多。导入时间可能会更快,但计算时间可能会延长10-100倍:( –

回答

13

一些提示:

是这样的(未经测试,但你的想法):

 
import numpy as np 

file = open(input_file, 'rb') 
header = file.read(149) 

# ... parse the header as you did ... 

record_dtype = np.dtype([ 
    ('timestamp', '<i4'), 
    ('samples', '<i2', (sample_rate, 4)) 
]) 

data = np.fromfile(file, dtype=record_dtype, count=number_of_records) 
# NB: count can be omitted -- it just reads the whole file then 

time_series = data['timestamp'] 
t_series = data['samples'][:,:,0].ravel() 
x_series = data['samples'][:,:,1].ravel() 
y_series = data['samples'][:,:,2].ravel() 
z_series = data['samples'][:,:,3].ravel() 
+0

现在总时间为9.07秒,包括savez!谢谢。我将用最终的代码更新这个问题。 – Stu

+0

伟大的答案,优良的使用numpy内置功能! +1 – heltonbiker

+0

你怎么知道你是否有头,多长时间? – mmann1123

2

numpy的支持从数据映射二进制直接进入阵列等经由numpy.memmap对象。您可能能够通过偏移映射文件并提取所需的数据。

对于字节序的正确性只使用numpy.byteswap对你们在读什么,你可以使用条件表达式来检查主机系统的字节序:

if struct.pack('=f', np.pi) == struct.pack('>f', np.pi): 
    # Host is big-endian, in-place conversion 
    arrayName.byteswap(True) 
+0

我已经看过,但似乎没有办法指定数据的字节顺序。代码需要在两个窗口下工作, unix所以endian-ness需要明确陈述 – Stu

+0

你可以使用[numpy.byteswap](http://docs.scipy.org/doc/numpy/reference/generated/numpy.memmap.html)来设置正确的如果需要的话,数据的字节顺序,请参阅编辑 – talonmies

+0

对于函数'numpy.memmap',这看起来非常有用 – heltonbiker

2

一个明显的低效率是在使用hstack一个循环:

time_series = hstack ((time_series , time_stamp)) 
    t_series = hstack ((t_series , record_t)) 
    x_series = hstack ((x_series , record_x)) 
    y_series = hstack ((y_series , record_y)) 
    z_series = hstack ((z_series , record_z)) 

在每次迭代,这对于分配给每一个系列,并将所有数据读取到目前为止到它的一个稍微大一点的数组。这涉及到批次的不必要的复制,并可能导致内存碎片错误。

我积累的time_stamp值在列表中,做在一端hstack,并会做同样的record_t

如果没有携带足够的性能提升,我会注释掉循环的主体,并开始将事情一次性带回,以查看所花的时间。

+0

好吧,执行此操作可以将时间缩短到110秒!!谢谢,我是 – Stu

+0

也是110秒的时间,大概有40个是我不能优化的savez函数,虽然比较加载.npz只需要20秒 – Stu

+0

我必须有蜜蜂n关于savez错误,因为使用自定义dtype的fromfile方法,包括savez的时间降至9秒。 – Stu

0

通过使用arraystruct.unpack,我得到了令人满意的结果以及类似的问题(多分辨率多通道二进制数据文件)。在我的问题中,我希望每个通道都有连续的数据,但文件具有面向间隔的结构,而不是面向通道的结构。

的“秘密”是先读取整个文件,并且只有然后分发已知大小的切片,以所希望的容器(下面的代码,self.channel_content[channel]['recording']array类型的对象):

f = open(somefilename, 'rb')  
fullsamples = array('h') 
fullsamples.fromfile(f, os.path.getsize(wholefilename)/2 - f.tell()) 
position = 0 
for rec in xrange(int(self.header['nrecs'])): 
    for channel in self.channel_labels: 
     samples = int(self.channel_content[channel]['nsamples']) 
     self.channel_content[channel]['recording'].extend(fullsamples[position:position+samples]) 
      position += samples 

当然,我不能说这比其他答案更好或更快,但至少是你可能评估的东西。

希望它有帮助!