我有一个luigi
预处理任务,将我的原始数据拆分为更小的文件。这些文件将被实际的管道处理。luigi相关性在运行时更改
所以关于参数,我想要求每个管道有一个预处理文件ID作为参数。但是,此文件ID仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供这个不工作的代码:
import luigi
import subprocess
import random
class GenPipelineFiles(luigi.Task):
input_file = luigi.Parameter()
def requires(self):
pass
def output(self):
for i in range(random.randint(0,10)):
yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))
def run(self):
for iout in self.output:
command = "touch {}".format(iout.fname)
subprocess.call(command, shell=True)
class RunPipelineOnSmallChunk(luigi.Task):
pass
class Experiment(luigi.WrapperTask):
input_file = luigi.Parameter(default="ex")
def requires(self):
file_ids = GenPipelineFiles(input_file=self.input_file)
for file_id in file_ids:
yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)
luigi.run()
的包装任务Experiment
应该
第一,在某种程度上需要对原始数据的分割成文件
其次,需要使用获得的预处理文件ID的实际流水线。
在GenPipelineFiles
输出文件的随机数表明此不能被硬编码到Experiment
的requires
。
可能与此问题有关的一个问题是,一个luigi
任务正确地只有一个输入目标和一个输出目标。可能关于如何在GenPipelineFiles
中建模多个输出的说明也可以解决该问题。
你能解释一下你在这一点上所得到的错误? –
luigi依赖关系图是基于'require'函数的返回而创建的。在这里,GePipelineFiles永远不会被返回,因此不会被安排。这段代码不是我的实际代码,从来不打算没有错误地运行。它只是为了说明依赖性问题 - 我面临的问题 –