2017-06-06 39 views
2

我正在dask构建一个非常大的DAG,以提交给分布式调度程序,其中节点在数据框上运行,这些数据框本身可能非常大。一种模式是我有大约50-60个函数来加载数据并构建每个数百MB的熊猫数据框(并且逻辑上表示单个表的分区)。我想将它们连接成图中下游节点的单个dask数据帧,同时最小化数据移动。我链接像这样的任务:Dask图形执行和内存使用

dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs] 
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs] 
return dask.delayed(concat_all)(dfs) 

其中

def pandas_to_dask(df): 
    return dask.dataframe.from_pandas(df).to_delayed() 

,我曾尝试过各种concat_all implentations,但是这似乎是合理的:

def concat_all(dfs): 
    dfs = [dask.dataframe.from_delayed(df) for df in dfs] 
    return dask.dataframe.multi.concat(dfs, axis='index', join='inner') 

所有大熊猫dataframes位于不相连的索引和排序/单调。

但是,我遇到了在这个concat_all函数中死亡的工作人员(集群管理器正在杀死他们超出他们的内存预算),尽管每个内存的预算实际上相当大,我不希望它移动周围的数据。我有理由相信,在使用dask数据帧的图点节点中调用compute()之前,我总是切片到一个合理的数据子集。

我在玩--memory-limit目前为止没有成功。我至少是否正确地处理了这个问题?我有缺少的考虑吗?

回答

2

鉴于你的延迟值的列表计算,以大熊猫dataframes

>>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs] 
>>> type(dfs[0].compute()) # just checking that this is true 
pandas.DataFrame 

它们传递到dask.dataframe.from_delayed功能

>>> ddf = dd.from_delayed(dfs) 

默认情况下,这会,以确定元数据运行的第一个计算(列名称,dtypes等,对dask.dataframe很重要)。您可以通过构建示例数据框并将其传递给meta=关键字来避免此情况。

>>> meta = pd.DataFrame({'value': [1.0], 'name': ['foo'], 'id': [0]}) 
>>> ddf = dd.from_delayed(dfs, meta=meta) 

这个example notebook也可能有帮助。

通常情况下,您将永远不需要从其他dask函数中调用dask函数(如通过延迟调用from_pandas所做的那样)。 Dask.dataframe函数本身已经很懒,不需要进一步延迟。

+0

感谢您的快速响应。我观察到dd.from_delayed(dfs)立即评估'dfs [0]'以提取元数据。出于某种原因,这对我造成了问题。有没有另一种方式推迟评估,直到图形完全构建完成?我会尝试放置一个repro。 –

+0

您可以为'meta ='关键字提供一个示例数据框。我会在答案中添加一个例子。 – MRocklin