
This is a follow-up to my previous question, about a pattern that is frequently used in ETL processes, where Luigi tasks were not triggering the requirements of the ETL process.

Today, the machine learning job I wrote runs like this: I download the required input files, do the learning and the predictions, output a .csv file, and then copy it into a database.

However, since this is going into production, I need to automate the whole process. The required input files will arrive in the provider's S3 bucket every month (and eventually more often). As an example, I tried to implement this in Luigi, but swapped S3 for a local directory so that things would be simpler (a sketch of the S3 variant follows the list below). This program should:

  • Watch for new files
  • When new files are found in the input directory, have the Extract task pull their data into the data directory
  • Process them with the Transform task (using the algorithm function)
  • Load the results into the PostgreSQL database with the Load task
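
For reference, a minimal sketch of how the external task could point back at S3 once this leaves the local directory, assuming the stock luigi.contrib.s3 module and a placeholder bucket name:

import luigi
from luigi.contrib.s3 import S3Target


class ReadS3File(luigi.ExternalTask):
    # External task whose output is the raw file the provider drops into S3
    filename = luigi.Parameter()

    def output(self):
        # 'provider-bucket/incoming/' is a placeholder, not a real bucket
        return S3Target('s3://provider-bucket/incoming/' + self.filename)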

import glob
import sqlite3

import luigi
from luigi.contrib import postgres
import pandas as pd


class ReadFile(luigi.ExternalTask):
    # Simply load the new file from the input directory
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('input/' + self.filename)


class Extract(luigi.Task):
    # Extract from the input directory and put the file in the data directory
    filename = luigi.Parameter()

    def requires(self):
        return ReadFile(self.filename)

    def output(self):
        return luigi.LocalTarget('data/' + self.filename)

    def run(self):
        with self.input().open('r') as input_file:
            data = input_file.read()
        with self.output().open('w') as output_file:
            output_file.write(data)


class Transform(luigi.Task):
    # Transform the file from the data directory using the transform function
    filename = luigi.Parameter()

    def requires(self):
        return Extract(self.filename)

    def output(self):
        return luigi.LocalTarget('results/' + self.filename)

    def run(self):
        with self.input().open('r') as input_file:
            data = input_file.read()
        result = transform(data)
        with self.output().open('w') as output_file:
            result.to_csv(output_file)
        mark_as_done(self.filename)


class Load(luigi.Task):
    # Find new files, run the Transform task on them and load the results into the PostgreSQL DB
    date = luigi.DateParameter()

    def requires(self):
        return [Transform(filename) for filename in new_files('input/')]

    def output(self):
        return postgres.PostgresTarget(host='db', database='luigi', user='luigi',
                                       password='luigi', table='test', update_id=self.date)

    def run(self):
        connection = self.output().connect()
        cursor = connection.cursor()
        for input_target in self.input():
            with input_target.open('r') as inputfile:
                result = pd.read_csv(inputfile)
            for row in result.itertuples(index=False):
                # psycopg2 expects %s placeholders; skip the index so the values match the columns
                cursor.execute('INSERT INTO test VALUES (%s, %s)', row)
        connection.commit()

# Get connection to the SQLite DB, which will store the files that were already processed 
SQLITE_CONNECTION = None 
def get_connection():
    global SQLITE_CONNECTION
    if SQLITE_CONNECTION is None:
        SQLITE_CONNECTION = sqlite3.connect('processed.db')
    return SQLITE_CONNECTION


# Mark filename as done in the SQLite DB 
def mark_as_done(filename):
    connection = get_connection()
    cursor = connection.cursor()
    cursor.execute('INSERT INTO processed_files VALUES (?)', (filename,))
    connection.commit()


# Check whether the file was already processed
def new_file(filename):
    connection = get_connection()
    cursor = connection.cursor()
    cursor.execute('SELECT * FROM processed_files WHERE file=?', (filename,))
    # sqlite3 does not populate rowcount for SELECTs, so fetch instead
    return cursor.fetchone() is None


# Yields filenames of files that were not processed yet 
def new_files(path):
    for filename in glob.glob(path + '*.csv'):
        if new_file(filename):
            yield filename


# Mock of the transform process 
def transform(data):
    return pd.DataFrame({'a': [1, 2, 3], 'b': [1, 2, 3]})
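
Assuming the module above is saved as, say, etl.py (the name is just for illustration), the pipeline is kicked off by scheduling the Load task for a given date, e.g. luigi --module etl Load --date 2017-05-05 --local-scheduler, or programmatically:

import datetime

import luigi

if __name__ == '__main__':
    # Scheduling Load should transitively schedule Transform, Extract and ReadFile
    luigi.build([Load(date=datetime.date(2017, 5, 5))], local_scheduler=True)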

Questions:

  • When I put a file into the input directory and trigger the Load task, it does not fire the Extract task. What am I doing wrong?
  • Will Load fire every time with this update_id parameter?
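
On the second point, my reading of luigi.contrib.postgres is that the PostgresTarget only counts as complete once touch() has recorded the update_id in its marker table, so as long as run() never calls touch() the task stays pending and will fire again. A sketch of how the marker would normally be written at the end of run(), reusing the target's connection so the inserts and the marker commit together:

    def run(self):
        connection = self.output().connect()
        # ... insert the rows as above ...
        # Record update_id in the marker table so this date is not re-run
        self.output().touch(connection)
        connection.commit()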

Why is the code indented with 4 spaces and yet not shown as formatted code? – prcastro


Because it is assumed to be part of the block in item 4. – wildplasser


Thanks, that explains it – prcastro

Answer


The Load task only creates Transform tasks for the files that are in the result/ directory. Shouldn't it be looking for new files in the input directory?


That's not the problem, and it doesn't solve the requirement either. It was a typo from when I changed the names of the real folders I use – prcastro