2015-12-03 33 views
6

我有一个python脚本,它具有以下功能: i。它需要一个输入文件的数据(通常是嵌套的JSON格式) ii。将数据逐行传递到另一个函数,该函数将数据操作为期望的格式 iii。最后它将输出写入文件。dask包不使用所有核心?备择方案?

这是我目前的简单的Python线,这是否......

def manipulate(line): 
    # a pure python function which transforms the data 
    # ... 
    return manipulated_json 

for line in f: 
    components.append(manipulate(ujson.loads(line))) 
    write_to_csv(components)` 

这工作,但与蟒蛇GIL将其限制在服务器上的一个核心,这是痛苦的缓慢,尤其是大量的数据。

我通常处理的数据量大约是4 gig gzip压缩,但偶尔我必须处理数百个gzip压缩的演出数据。它不一定是大数据,但仍然不能在内存中进行处理,而且Python的GIL处理非常缓慢。

在寻找优化数据处理的解决方案时,我遇到了dask。尽管当时PySpark似乎对我来说是明显的解决方案,但dask的承诺和简单性赢得了我的赞赏,我决定尝试一下。

经过对dask的大量研究以及如何使用它,我放了一个非常小的脚本来复制我当前的过程。该脚本是这样的:

import dask.bag as bag 
import json 
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')` 

这工作,并产生相同的结果我原来的非DASK脚本,但它仍然仅在服务器上使用一个CPU。所以,它根本没有帮助。实际上,速度较慢。

我在做什么错?我错过了什么吗?我对dask还是比较陌生的,所以让我知道我是否忽略了某些事情,或者我应该做些什么不同的事情。

另外,有没有替代品dask使用服务器的全部容量(即所有的CPU),我需要做什么?

感谢,

牛逼

+0

嗯从来没有听说过'dask',真的有趣,谢谢。你看过盒子标准的'multiprocessing'吗?它很简单(istic),但它的工作原理。 –

+0

您可能想问[Blaze邮件列表](https://groups.google.com/a/continuum.io/forum/#!forum/blaze-dev)。 Dask相对较新且不断变化,从我所看到的情况来看,关于它的问题始终只有20个StackOverflow问题,因此可能没有很多人在这里看到您的问题并且知道足够的帮助。 – BrenBarn

+0

FWIW,我订阅了这个标签,所以总有人在看它。对于这些问题,Stackoverflow是一个很好的地方。 – MRocklin

回答

2

这里的问题是dask.dataframe.to_csv,这迫使你的单核心模式。

我推荐使用dask.bag来做你的阅读和操作,然后转储到一堆CSV文件并行。转储到许多CSV文件比转储到单个CSV文件更容易协调。

import dask.bag as bag 
import json 
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat() 
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute() 

也有可能是一个问题,试图并行读取一个GZIP文件,但上面的应该让你开始。

+0

Thank @MRocklin!它没有工作。大声笑......但后来我将输入文件分成多个块,它工作。似乎它只使用与输入文件数量一样多的CPU。任何使这个功能动态化的计划,所以你可以传入一个输入文件和包,将它分开并在底层并行处理它? – tamjd1

+0

dask.bag现在这样做,只是不完美。一个可能的问题是GZIP对随机访问的支持不足。 – MRocklin

0

看起来袋子只和他们拥有的分区数量一样平行。

对于我来说,运行

mybag=bag.from_filenames(filename, chunkbytes=1e7) 
mybag.npartitions 

产生

这解决了这个问题,并提出了处理完全并行。

相关问题