2016-11-28 36 views
1

我有一些pickle文件,每个日期在2005年到2010年之间。每个文件都包含一个单词词典,它们各自的频率为该日期。我还有一个“主文件”,包含整个时期的所有独特词汇。总共有大约500万字。具有多个输入的luigi任务的体系结构

我需要获取所有数据并为每个单词生成一个CSV文件,每个日期将有一个CSV文件。例如,例如文件some_word.txt

2005-01-01,0.0003 
2005-01-02,0.00034 
2005-01-03,0.008 

我有麻烦组织与路易吉框架这一进程。我目前的顶级任务需要一个字,查找每个日期的关联频率并将结果存储在CSV文件中。我想我可以循环访问我的主文件中的每个单词并用该单词运行任务,但我估计这需要几个月,如果不是更长的话。这是我的顶级AggregateTokenFreqs任务的简化版本。

class AggregateTokenFreqs(luigi.Task): 
    word = luigi.Parameter() 

    def requires(self): 
     pass # not sure what to require here, master file? 

    def output(self): 
     return luigi.LocalTarget('data/{}.csv'.format(self.word)) 

    def run(self): 
     results = [] 
     for date_ in some_list_of_dates: 
      with open('pickles/{}.p'.format(date_), 'rb') as f: 
       freqs = pickle.load(f) 
       results.append((date_, freqs.get(self.word)) 

     # Write results list to output CSV file 
+1

您需要进行的处理是什么?例如,您是否计划在新的一天的数据到达时重新运行日常流程?如果你只需要运行一次,运行luigi可能没有意义。无论如何,你最好使用多处理技术。 – MattMcKnight

回答

0

@MattMcKnight说你可能会更好使用多处理。但是,如果您想要使用Luigi,您可以执行以下操作:

  • Luigi具有您配置的工人的概念。这是本地进程并行运行不同任务的数量。
  • 你可以模拟任务,而不是通过所有酱菜“循环”,传递一个单一的酱菜到任务(作为参数)。您必须将结果写入具有唯一名称的目录中的TSV。
  • 有一个循环,每个pickle(日期)创建一个任务。配置工人数量(即5)。这样你就可以同时处理5个文件。
  • 您将需要一个额外的任务,将所有单个CSV文件“加入”为一个。

希望这会有所帮助。

相关问题