2014-10-19

mrjob: error when using make_runner on a Hadoop cluster

I am trying to run the simple wordcount example programmatically, but I cannot get the code to work on a Hadoop cluster.

Job, test_job.py (launched from mr_job_test.py):

from mrjob.job import MRJob
import re


WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # emit (word, 1) for every word on the input line
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        # pre-sum counts on the map side to shrink the shuffle
        yield word, sum(counts)

    def reducer(self, word, counts):
        # total count per word across all mappers
        yield word, sum(counts)
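To see what this job computes independently of Hadoop, here is a plain-Python sketch of the same map, shuffle and reduce flow. It does not use mrjob at all; `run_locally` is a hypothetical helper, and the combiner is left out because it applies the same sum as the reducer and only cuts the data volume between phases.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")


def mapper(line):
    # same logic as MRWordFreqCount.mapper, without the MRJob class
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def reducer(word, counts):
    # same logic as MRWordFreqCount.reducer
    yield word, sum(counts)


def run_locally(lines):
    """Hypothetical helper: map every line, group values by key, then reduce."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)  # the "shuffle": collect values per key
    result = {}
    for key in sorted(groups):
        for word, total in reducer(key, groups[key]):
            result[word] = total
    return result


if __name__ == "__main__":
    print(run_locally(["The cat", "the dog's cat"]))
```

On a real cluster Hadoop performs the grouping step between the mapper and reducer tasks; here a `defaultdict` stands in for it.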

Runner:

from test_jobs import MRWordFreqCount

def test_runner(in_args, input_dir):
    tmp_output = []
    args = in_args + input_dir
    mr_job = MRWordFreqCount(args.split())
    with mr_job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            tmp_output = tmp_output + [line]
    return tmp_output

if __name__ == '__main__':
    input_dir = 'hdfs:///test_input/'
    args = '-r hadoop '
    print test_runner(args, input_dir)

I can run this code locally (with the inline option), but on Hadoop I get:

> Traceback (most recent call last):
>   File "mr_job_tester.py", line 17, in <module>
>     print test_runner(args, input_dir)
>   File "mr_job_tester.py", line 8, in test_runner
>     runner.run()
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 458, in run
>     self._run()
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/hadoop.py", line 239, in _run
>     self._run_job_in_hadoop()
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/hadoop.py", line 295, in _run_job_in_hadoop
>     for step_num in xrange(self._num_steps()):
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 742, in _num_steps
>     return len(self._get_steps())
>   File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 721, in _get_steps
>     raise ValueError("Bad --steps response: \n%s" % stdout)
> ValueError: Bad --steps response:

Answer

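According to this, the way mrjob submits the job file and executes it remotely inside the mappers and reducers means the job definition file must contain the following lines: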

if __name__ == "__main__": 
    MRWordFreqCount.run()
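Why the missing guard produces exactly this error: as far as I understand mrjob's runners from that era, before submitting anything they execute the job script itself with `--steps` and parse its stdout as JSON (the `_get_steps` frame in the traceback). A script that only defines the class prints nothing when executed, so the runner receives an empty response. The sketch below imitates that handshake without mrjob; `FakeJob`, `steps_output` and `parse_steps` are hypothetical stand-ins, and the one-entry step list is made up for illustration.

```python
import json
import os
import subprocess
import sys
import tempfile

# Hypothetical stand-in for a job script. NOT mrjob: it only imitates the
# behaviour that matters here, printing a JSON step list when given --steps.
JOB_BODY = '''
import json, sys

class FakeJob(object):
    @classmethod
    def run(cls):
        if "--steps" in sys.argv:
            print(json.dumps([{"type": "streaming"}]))
'''

# The two lines the answer says the job file needs.
GUARD = '''
if __name__ == "__main__":
    FakeJob.run()
'''


def steps_output(source):
    """Write `source` to a temp script, run it with --steps, return its stdout."""
    fd, path = tempfile.mkstemp(suffix=".py")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(source)
        return subprocess.check_output([sys.executable, path, "--steps"]).decode()
    finally:
        os.remove(path)


def parse_steps(stdout):
    """Mimic the runner: the --steps output must be valid JSON."""
    try:
        return json.loads(stdout)
    except ValueError:
        raise ValueError("Bad --steps response: \n%s" % stdout)


if __name__ == "__main__":
    print(repr(steps_output(JOB_BODY)))                 # '' : nothing runs
    print(parse_steps(steps_output(JOB_BODY + GUARD)))  # [{'type': 'streaming'}]
```

Without the guard, `parse_steps` raises the same `Bad --steps response` message seen in the traceback, with nothing after it; with the two `__main__` lines appended, the script answers the `--steps` query and the step list parses.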