2015-10-25

Luigi Pipeline in S3

My initial files live in AWS S3. Can someone point out how I need to set this up in a Luigi Task?

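I reviewed the documentation and found luigi.S3, but it is not clear to me how to use it. Then I searched the web and only found links to mortar-luigi and implementations built on top of luigi.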

UPDATE

After following @matagus's example (I also created the ~/.boto file, as suggested):

# coding: utf-8

import luigi

from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):

    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None

        for input in self.input():
            print("Doing something ...")
            with input.open('r') as f:
                for line in f:
                    result = 'This is a line'

        if result:
            out_file = self.output().open('w')
            out_file.write(result)

When I execute it, nothing happens:

DEBUG: Checking if ProcessS3File() is complete 
INFO: Informed scheduler that task ProcessS3File() has status PENDING 
INFO: Done scheduling tasks 
INFO: Running Worker with 1 processes 
DEBUG: Asking scheduler for work... 
DEBUG: Pending tasks: 1 
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running ProcessS3File() 
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done  ProcessS3File() 
DEBUG: 1 running tasks, waiting for next task to finish 
INFO: Informed scheduler that task ProcessS3File() has status DONE 
DEBUG: Asking scheduler for work... 
INFO: Done 
INFO: There are no more tasks to run at this time 
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread 

As you can see, the message "Doing something..." is never printed. What is wrong?


The error is in 'def requieres(self):'. It must be 'requires'. – matagus


Luigi checks that method to get the input files; since the 'requires' method doesn't exist, it returns an empty list. – matagus


You're absolutely right! I'm such a dummy! Thanks! – nanounanue
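To see why the typo fails silently, here is a toy model of the behavior matagus describes. It is not Luigi's source code: ToyTask, Source, Misspelled and Correct are hypothetical names. The base class supplies a default requires() that returns an empty list, and input() collects the output of whatever requires() returns, so a misspelled requieres() is just an unrelated extra method that nothing ever calls.

```python
class ToyTask:
    """Toy stand-in for a Luigi task; only models requires() and input()."""

    def requires(self):
        # Default: no dependencies, so input() ends up empty.
        return []

    def output(self):
        return None

    def input(self):
        # Ask each requirement for its output (a list stays a list).
        deps = self.requires()
        if isinstance(deps, (list, tuple)):
            return [dep.output() for dep in deps]
        return deps.output()


class Source(ToyTask):
    def output(self):
        return "s3://my-bucket/19170205.txt"


class Misspelled(ToyTask):
    def requieres(self):  # typo: the base class never calls this method
        return Source()


class Correct(ToyTask):
    def requires(self):
        return Source()


print(Misspelled().input())  # []
print(Correct().input())     # s3://my-bucket/19170205.txt
```

Note that when requires() returns a single task, input() gives back that task's single output rather than a list, which is why the answer below calls self.input().open('r') directly instead of looping over self.input().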

Answer


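The key here is to define an external task that has no inputs and whose outputs are the files you already have living in S3. The Luigi docs mention this in Requiring another Task: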

Note that requires() can not return a Target object. If you have a simple Target object that is created externally you can wrap it in a Task class
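As a rough illustration of the quoted rule, the toy model below (hypothetical classes, not Luigi's implementation) has input() ask each requirement for its output(). A task that wraps the S3 path can answer that; a bare target cannot, so in this model returning a target from requires() raises AttributeError. The exact error real Luigi produces may differ.

```python
class ToyTarget:
    """Toy stand-in for a Target: just a path, with no output() method."""

    def __init__(self, path):
        self.path = path


class ToyTask:
    def requires(self):
        return []

    def output(self):
        return None

    def input(self):
        # Each requirement must be a task so that it can report its output().
        deps = self.requires()
        if isinstance(deps, (list, tuple)):
            return [dep.output() for dep in deps]
        return deps.output()


class ToyExternalTask(ToyTask):
    """No requirements and no run(): it only declares an output that already exists."""


class ToyS3File(ToyExternalTask):
    def output(self):
        return ToyTarget("s3://my-bucket/path/to/file")


class WrapsTarget(ToyTask):
    def requires(self):
        return ToyS3File()  # the target is wrapped in a task


class ReturnsTarget(ToyTask):
    def requires(self):
        return ToyTarget("s3://my-bucket/path/to/file")  # a bare target


print(WrapsTarget().input().path)  # s3://my-bucket/path/to/file
try:
    ReturnsTarget().input()
except AttributeError as exc:
    print("fails:", exc)
```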

So, basically, you end up with something like this:

import luigi

from luigi.s3 import S3Target

from somewhere import do_something_with  # placeholder for your own processing function


class MyS3File(luigi.ExternalTask):

    def output(self):
        return S3Target('s3://my-bucket/path/to/file')

class ProcessS3File(luigi.Task):

    def requires(self):
        return MyS3File()

    def output(self):
        return S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # this returns a file stream that reads the file from your AWS S3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)

        # and then you write the result to the output target;
        # closing the file (here via 'with') is what finalizes the write
        with self.output().open('w') as out_file:
            # it'd be better to serialize this result before writing it to a file,
            # but this is a pretty simple example
            out_file.write(result)
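The 'somewhere.do_something_with' import above is a placeholder. As a hypothetical example of what it might look like, the function below takes the file-like object returned by self.input().open('r') and returns a string that can be passed straight to out_file.write(). It assumes the stream supports iterating over lines (as the question's 'for line in f' loop does), so it can be tried locally with io.StringIO instead of a real S3 stream.

```python
import io


def do_something_with(f):
    """Hypothetical stand-in for somewhere.do_something_with.

    Counts the non-empty lines in a text stream and returns the count as a
    string, since the output target is opened in text mode ('w').
    """
    count = sum(1 for line in f if line.strip())
    return "%d non-empty lines\n" % count


# Any file-like object works, so this can be tested without S3.
sample = io.StringIO("first\n\nsecond\nthird\n")
print(do_something_with(sample), end="")  # 3 non-empty lines
```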

UPDATE:

Luigi uses boto to read files from and write files to AWS S3, so for this code to work you need to provide your credentials in the boto config file ~/.boto (see the other possible config file locations here):

[Credentials] 
aws_access_key_id = <your_access_key_here> 
aws_secret_access_key = <your_secret_key_here> 
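Before running the pipeline, it can help to confirm that the file parses and contains both keys. The sketch below uses Python's standard configparser; the function name check_boto_credentials is hypothetical, and it only checks the [Credentials] section and key names shown above, not whether boto can actually authenticate with them.

```python
import configparser
import os
import tempfile

REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def check_boto_credentials(path=os.path.expanduser("~/.boto")):
    """Hypothetical helper: return the credential keys missing from a boto config.

    Only checks the [Credentials] section and the two key names shown above;
    it does not contact AWS or verify that the keys are valid.
    """
    # interpolation=None so a '%' inside a secret key is not treated specially
    parser = configparser.ConfigParser(interpolation=None)
    if not parser.read(path):
        # read() returns the list of files it parsed; empty means missing/unreadable
        return list(REQUIRED_KEYS)
    if not parser.has_section("Credentials"):
        return list(REQUIRED_KEYS)
    return [
        key
        for key in REQUIRED_KEYS
        if not parser.get("Credentials", key, fallback="").strip()
    ]


# Usage: a throwaway file that is missing the secret key.
with tempfile.NamedTemporaryFile("w", suffix=".boto", delete=False) as tmp:
    tmp.write("[Credentials]\naws_access_key_id = <your_access_key_here>\n")
missing = check_boto_credentials(tmp.name)
os.remove(tmp.name)
print(missing)  # ['aws_secret_access_key']
```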

There are some problems with your code, could you fix them? (e.g. the 'return' in the first 'output' method should be 'return S3Target(...') – nanounanue


Another question: in which part should I provide the 'aws credentials'? – nanounanue


Done, I've updated my answer. – matagus