2017-02-21

Error while exporting a Dask dataframe to CSV: my Dask dataframe has about 120 million rows and 4 columns:

df_final.dtypes 

cust_id  int64 
score   float64 
total_qty  float64 
update_score float64 
dtype: object 

and I am running this operation in a Jupyter notebook connected to a Linux machine:

%time df_final.to_csv('/path/claritin-files-*.csv') 

and it threw this error:

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-24-46468ae45023> in <module>() 
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')") 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s) 
    2334   magic_name, _, magic_arg_s = arg_s.partition(' ') 
    2335   magic_name = magic_name.lstrip(prefilter.ESC_MAGIC) 
-> 2336   return self.run_line_magic(magic_name, magic_arg_s) 
    2337 
    2338  #------------------------------------------------------------------------- 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line) 
    2255     kwargs['local_ns'] = sys._getframe(stack_depth).f_locals 
    2256    with self.builtin_trap: 
-> 2257     result = fn(*args,**kwargs) 
    2258    return result 
    2259 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k) 
    191  # but it's overkill for just that one bit of state. 
    192  def magic_deco(arg): 
--> 193   call = lambda f, *a, **k: f(*a, **k) 
    194 
    195   if callable(arg): 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 
    1161   if mode=='eval': 
    1162    st = clock2() 
-> 1163    out = eval(code, glob, local_ns) 
    1164    end = clock2() 
    1165   else: 

<timed eval> in <module>() 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs) 
    936   """ See dd.to_csv docstring for more information """ 
    937   from .io import to_csv 
--> 938   return to_csv(self, filename, **kwargs) 
    939 
    940  def to_delayed(self): 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs) 
    411  if compute: 
    412   from dask import compute 
--> 413   compute(*values, get=get) 
    414  else: 
    415   return values 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs) 
    177   dsk = merge(var.dask for var in variables) 
    178  keys = [var._keys() for var in variables] 
--> 179  results = get(dsk, keys, **kwargs) 
    180 
    181  results_iter = iter(results) 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs) 
    74  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    75       cache=cache, get_id=_thread_get_id, 
---> 76       **kwargs) 
    77 
    78  # Cleanup pools associated to dead threads 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs) 
    491      _execute_task(task, data) # Re-execute locally 
    492     else: 
--> 493      raise(remote_exception(res, tb)) 
    494    state['cache'][key] = res 
    495    finish_task(dsk, key, state, results, keyorder.get) 

ValueError: invalid literal for long() with base 10: 'total_qty' 

Traceback 
--------- 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task 
    result = _execute_task(task, data) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task 
    return func(*args2) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text 
    coerce_dtypes(df, dtypes) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes 
    df[c] = df[c].astype(dtypes[c]) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype 
    raise_on_error=raise_on_error, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype 
    return self.apply('astype', dtype=dtype, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply 
    applied = getattr(b, f)(**kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype 
    values=values, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype 
    values = _astype_nansafe(values.ravel(), dtype, copy=True) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe 
    return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape) 
    File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409) 
    File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777) 
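The final ValueError is what pandas raises when `astype` is asked to coerce a non-numeric string to an integer dtype. Here the offending string is the column name `'total_qty'` itself, which suggests a header line may have ended up being read as data. A minimal sketch of the same failure (on Python 3 the message mentions `int()` rather than `long()`):

```python
import pandas as pd

# A column guessed to be integer, but containing a stray header value.
s = pd.Series(['1', '2', 'total_qty'])

try:
    s.astype('int64')
except ValueError as e:
    # e.g. "invalid literal for int() with base 10: 'total_qty'"
    print(e)
```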

I have a couple of questions:

1) First of all, this export ran fine on Friday: it spat out 100 CSV files (since the dataframe has 100 partitions), which I later aggregated. So what went wrong today, and is there anything in the error log that explains it?

2) This one is perhaps for the creators of the package: what is the most time-efficient way to get a CSV extract out of a Dask dataframe of this size? The last time it worked, it took about 1.5 to 2 hours.

I am not using dask distributed; this runs on a single core of a Linux cluster.

Answer

This error probably has little to do with to_csv itself and more to do with something else in your computation. The call to df.to_csv is just the first time you force the computation to roll through all of your data.

Given the error, I actually suspect that this is failing in read_csv. Dask.dataframe reads the first few hundred kilobytes of your first file to guess at the datatypes, but it seems to have guessed wrong. You may want to try specifying dtypes explicitly in the read_csv call.
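A sketch of what that might look like. dd.read_csv passes keyword arguments such as dtype through to pandas.read_csv, so the same mapping works for both; the input file pattern in the comment is hypothetical, and the column names are taken from the df_final.dtypes listing above. Shown with pandas on an in-memory sample so it is self-contained:

```python
import io
import pandas as pd

# With Dask this would be something like:
#   import dask.dataframe as dd
#   df = dd.read_csv('input-files-*.csv', dtype=dtypes)   # hypothetical path
# which bypasses the sampling-based dtype guessing that appears to have failed.
dtypes = {
    'cust_id': 'int64',
    'score': 'float64',
    'total_qty': 'float64',
    'update_score': 'float64',
}

sample = io.StringIO(
    'cust_id,score,total_qty,update_score\n'
    '101,0.5,10.0,0.7\n'
    '102,0.9,3.0,0.1\n'
)
df = pd.read_csv(sample, dtype=dtypes)
print(df.dtypes)
```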

Regarding the second question about writing CSV quickly, my first answer would be "use Parquet or HDF5 instead". They are faster and more accurate in almost every respect.

Thanks!! Yes, I suspected that earlier, since I am reading the dataframe from CSV. Not sure why it did not read it correctly. Regarding your suggestion on the second question: do you mean reading and writing in Parquet format (I am familiar with Parquet)? –

A common cause is when an integer column has some missing values, so Pandas decides that it needs to use floats partway through. I don't understand your comment about Parquet. – MRocklin

What I meant was: when you say "use Parquet or HDF5", do you mean reading the source files into a Dask dataframe and then writing them out in Parquet format rather than CSV? Also, would the CSV export be faster if I used dask distributed across a cluster of machines (my dataframe is 130 million rows x 4 columns)? –