2014-09-30 69 views
4

我正在尝试使用mrjob在EMR上运行hadoop,并且无法弄清楚如何设置日志记录(用户在地图/减少步骤中生成日志),以便我可以访问它们集群终止后。mrjob:在EMR上设置日志记录

我试图使用logging模块设置日志记录,printsys.stderr.write(),但目前为止没有运气。唯一适用于我的选项是将日志写入文件,然后SSH将机器读取并读取,但其繁琐。我希望我的日志转到stderr/stdout/syslog并自动收集到S3,以便在群集终止后查看它们。

这里是日志的word_freq例如:

"""The classic MapReduce job: count the frequency of words. 
""" 
from mrjob.job import MRJob 
import re 
import logging 
import logging.handlers 
import sys 

WORD_RE = re.compile(r"[\w']+") 


class MRWordFreqCount(MRJob): 

    def mapper_init(self): 
     self.logger = logging.getLogger() 
     self.logger.setLevel(logging.INFO) 
     self.logger.addHandler(logging.FileHandler("/tmp/mr.log")) 
     self.logger.addHandler(logging.StreamHandler()) 
     self.logger.addHandler(logging.StreamHandler(sys.stdout)) 
     self.logger.addHandler(logging.handlers.SysLogHandler()) 

    def mapper(self, _, line): 
     self.logger.info("Test logging: %s", line) 
     sys.stderr.write("Test stderr: %s\n" % line) 
     print "Test print: %s" % line 
     for word in WORD_RE.findall(line): 
      yield (word.lower(), 1) 

    def combiner(self, word, counts): 
     yield (word, sum(counts)) 

    def reducer(self, word, counts): 
     yield (word, sum(counts)) 


if __name__ == '__main__': 
    MRWordFreqCount.run() 

回答

3

出的所有选项,只有一个真正起作用的是使用标准错误与直接写入(sys.stderr.write)或使用带有StreamHandler中的标准错误的记录器。

[s3_log_uri]/[jobflow-ID] /任务尝试/ [作业ID]/[尝试:在作业完成后(成功或有错误),从可以在以后被检索

日志-id]/stderr

请务必将日志保存在您的runners.emr.cleanup配置中。