2016-05-16 121 views
1

我有一堆python文件,为各种数据源做端到端的文件处理。Luigi每天运行熊猫脚本?

例如,survey.py会用熊猫读取文件,添加列,重命名内容,做一些计算,然后将修改的文件保存回磁盘。

driver.py将遵循该文件的相同过程等。对于几十个文件,这种结构正在发生。

然后我有一个名为process_all的文件,它基本上只是以某种顺序运行每个python文件(某些文件依赖于其他文件)。经过进一步的研究,我偶然发现了一个名为luigi的图书馆,看起来如果我需要扩大规模,它似乎更加强大地完成了同样的任务。

问题:我是否继续编写独立的.py文件来处理数据,还是我将所有这些东西放在我的类中luigi?我假设每个原始文件都有一个类将被处理,对吗?

或者我可以写出luigi类中的所有处理步骤,例如下面的CleanDriver类?这似乎会变得非常冗长,因为很多这些单独的代码文件都是20-50行,而且有很多。

class CleanSurvey(luigi.Task): 
    date_interval = luigi.DateIntervalParameter() 

    def run(self): 
     os.system('../py_files/run_ftp.py') 

    def output(self): 
     return luigi.LocalTarget(path + 'test_survey.csv') 

class CleanDriver(luigi.Task): 
    date_interval = luigi.DateIntervalParameter() 

    def run(self): 
     df = pd.read_csv('../file.csv') 
     df['col5'] = do stuff 
     df.rename(columns={:}, inplace=True) 

    def output(self): 
     return luigi.LocalTarget(path + 'test_driver.csv') 

if __name__ == '__main__': 
    luigi.run 
+0

哦,顺便说一句。我从来没有在我的任何代码中使用过类,所以我不习惯它。 – trench

回答

1

我会建议你在进入这个之前学习object orientation in Python。与Luigi合作会有帮助。

Luigi的确帮助创建数据处理管道。管道由任务组成。 (这大概是每一步的一类)。

关于如何将这些步骤包装到类中的问题。因人而异。我喜欢遵循单一责任原则让我的Luigi班上课。这是每个任务做一件事,但是正确的。未来的调试和更改也更容易。

最后在你提到的例子中,你有没有requires()的类。这个函数非常重要,因为它允许您将一个任务的输出提供给另一个任务,从而创建适当的依赖关系树。

+2

谢谢,我通读了链接。我非常习惯将文件读入内存(熊猫),然后对其进行处理,如添加/删除/重命名列,执行计算,聚合或连接数据等。所有这些事情都需要在文件仍然存在时完成在内存中,每个文件都会有所不同。您是否遇到过使用类来清理文件或执行大量文件IO操作的教程?有些东西看起来像是需要明确的(例如将一个文件的id_col重命名为employee_id,将另一个文件的id_col重命名为customer_id等) – trench

+1

我通常将管道本身和业务逻辑分开,所以我建议您另一个带'do_stuff()'函数的python文件,然后导入它并在luigi任务中运行。这样您就不会混淆数据管道逻辑和业务逻辑,并且会尽可能地保持任务。 –