我正在dask构建一个非常大的DAG,以提交给分布式调度程序,其中节点在数据框上运行,这些数据框本身可能非常大。一种模式是我有大约50-60个函数来加载数据并构建每个数百MB的熊猫数据框(并且逻辑上表示单个表的分区)。我想将它们连接成图中下游节点的单个dask数据帧,同时最小化数据移动。我链接像这样的任务:Dask图形执行和内存使用
dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)
其中
def pandas_to_dask(df):
return dask.dataframe.from_pandas(df).to_delayed()
,我曾尝试过各种concat_all
implentations,但是这似乎是合理的:
def concat_all(dfs):
dfs = [dask.dataframe.from_delayed(df) for df in dfs]
return dask.dataframe.multi.concat(dfs, axis='index', join='inner')
所有大熊猫dataframes位于不相连的索引和排序/单调。
但是,我遇到了在这个concat_all
函数中死亡的工作人员(集群管理器正在杀死他们超出他们的内存预算),尽管每个内存的预算实际上相当大,我不希望它移动周围的数据。我有理由相信,在使用dask数据帧的图点节点中调用compute()
之前,我总是切片到一个合理的数据子集。
我在玩--memory-limit
目前为止没有成功。我至少是否正确地处理了这个问题?我有缺少的考虑吗?
感谢您的快速响应。我观察到dd.from_delayed(dfs)立即评估'dfs [0]'以提取元数据。出于某种原因,这对我造成了问题。有没有另一种方式推迟评估,直到图形完全构建完成?我会尝试放置一个repro。 –
您可以为'meta ='关键字提供一个示例数据框。我会在答案中添加一个例子。 – MRocklin