2013-08-01 122 views

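Passing arguments to a reducer in MRjob

I am using MRjob to run Hadoop Streaming jobs over our HBase instance. For the life of me, I cannot figure out how to pass an argument to my reducer. I have two arguments, startDate and endDate, that I want to pass to the reducer when I launch the job. This is what my reducer currently looks like: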

def reducer(self, groupId, meterList): 
    """ 
    Print bucket. 
    """ 
    sys.stderr.write("Working on group = " + str(groupId) + "\n") 
    #print "Opening connection..." 
    conn = open_connection(hostname) 
    #print "Getting table..." 
    table = get_table(conn, tableName) 

    compositeDf = DataFrame() 

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

I cannot seem to pass startDate and endDate to my reducer as arguments. I can get the values in through global variables:

startDate = datetime.datetime(2012, 6, 10) 
endDate = datetime.datetime(2012, 6, 11) 

class MRDataQuality(MRJob): 
    """ 
    MapReduce job that does a data quality check on the meter data in HBase. 
    """ 

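But that is dirty. I want to pass the dates in when I invoke the job. I have tried many approaches: setting them as instance variables, setting them as static class variables, creating an overloaded constructor for MRDataQualityJob... nothing seems to work. I invoke the job programmatically from my top-level script like this: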

if args.hadoop: 
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile]) 
else: 
    mrdq_job = MRDataQuality(args=[meterFile]) 

with mrdq_job.make_runner() as runner: 
    runner.run() 

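No matter what I do, runner.run() does not seem to use my mrdq_job instance. It uses a fresh instance of the class, one that has none of my instance or static variables defined. How can I pass my arguments to the reducer? In plain Hadoop Streaming I can do this by passing a string: "--reducer reducer.py arg1 arg2". Does MRjob have an equivalent?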

Answers


How about passing your arguments in the job configuration and then reading them with get_jobconf_value?

Something like this:

import sys

from mrjob.compat import get_jobconf_value
from mrjob.job import MRJob

class MRDataQuality(MRJob):

    def reducer(self, groupId, meterList):
        ...
        # jobconf values arrive as strings, e.g. "2013-06-10"
        startDate = get_jobconf_value("my.job.settings.startdate")
        endDate = get_jobconf_value("my.job.settings.enddate")

        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            df = extract_meter_data(table, meterId, startDate, endDate)

Then set the parameters in your code, as you do above:

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile]) 
Note that 'get_jobconf_value' is deprecated. https://pythonhosted.org/mrjob/utils-compat.html#mrjob.compat.jobconf_from_env – dranxo
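To illustrate what that lookup amounts to: Hadoop Streaming exposes jobconf properties to each task as environment variables, with the dots replaced by underscores. The sketch below is a stdlib-only stand-in, not mrjob's implementation. The names jobconf_from_environ, parse_job_date and fake_env are hypothetical, and the date format is assumed to match the '--jobconf' values used in the answer above.

```python
import os
from datetime import datetime


def jobconf_from_environ(name, default=None, environ=None):
    """Look up a jobconf property the way a streaming task sees it.

    Hypothetical stand-in for mrjob.compat.jobconf_from_env; it only
    handles the dots-to-underscores renaming.
    """
    if environ is None:
        environ = os.environ
    return environ.get(name.replace(".", "_"), default)


def parse_job_date(text):
    """Turn a 'YYYY-MM-DD' jobconf string into a datetime."""
    return datetime.strptime(text, "%Y-%m-%d")


# Simulate what a reducer task would find in its environment.
fake_env = {"my_job_settings_startdate": "2013-06-10"}
start = parse_job_date(
    jobconf_from_environ("my.job.settings.startdate", environ=fake_env)
)
```

Because the value always arrives as a string, converting it back to a datetime before calling extract_meter_data is probably needed if that function expects datetime objects, as the global variables in the question suggest.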

1

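How about passing the arguments in the job configuration and then reading them with get_jobconf_value inside reducer_init? That way you only need to read the parameters once.

Something like this: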

import sys

from mrjob.compat import get_jobconf_value
from mrjob.job import MRJob

class MRDataQuality(MRJob):

    def reducer_init(self):
        ...
        # Runs once per reducer task, before any keys are processed.
        self.startDate = get_jobconf_value("my.job.settings.startdate")
        self.endDate = get_jobconf_value("my.job.settings.enddate")

    def reducer(self, groupId, meterList):
        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            df = extract_meter_data(table, meterId, self.startDate, self.endDate)

Then set the parameters in your code, as you do above:

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
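Since the dates in the question start out as datetime objects in the top-level script, the args list can be built from them rather than hard-coded. This is a minimal sketch: build_job_args and the file name "meters.txt" are hypothetical, and passing the '--jobconf' pairs in the non-Hadoop case as well is an assumption worth checking against the runner you use.

```python
from datetime import datetime


def build_job_args(meter_file, start, end, hadoop=False):
    """Return the args list to hand to MRDataQuality(args=...)."""
    args = []
    if hadoop:
        # Same Hadoop-specific switches as in the question.
        args += ["-r", "hadoop", "--conf-path", "mrjob.conf",
                 "--jobconf", "mapred.reduce.tasks=42"]
    # Serialize the dates as strings; the reducer reads them back.
    args += [
        "--jobconf", "my.job.settings.startdate=" + start.strftime("%Y-%m-%d"),
        "--jobconf", "my.job.settings.enddate=" + end.strftime("%Y-%m-%d"),
        meter_file,
    ]
    return args


example_args = build_job_args(
    "meters.txt", datetime(2012, 6, 10), datetime(2012, 6, 11), hadoop=True
)
```

The resulting list would then be passed as MRDataQuality(args=example_args) before calling make_runner(), in place of the literal list above.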