2016-08-08 25 views
2

我能够按列,使用此代码离散一个大熊猫数据帧:如何通过与熊猫/ DASK可变箱列离散的大数据帧

import numpy as np 
import pandas as pd 

def discretize(X, n_scale=1): 

    for c in X.columns: 
     loc = X[c].median() 

     # median absolute deviation of the column 
     scale = mad(X[c]) 

     bins = [-np.inf, loc - (scale * n_scale), 
       loc + (scale * n_scale), np.inf] 
     X[c] = pd.cut(X[c], bins, labels=[-1, 0, 1]) 

    return X 

我想用的参数离散每一列:LOC(列的中位数)和比例(列的median absolute deviation)。

对于小数据框,所需的时间是可以接受的(因为它是单线程解决方案)。

但是,对于较大的数据框,我想利用更多线程(或进程)来加速计算。

我不是Dask的专家,它应该为这个问题提供解决方案。

然而,在我的情况下,离散化应与代码是可行的:

import dask.dataframe as dd 
import numpy as np 
import pandas as pd 

def discretize(X, n_scale=1): 

    # I'm using only 2 partitions for this example 
    X_dask = dd.from_pandas(X, npartitions=2) 

    # FIXME: 
    # how can I define bins to compute loc and scale 
    # for each column? 
    bins = [-np.inf, loc - (scale * n_scale), 
      loc + (scale * n_scale), np.inf] 

    X = X_dask.apply(pd.cut, axis=1, args=(bins,), labels=[-1, 0, 1]).compute() 

    return X 

但这里的问题是,locscale取决于列值,所以它们应该被计算为每一列,无论是在申请之前或期间。

怎么办?

回答

1

我从来没有用过dask,但我想你可以定义一个新的函数用于apply

import dask.dataframe as dd 
import multiprocessing as mp 
import numpy as np 
import pandas as pd 

def discretize(X, n_scale=1): 

    X_dask = dd.from_pandas(X.T, npartitions=mp.cpu_count()+1) 
    X = X_dask.apply(_discretize_series, 
        axis=1, args=(n_scale,), 
        columns=X.columns).compute().T 

    return X 

def _discretize_series(x, n_scale=1): 

    loc = x.median() 
    scale = mad(x) 
    bins = [-np.inf, loc - (scale * n_scale), 
      loc + (scale * n_scale), np.inf] 
    x = pd.cut(x, bins, labels=[-1, 0, 1]) 

    return x 
+0

谢谢。我使用工作解决方案编辑了您的问题。如果您认为它足够,请接受它。 – gc5