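Luigi task not triggering requirements for ETL process

This is a follow-up to my previous question, about patterns commonly used in ETL processes.
Today, the machine learning job I wrote is run by hand. I download the needed input files, learn and predict things, output a .csv file, and then copy it into a database.
However, since this is going into production, I need to automate this whole process. The needed input files will arrive every month (and eventually more frequently) in an S3 bucket from the provider. As an example, I tried to implement this in Luigi, but replaced S3 with a local directory to keep things simpler. The program should: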
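- Watch for new files
- When a new file is found in the input directory, extract it to the data directory with the `Extract` task
- Process it with the `Transform` task (using the `algorithm` function)
- Load it into a PostgreSQL database with the `Load` task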

    import glob
    import sqlite3

    import luigi
    import pandas as pd
    from luigi.contrib import postgres


    class ReadFile(luigi.ExternalTask):
        # Simply load the new file from the input directory
        filename = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget('input/' + self.filename)


    class Extract(luigi.Task):
        # Extract from the input directory and put in the data directory
        filename = luigi.Parameter()

        def requires(self):
            return ReadFile(self.filename)

        def output(self):
            return luigi.LocalTarget('data/' + self.filename)

        def run(self):
            with self.input().open('r') as input_file:
                data = input_file.read()
            with self.output().open('w') as output_file:
                output_file.write(data)


    class Transform(luigi.Task):
        # Transform the file from the data directory using the transform function
        filename = luigi.Parameter()

        def requires(self):
            return Extract(self.filename)

        def output(self):
            return luigi.LocalTarget('results/' + self.filename)

        def run(self):
            with self.input().open('r') as input_file:
                data = input_file.read()
            result = transform(data)
            with self.output().open('w') as output_file:
                # index=False keeps the CSV to the two data columns
                result.to_csv(output_file, index=False)
            mark_as_done(self.filename)


    class Load(luigi.Task):
        # Find new files, run the Transform task and load into the PostgreSQL DB
        date = luigi.DateParameter()

        def requires(self):
            return [Transform(filename) for filename in new_files('input/')]

        def output(self):
            return postgres.PostgresTarget(
                host='db', database='luigi', user='luigi', password='luigi',
                table='test', update_id=str(self.date))

        def run(self):
            connection = self.output().connect()
            cursor = connection.cursor()
            for input_target in self.input():
                with input_target.open('r') as input_file:
                    result = pd.read_csv(input_file)
                # psycopg2 uses %s placeholders, not ?
                for row in result.itertuples(index=False, name=None):
                    cursor.execute('INSERT INTO test VALUES (%s, %s)', row)
            # Record update_id in the marker table and persist the inserts
            self.output().touch(connection)
            connection.commit()


    # Connection to the SQLite DB, which stores the files that were already processed
    SQLITE_CONNECTION = None


    def get_connection():
        # Without the global statement the assignment below would create a local
        global SQLITE_CONNECTION
        if SQLITE_CONNECTION is None:
            SQLITE_CONNECTION = sqlite3.connect('processed.db')
        return SQLITE_CONNECTION


    # Mark filename as done in the SQLite DB
    def mark_as_done(filename):
        connection = get_connection()
        cursor = connection.cursor()
        cursor.execute('INSERT INTO processed_files VALUES (?)', (filename,))
        connection.commit()


    # Check whether the file was already processed
    def new_file(filename):
        connection = get_connection()
        cursor = connection.cursor()
        # sqlite3 uses ? placeholders and reports no row count for SELECT
        cursor.execute('SELECT * FROM processed_files WHERE file=?', (filename,))
        return cursor.fetchone() is None


    # Yields filenames of files that were not processed yet
    def new_files(path):
        for filename in glob.glob(path + '*.csv'):
            if new_file(filename):
                yield filename


    # Mock of the transform process
    def transform(data):
        return pd.DataFrame({'a': [1, 2, 3], 'b': [1, 2, 3]})

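
To make the bookkeeping part easier to check in isolation, here is a minimal standalone sketch of the processed-files tracking, without Luigi. It assumes an in-memory SQLite database and a `processed_files` table with a single `file` column (the table is not created anywhere in the code above, so this schema is an assumption). `make_connection` and the explicit `connection` argument are hypothetical names used only for this illustration.

```python
import glob
import os
import sqlite3
import tempfile


def make_connection():
    # Assumption: a single-column table keyed by the file path.
    connection = sqlite3.connect(':memory:')
    connection.execute(
        'CREATE TABLE IF NOT EXISTS processed_files (file TEXT PRIMARY KEY)')
    return connection


def mark_as_done(connection, filename):
    # sqlite3 uses "?" placeholders; commit so the row is persisted.
    connection.execute(
        'INSERT OR IGNORE INTO processed_files VALUES (?)', (filename,))
    connection.commit()


def new_file(connection, filename):
    # sqlite3 does not report a row count for SELECT statements,
    # so fetch a row instead of inspecting cursor.rowcount.
    cursor = connection.execute(
        'SELECT 1 FROM processed_files WHERE file = ?', (filename,))
    return cursor.fetchone() is None


def new_files(connection, path):
    # Yield the paths of .csv files under `path` not yet marked as done.
    for filename in sorted(glob.glob(os.path.join(path, '*.csv'))):
        if new_file(connection, filename):
            yield filename


if __name__ == '__main__':
    connection = make_connection()
    with tempfile.TemporaryDirectory() as input_dir:
        for name in ('a.csv', 'b.csv'):
            with open(os.path.join(input_dir, name), 'w') as handle:
                handle.write('x\n')
        pending = list(new_files(connection, input_dir))
        print([os.path.basename(p) for p in pending])  # ['a.csv', 'b.csv']
        mark_as_done(connection, pending[0])
        remaining = list(new_files(connection, input_dir))
        print([os.path.basename(p) for p in remaining])  # ['b.csv']
```

Note that `glob.glob` returns paths that include the directory prefix (for example `input/a.csv` rather than `a.csv`), which may matter when a task prepends its own directory to the filename.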
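Questions:
- When I put a file in the input directory and trigger the `Load` task, it doesn't fire the `Extract` task. What am I doing wrong?
- Will `Load` be fired every time with this `update_id` parameter?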
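
To make the second question concrete: as far as I understand, Luigi treats a task as complete when its output target exists, and `PostgresTarget` decides that by looking for a row with the given `update_id` in a marker table. The sketch below imitates that idea with the standard library only. It is not Luigi's implementation; `MarkerTarget`, `run_load`, and the SQLite-backed `table_updates` table are hypothetical stand-ins for illustration.

```python
import sqlite3


class MarkerTarget:
    """Hypothetical stand-in for a DB target that tracks completion by update_id."""

    def __init__(self, connection, update_id):
        self.connection = connection
        self.update_id = update_id
        connection.execute(
            'CREATE TABLE IF NOT EXISTS table_updates (update_id TEXT PRIMARY KEY)')

    def exists(self):
        # The target "exists" once a marker row for this update_id is present.
        row = self.connection.execute(
            'SELECT 1 FROM table_updates WHERE update_id = ?',
            (self.update_id,)).fetchone()
        return row is not None

    def touch(self):
        # Record the marker row so later runs with the same update_id are skipped.
        self.connection.execute(
            'INSERT OR IGNORE INTO table_updates VALUES (?)', (self.update_id,))
        self.connection.commit()


def run_load(connection, date, log):
    """Run the mock load only if no marker exists yet for this date."""
    target = MarkerTarget(connection, str(date))
    if target.exists():
        return False
    log.append(str(date))  # stands in for the actual INSERTs
    target.touch()
    return True


if __name__ == '__main__':
    connection = sqlite3.connect(':memory:')
    runs = []
    print(run_load(connection, '2015-06-01', runs))  # True
    print(run_load(connection, '2015-06-01', runs))  # False
    print(run_load(connection, '2015-07-01', runs))  # True
```

Under this model, a second run with the same date is skipped even if new files have arrived in the meantime, while a run with a new date goes through.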
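Why is the code indented with 4 spaces and not displayed as formatted code? – prcastro
Because it is assumed to be part of the block in item 4. – wildplasser
Thanks, that explains it. – prcastro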