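I am the developer of Ruffus. I am not sure I entirely understand what you are trying to do, but here goes:
Waiting for jobs that take different amounts of time to finish before moving on to the next stage of the pipeline is exactly what Ruffus is designed for, so this should hopefully be straightforward.
The first question is: do you know which files are being created up front, i.e. before the pipeline is run? Let's start by assuming you do.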
```python
from ruffus import *

filenames = ["one.file", "two.file", "three.file"]
```
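Let's write a dummy function that creates a file each time it is called. In Ruffus, the input and output file names are passed as the first and second parameters respectively. We have no input file name, so our function calls should look like this: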
```python
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
```
The definition of create_file would look like this:
```python
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # Write a placeholder file; "with" makes sure it is closed
    with open(output_file_name, "w") as f:
        f.write("dummy file")
```
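To make the calling convention concrete without Ruffus installed, here is a plain-Python sketch that expands the same parameter list by hand and calls the function once per (input, output) tuple. It only illustrates which arguments each call receives; it is not how Ruffus itself dispatches jobs. The helper `run_file_jobs` and the temporary directory are hypothetical additions so the sketch runs on its own without littering the working directory.

```python
import os
import tempfile

filenames = ["one.file", "two.file", "three.file"]

def create_file(no_input_file_name, output_file_name):
    # Same body as the Ruffus task: write a placeholder file.
    with open(output_file_name, "w") as f:
        f.write("dummy file")

def run_file_jobs(func, params):
    # Hypothetical stand-in for what the @files list describes:
    # one call per (input, output) tuple, input first, output second.
    for input_name, output_name in params:
        func(input_name, output_name)

workdir = tempfile.mkdtemp()
params = [(None, os.path.join(workdir, fn)) for fn in filenames]
run_file_jobs(create_file, params)
print(sorted(os.listdir(workdir)))  # ['one.file', 'three.file', 'two.file']
```

Each tuple maps positionally onto `(no_input_file_name, output_file_name)`, which is why the first element is `None` when there is no input.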
Each of these files will be created by one of 3 separate calls to create_file. These can be run in parallel if you wish:
```python
pipeline_run([create_file], multiprocess=5)
```
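As a rough standard-library analogue of `multiprocess=5`, the same three jobs can be handed to a worker pool. This sketch uses `concurrent.futures.ThreadPoolExecutor` with `max_workers=5` (threads rather than processes, purely to keep the example self-contained); it does not reproduce Ruffus's own scheduling, and the shortened random sleep is an assumption standing in for real work.

```python
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from random import randint

filenames = ["one.file", "two.file", "three.file"]

def create_file(no_input_file_name, output_file_name):
    # Simulate work of varying length, then write the file.
    time.sleep(randint(1, 3) / 10)
    with open(output_file_name, "w") as f:
        f.write("dummy file")
    return output_file_name

workdir = tempfile.mkdtemp()
params = [(None, os.path.join(workdir, fn)) for fn in filenames]

# Submit every job at once; up to 5 can run concurrently.
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(create_file, inp, out) for inp, out in params]
    created = [f.result() for f in futures]  # blocks until each job is done

print([os.path.basename(p) for p in created])  # ['one.file', 'two.file', 'three.file']
```

The jobs may finish in any order, but `created` follows submission order because the results are collected from `futures` in the order they were submitted.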
Now to merge the files. The `@merge` decorator is designed for exactly this. We just need to link it to the previous function:
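```python
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    # Concatenate every upstream file into the single merged output
    with open(output_file_name, "w") as output_file:
        for i in input_file_names:
            with open(i) as input_file:
                output_file.write(input_file.read())
```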
merge_file will only be called once all the files from the three calls to create_file() are ready.
The entire code is as follows:
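That ordering guarantee, where the merge runs only after every upstream job has finished, can be mimicked in plain Python by waiting on all the futures before concatenating. The sketch below illustrates the ordering only, not Ruffus's implementation; it uses a thread pool and a temporary directory as assumptions, and `merge_file` keeps the signature used above.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, wait

filenames = ["one.file", "two.file", "three.file"]

def create_file(no_input_file_name, output_file_name):
    with open(output_file_name, "w") as f:
        f.write("dummy file")
    return output_file_name

def merge_file(input_file_names, output_file_name):
    # Concatenate every input file into the single output file.
    with open(output_file_name, "w") as output_file:
        for name in input_file_names:
            with open(name) as input_file:
                output_file.write(input_file.read())

workdir = tempfile.mkdtemp()
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(create_file, None, os.path.join(workdir, fn))
               for fn in filenames]
    wait(futures)  # barrier: do not merge until every create_file job is done

merged = os.path.join(workdir, "merge.file")
merge_file([f.result() for f in futures], merged)
with open(merged) as f:
    print(f.read())  # dummy filedummy filedummy file
```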
```python
from random import randint
from time import sleep

from ruffus import *

filenames = ["one.file", "two.file", "three.file"]

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1, 5))
    with open(output_file_name, "w") as f:
        f.write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    with open(output_file_name, "w") as output_file:
        for i in input_file_names:
            with open(i) as input_file:
                output_file.write(input_file.read())

pipeline_run([merge_file], multiprocess=5)
```
And this is the result:
```
>>> pipeline_run([merge_file], multiprocess=5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
```