2017-02-21 71 views
0

我有一个luigi预处理任务,将我的原始数据拆分为更小的文件。这些文件将被实际的管道处理。luigi相关性在运行时更改

所以关于参数,我想要求每个管道有一个预处理文件ID作为参数。但是,此文件ID仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供这个不工作的代码:

import luigi 
import subprocess 
import random 


class GenPipelineFiles(luigi.Task): 

    input_file = luigi.Parameter() 

    def requires(self): 
     pass 

    def output(self): 

     for i in range(random.randint(0,10)): 
      yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i)) 

    def run(self): 

     for iout in self.output: 
      command = "touch {}".format(iout.fname) 
      subprocess.call(command, shell=True) 


class RunPipelineOnSmallChunk(luigi.Task): 
    pass 


class Experiment(luigi.WrapperTask): 

    input_file = luigi.Parameter(default="ex") 

    def requires(self): 

     file_ids = GenPipelineFiles(input_file=self.input_file) 

     for file_id in file_ids: 
      yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id) 


luigi.run() 

的包装任务Experiment应该

  1. 第一,在某种程度上需要对原始数据的分割成文件

  2. 其次,需要使用获得的预处理文件ID的实际流水线。

GenPipelineFiles输出文件的随机数表明此不能被硬编码到Experimentrequires

可能与此问题有关的一个问题是,一个luigi任务正确地只有一个输入目标和一个输出目标。可能关于如何在GenPipelineFiles中建模多个输出的说明也可以解决该问题。

+0

你能解释一下你在这一点上所得到的错误? –

+0

luigi依赖关系图是基于'require'函数的返回而创建的。在这里,GePipelineFiles永远不会被返回,因此不会被安排。这段代码不是我的实际代码,从来不打算没有错误地运行。它只是为了说明依赖性问题 - 我面临的问题 –

回答

0

处理多个输出的一种简单方法是创建一个以输入文件命名的目录,并将输入文件从分割文件放到以输入文件命名的目录中。这样,依赖任务可以检查目录是否存在。假设我有一个输入文件123.txt,然后创建一个包含文件1.txt,2.txt,3.txt的目录123_split作为GenPipelineFiles的输出,然后使用1.txt,2.txt处理目录123_, 3.txt作为RunPipelineOnSmallChunk的输出。

对于您的requires方法Experiment,您必须以列表形式返回您想要运行的任务。你写的file_ids = GenPipelineFiles(input_file=self.input_file)的方式让我觉得该对象的run方法没有被调用,因为它没有被方法返回。

下面是一些示例代码,可以针对每个文件(但不是每个文件的任务)使用目标。我仍然认为让一个目录或者一个哨点文件的单个输出目标脱离某种类型以表明你已经完成是比较安全的。除非任务确保创建每个目标,否则原子性将丢失。

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler 

sampletask.py

import luigi 
import os 
import subprocess 
import random 


class GenPipelineFiles(luigi.Task): 

    inputfile = luigi.Parameter() 
    num_targets = random.randint(0,10) 

    def requires(self): 
     pass 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "split_{}".format(self.get_prefix()) 

    def output(self): 
     targets = [] 
     for i in range(self.num_targets): 
      targets.append(luigi.LocalTarget(" {}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 


class RunPipelineOnSmallChunk(luigi.Task): 

    inputfile = luigi.Parameter(default="test") 

    def get_prefix(self): 
     return self.inputfile.split(".")[0] 

    def get_dir(self): 
     return "processed_{}".format(self.get_prefix()) 

    @staticmethod 
    def clean_input_path(path): 
     return path.replace("split", "processed") 

    def requires(self): 
     return GenPipelineFiles(self.inputfile) 

    def output(self): 
     targets = [] 
     for target in self.input(): 
      targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path))) 
     return targets 

    def run(self): 
     if not os.path.exists(self.get_dir()): 
      os.makedirs(self.get_dir()) 
     for iout in self.output(): 
      command = "touch {}".format(iout.path) 
      subprocess.call(command, shell=True) 
+0

感谢您的回答,我认为在您的提案中有两件事情不是最优的,但也许我错误地理解了您的答案:1.如果'RunPipelineOnSmallChunk'需要整体上述任务的目录中,RunPipelineOnSmallChunk任务不是原子的,而是必须循环遍历所有文件才能执行必要的计算。 2。如果我在'Experiment'中需要两种类型的任务,而这些任务本身需要相互依赖,Luigi调度器会不必要地尝试多次运行子任务。 –

+0

对于2,我会让实验要求'RunPipelineOnSmallChunk'和'RunPipelineOnSmallChunk'需要'GenPipelineFiles'。 – MattMcKnight

+0

在那里添加了一个修改后的代码版本 – MattMcKnight