2017-06-28 32 views
2

我试图在Dask中找到相当于Spark的unpersist。我一个明确的unpersist需要 出现在所处的环境:如何在Dask中实现没有数据重复的增量缓存?

  • 调用上下文已经已经持续大df,例如,因为它需要 计算许多聚集了预处理的目的。
  • 调用上下文调用一个函数,该函数还需要调用persist,例如, ,因为它执行一些迭代算法。

一个基本的例子看起来像:

def iterative_algorithm(df, num_iterations): 

    for iteration in range(num_iterations): 

     # Transformation logic requiring e.g. map_partitions 
     def mapper(df): 
      # ... 
      return df 

     df = df.map_partitions(mapper) 
     df = df.persist() 
     # Now I would like to explicitly unpersist the old snapshot 

    return df 

火花,这个问题可以通过释放旧的快照 明确解决。显然Dask没有明确的unpersist,但通过引用计算潜在期货来处理问题 。这意味着上面的示例 将复制数据,因为调用上下文对旧期货持有参考 ,而子函数保留对修改后的 的引用。在我的实际使用情况中,有几个这样的变换调用的嵌套级别,导致数据甚至多次复制。

有没有办法解决迭代缓存没有任何额外的副本?

回答

2

我会发表一些想法如何解决这个问题,但我仍然在寻找更好的 替代品。

由于引用计数,避免副本很复杂,但有 是可能的。这个问题是由于调用者持有参考 到原来的df,并且子功能通过df = df.<method> 调用创建新实例。为了解决这个问题,我们必须提出df本身 可变。不幸的是,Python通常不允许改变函数参数的引用 。

解决方案1:朴素可变引用

最简单的方式工作,周围的限制是对df包装成一个列表 或字典。在这种情况下,子功能可以修改外部参考,例如,由:

df_list[0] = df_list[0].map_partitions(mapper) 
df_list[0] = df_list[0].persist() 

然而,这在语法上是尴尬,人们必须非常小心,因为 简化的语法通过df = df_list[0]再创到 标的的期货,这可能会导致数据重复的新引用。

解决方案2:基于包装-可变引用

改进上,人们可以写一个小包装类,其中包含一个参考 到数据帧。通过这个包装,子函数可以改变参考文献 。为了改进语法问题,可以考虑包装器 应该自动地将功能委托给数据帧还是从中继承 。总的来说,这个解决方案也不太合适。

解决方案3:明确突变

为了避免其他的解决方案,我现在更喜欢以下 变种,通过原有的df的就地改造,有效地模拟了map_partitionspersist可变版本的语法问题实例。

def modify_inplace(old_df, new_df): 
    # Currently requires accessing private fields of a DataFrame, but 
    # maybe this could be officially supported by Dask. 
    old_df.dask = new_df.dask 
    old_df._meta = new_df._meta 
    old_df._name = new_df._name 
    old_df.divisions = new_df.divisions 


def iterative_algorithm(df, num_iterations): 

    for iteration in range(num_iterations): 

     def mapper(df): 
      # Actual transform logic... 
      return df 

     # Simulate mutable/in-place map_partitions 
     new_df = df.map_partitions(mapper) 
     modify_inplace(df, new_df) 

     # Simulate mutable/in-place persist 
     new_df = df.persist() 
     modify_inplace(df, new_df) 

    # Technically no need to return, because all operations were in-place 
    return df 

这工作得相当好,对我来说,需要仔细遵循以下规则:

  • 通过上面的图案更换所有不可变的电话像df = df.<method>
  • 注意创建对df的引用。例如,在调用persist之前,使用像some_col = df["some_sol"]这样的变量在语法上的方便性需要del some_col。否则,以some_col存储的引用将再次导致数据重复。
+1

你会想在你的'modify_inplace'函数中加入'df.divisions' – MRocklin

2

如下你可以写一个释放功能:

from distributed.client import futures_of 

def release(collection): 
    for future in futures_of(collection): 
     future.release() 

这只会释放当前实例。如果您有这些期货躺在附近的多个实例,您可能需要调用它几次或添加一个循环像下面这样:

while future.client.refcount[future.key] > 0: 

但一般调用此多次似乎情况不明智你有其他的拷贝流传合理。