我正在使用MRjob在我们的HBase实例上运行Hadoop Streaming作业。对于我的生活,我无法弄清楚如何将一个参数传递给我的reducer。我有两个参数,我想从我运行作业时传递给reducer:startDate和endDate。这是我目前的减速机看起来像︰将参数传递给MRjob中的reducer
def reducer(self, groupId, meterList):
"""
Print bucket.
"""
sys.stderr.write("Working on group = " + str(groupId) + "\n")
#print "Opening connection..."
conn = open_connection(hostname)
#print "Getting table..."
table = get_table(conn, tableName)
compositeDf = DataFrame()
for meterId in meterList:
sys.stderr.write("Querying: " + str(meterId) + "\n")
df = extract_meter_data(table, meterId, startDate, endDate)
我似乎无法将startDate和endDate作为参数传递给我的减速器。我可以通过一个全局变量来获取参数。
startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)
class MRDataQuality(MRJob):
"""
MapReduce job that does a data quality check on the meter data in HBase.
"""
但这很脏。我想通过它来调用这个工作。我尝试了很多方法。将它设置为一个实例变量,将其设置为一个静态类变量,为MRDataQualityJob创建一个重载的构造函数....似乎没有任何工作。我从我的顶层脚本编程调用它像这样:
if args.hadoop:
mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
mrdq_job = MRDataQuality(args=[meterFile])
with mrdq_job.make_runner() as runner:
runner.run()
无论我做什么它似乎像runner.run(在mrdq_job实例)使用其没有按类的新鲜的新实例没有定义实例或静态变量。我怎样才能将我的参数传递给reducer?我可以在普通的Hadoop Streaming中通过传递一个字符串来实现:“--reducer reducer.py arg1 arg2”。 MRjob是否有任何等价物?
注意'get_jobconf_value'被贬值。 https://pythonhosted.org/mrjob/utils-compat.html#mrjob.compat.jobconf_from_env – dranxo