2016-08-22 46 views
1

我给它含有具有逗号分隔的数据文件名files列表进行清洁,以及通过包含基于文件名信息栏进一步扩展。因此,我实现了一个小read_file函数,它处理两个,初始清洁,以及附加列的计算。使用db.from_sequence(files).map(read_file),我将读取函数映射到所有文件,每个文件都获取一个字典列表。DASK包成DASK数据帧的列

然而,而不是字典的名单,我想我的包包含输入文件作为一个入门的每个单独的线。随后,我想将字典的键映射到dask数据框中的列名。

from dask import bag as db 

def read_file(filename): 
    ret = [] 
    with open(filename, 'r') as fp: 
     ... # reading line of file and storing result in dict 
     ret.append({'a': val_a, 'b': val_b, 'c': val_c}) 

    return ret 

from dask import bag as db 
files = ['a.txt', 'b.txt', 'c.txt'] 
my_bag = db.from_sequence(files).map(read_file) 
# a,b,c are the keys of the dictionaries returned by read_file 
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']) 

有人能让我知道我必须改变以获得此代码的运行吗?是否有不同的方法更适合?

编辑: 我已经创建了三个测试文件a_20160101.txt,a_20160102.txt,a_20160103.txt。它们都只包含几行,每行只有一个字符串。

asdf 
sadfsadf 
sadf 
fsadff 
asdf 
sadfasd 
fa 
sf 
ads 
f 

以前我在read_file了一个小错误,但现在,调用my_bag.take(10)后映射到阅读器工作正常:

([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],) 

然而my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])随后 my_df.head(10)仍然引起dask.async.AssertionError: 3 columns passed, passed data had 10 columns

+0

有一袋字典,然后传递给to_dataframe有什么问题? – MRocklin

回答

0

你可能需要调用concat

您的文件名的包看起来是这样的:

['a.txt', 
'b.txt', 
'c.txt'] 

在调用映射你的包是这样的:

[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}], 
[{'a': 1, 'b': 2, 'c': 3}], 
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]] 

每个文件被辟为类型的字典列表。现在你的包有点像一个列表的列表。

.to_dataframe方法,希望你有一个列表的-类型的字典。因此,让我们的行李连接成一个单一的扁平化集合的书稿

my_bag = db.from_sequence(files).map(read_file).concat() 

[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}, 
{'a': 1, 'b': 2, 'c': 3}, 
{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}] 
+0

''concat()'它完美的工作,谢谢!使用不同的方法会更好吗?还是变得更好?如果我没记错的[DASK教程(https://github.com/dask/dask-tutorial)正确,包装袋上应罚款摄入/转换,这是正确的? – sim

+0

这对我来说似乎是一个合理的方法。你也可以尝试[dask.delayed](http://dask.readthedocs.io/en/latest/delayed.html)。请参阅[使用藏品]的注释(http://dask.readthedocs.io/en/latest/delayed-collections.html) – MRocklin