2017-07-26 48 views
2

我希望提交一个DASK的任务,将做到以下几点:DASK计算与子期货

  1. 使用dask.bag(def fakejob
  2. 计算图表建立一个懒DASK图表从1和保存它要实木复合地板(留下这部分,只是一个动机)

我需要为多个输入做到这一点,所以我一直在尝试使用dask.distributed的期货功能。

from dask.distributed import Client 

client = Client(processes=True) 

def fakejob(path): 
    return (
     dask.bag 
     .read_text(path) 
     .to_dataframe() 
    ) 

futures = client.map(fakejob, [input_path1, input_path2]) 

问题是我不断收到:AssertionError: daemonic processes are not allowed to have children

我试过以下this link并结束了第二个版本(均不同于第一1号线),但期货留“待定”永远。

from dask.distributed import Client 

client = Client(processes=True) 

def fakejob(path): 
    with dask.set_options(get=client.get): 
     return (
      dask.bag 
      .read_text(path) 
      .to_dataframe() 
     ) 

futures = client.map(fakejob, [input_path1, input_path2]) 

任何关于如何做到这一点的线索?

干杯。

回答

2

奇怪的和稍微巨大的错误信息来自于尝试构建工作进程内的dask图(这是一个包),如果用client.map调用,事情最终会结束。如果您可以将整个工作流程放在该功能中,包括写入实木复合地板,并且不尝试将包裹传递给呼叫者,则您的第二次尝试将与本地客户端一起工作。

解决方案更简单。

bags = [dask.bag.read_text(path) 
     .to_dataframe() for path in [input_path1, input_path2]) 
futures = client.compute(bags) # run in background on the cluster 
client.gather(futures) # wait and get results 

这里,bags是DASK袋,即工作任务定义的列表,但还没有运行。您可以用dask.compute(*bags)取代最后两行,以获得结果而不用担心期货。

+0

嗨!感谢您的回复:) 您发布的解决方案确实更简单,但我认为它不能解决我的问题。 'bags = ...'中的列表理解将按顺序构建每个图形,对吗?我实际上希望图的构建也是平行的。换句话说,我希望列表理解能够并行发生。 有道理,还是我弄错了什么? –

+0

正确,列表的建立是在本地线程中的串行 - 但它对你来说很慢吗?我认为唯一的选择是使用'dask.delayed'和'db.from_delayed',但是这是包在内部在字节块上做的事情;花时间唯一需要的是找到每个文件的大小。 – mdurant

+0

我明白了。实际上需要一段时间。我正在数据科学团队办公室与dask一起进行实验。我使用'dask.bag'来解析大约100GB /天的json数据,并将其转换为parquet进行非常具体的分析项目。它一直运行良好,但大约需要10分钟才能构建图的惰性对象。我们计划进一步发展,这种开销可能是未来的一个问题。现在你的建议是完美的。谢谢! –