1

我正在使用ruffus编写管道。我有一个并行调用的函数,它会创建多个文件。我想创建一个函数“combineFiles()”,在完成所有这些文件后调用它。由于它们在群集上并行运行,它们不会一起完成。我写了一个函数'getFilenames()',它返回需要创建的一组文件名,但我怎么才能让combineFiles()等待它们在那里呢?要求在Ruffus管道中运行函数之前创建一组文件

我试过如下:

@pipelineFunction 
@files(getFilenames) 
def combineFiles(filenames): 
    # I should only be called if every file in the list 'filenames' exists 

我也试过装饰:

@merge(getFilenames) 

但这也不管用。在由getFilenames给出的文件被创建之前,combineFiles仍然会被错误地调用。如何使那些文件在那里的combineFiles有条件?

谢谢。

回答

2

我是Ruffus的开发者。我不确定我完全理解你正在尝试做什么,但这里有:

等待工作需要完成不同的时间才能完成管道的下一个阶段,这正是Ruffus所做的工作这有希望是直截了当的。

第一个问题是你知道哪些文件是在前面创建的,即在管道运行之前?让我们先假设你做。

from ruffus import * 
filenames = ["one.file", "two.file", "three.file"] 

让我们编写一个虚拟函数,它在每次调用时创建一个文件。在Ruffus中,任何输入和输出文件名都分别包含在前两个参数中。我们没有输入文件名,所以我们的函数调用应该是这样的:

create_file(None, "one.file") 
create_file(None, "two.file") 
create_file(None, "three.file") 

create_file的定义是这样的:

@files([(None, fn) for fn in filenames]) 
def create_file(no_input_file_name, output_file_name): 
    open(output_file_name, "w").write("dummy file") 

这些文件中的每一个将在3次独立的呼叫创建到create_file。如果你愿意,这些可以并行运行。

pipeline_run([create_file], multiprocess = 5) 

现在来合并文件。 “@Merge”装饰器确实是为此设置的。我们只需要它链接到以前的功能:

@merge(create_file, "merge.file") 
def merge_file(input_file_names, output_file_name): 
    output_file = open(output_file_name, "w") 
    for i in input_file_names: 
     output_file.write(open(i).read()) 

这只会调用merge_file当所有的文件都是从三个电话准备create_file()。

整个代码如下:

from ruffus import * 
filenames = ["one.file", "two.file", "three.file"] 

from random import randint 
from time import sleep 

@files([(None, fn) for fn in filenames]) 
def create_file(no_input_file_name, output_file_name): 
    # simulate create file process of indeterminate complexity 
    sleep(randint(1,5)) 
    open(output_file_name, "w").write("dummy file") 

@merge(create_file, "merge.file") 
def merge_file(input_file_names, output_file_name): 
    output_file = open(output_file_name, "w") 
    for i in input_file_names: 
     output_file.write(open(i).read()) 


pipeline_run([merge_file], multiprocess = 5) 

而这是结果:

>>> pipeline_run([merge_file], multiprocess = 5) 

    Job = [None -> two.file] completed 
    Job = [None -> three.file] completed 
    Job = [None -> one.file] completed 
Completed Task = create_file 
    Job = [[one.file, three.file, two.file] -> merge.file] completed 
Completed Task = merge_file 
相关问题