2016-04-26 65 views
0

MRJob等待,直到每个作业完成后才将控制权交还给用户。我把一个大的EMR步骤分解成更小的步骤,并希望一次性提交它们。使用MRJob将作业提交到EMR集群

该文档讨论programmatically submitting tasks,但示例代码也等待作业完成(因为他们调用runner.run()命令,其中blocks until the job is complete)。

此外,EMR有256个活动作业的限制,但是,我们如何去填充这256个作业,而不是在连接的控制台上循环并获取输出。

回答

0

经过几天的尝试,以下是我能想到的最好的。

我的初始尝试,当我意识到提交的作业在终端被分离时不会被剔除时,是(在bash脚本中)提交并杀死作业。但是,这并不能很好地发挥作用,因为AWS调用EMR调用,因此一些作业在提交之前被杀死。

目前最好的解决方案

from jobs import MyMRJob 
import logging 

logging.basicConfig(
    level=logging.INFO, 
    format = '%(asctime)-15s %(levelname)-8s %(message)s', 
) 
log = logging.getLogger('submitjobs') 

def main(): 
    cluster_id="x-MXMXMX" 
    log.info('Cluster: %s', cluster_id) 
    for i in range(10): 
     n = '%04d' % i 
     log.info('Adding job: %s', n) 
     mr_job = MyMRJob(args=[ 
      '-r', 'emr', 
      '--conf-path', 'mrjob.conf', 
      '--no-output', 
      '--output-dir', 's3://mybucket/mrjob/%s' % n, 
      '--cluster-id', cluster_id, 
      'input/file.%s' % n 
    ]) 
    runner = mr_job.make_runner() 
    # the following is the secret sauce, submits the job and returns 
    # it is a private method though, so may be changed without notice 
    runner._launch() 

if __name__ == '__main__': 
    main() 
相关问题