我给它含有具有逗号分隔的数据文件名files
列表进行清洁,以及通过包含基于文件名信息栏进一步扩展。因此,我实现了一个小read_file
函数,它处理两个,初始清洁,以及附加列的计算。使用db.from_sequence(files).map(read_file)
,我将读取函数映射到所有文件,每个文件都获取一个字典列表。DASK包成DASK数据帧的列
然而,而不是字典的名单,我想我的包包含输入文件作为一个入门的每个单独的线。随后,我想将字典的键映射到dask数据框中的列名。
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
有人能让我知道我必须改变以获得此代码的运行吗?是否有不同的方法更适合?
编辑: 我已经创建了三个测试文件a_20160101.txt
,a_20160102.txt
,a_20160103.txt
。它们都只包含几行,每行只有一个字符串。
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
以前我在read_file
了一个小错误,但现在,调用my_bag.take(10)
后映射到阅读器工作正常:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
然而my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
随后 my_df.head(10)
仍然引起dask.async.AssertionError: 3 columns passed, passed data had 10 columns
有一袋字典,然后传递给to_dataframe有什么问题? – MRocklin