我想实现一个简单的Hadoop地图使用Cloudera的5.5.0 地图&减少步骤应该使用Python 2.6.6实现减少例如为什么hadoop mapreduce与python失败,但脚本正在命令行上工作?
问题:
- 如果脚本正在在unix命令行上执行,他们工作得非常好,并产生预期的输出。
cat join2 * .txt | ./join3_mapper.py |排序| ./join3_reducer.py
- 但执行脚本作为Hadoop的任务非常失败:
Hadoop的罐子/usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /用户/ Cloudera的/ output_tv -mapper /home/cloudera/join3_mapper.py -reducer /home/cloudera/join3_reducer.py -numReduceTasks 1
16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538) at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
映射器的工作原理,如果hadoop的命令与-numReduceTasks 0, Hadoop的作业正在执行仅地图步骤执行时,成功地结束,并且输出目录从地图步骤包含结果的文件。
我想减法步骤一定有什么问题呢?
- 在色彩上标准错误日志不显示任何有关:
日志上传时间:星期三年1月6 12时33分十秒-0800 2016 日志长度:222 的log4j:警告没有附加目的地可以发现记录仪(org.apache.hadoop.ipc.Server)。 log4j:WARN请正确初始化log4j系统。 log4j:警告有关更多信息,请参见http://logging.apache.org/log4j/1.2/faq.html#noconfig。
代码的脚本: 第一档:join3_mapper.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip() #strip out carriage return
tuple2 = line.split(",") #split line, into key and value, returns a list
if len(tuple2) == 2:
key = tuple2[0]
value = tuple2[1]
if value == 'ABC':
print('%s\t%s' % (key, value))
elif value.isdigit():
print('%s\t%s' % (key, value))
的第二个选项:join3_reducer.py
#!/usr/bin/env python
import sys
last_key = None #initialize these variables
running_total = 0
abcFound =False;
this_key = None
# -----------------------------------
# Loop the file
# --------------------------------
for input_line in sys.stdin:
input_line = input_line.strip()
# --------------------------------
# Get Next Key value pair, splitting at tab
# --------------------------------
tuple2 = input_line.split("\t")
this_key = tuple2[0]
value = tuple2[1]
if value.isdigit():
value = int(value)
# ---------------------------------
# Key Check part
# if this current key is same
# as the last one Consolidate
# otherwise Emit
# ---------------------------------
if last_key == this_key:
if value == 'ABC': # filter for only ABC in TV shows
abcFound=True;
else:
if isinstance(value, (int,long)):
running_total += value
else:
if last_key: #if this key is different from last key, and the previous
# (ie last) key is not empy,
# then output
# the previous <key running-count>
if abcFound:
print('%s\t%s' % (last_key, running_total))
abcFound=False;
running_total = value #reset values
last_key = this_key
if last_key == this_key:
print('%s\t%s' % (last_key, running_total))
我曾尝试声明输入文件到的各种不同的方式hadoop命令,没有区别,没有成功。
我在做什么错?提示,想法非常赞赏谢谢
您是否需要toolrunner才能够从命令行运行jar文件? –
另外,Java程序不是jar文件吗? –
我不是自己执行一个jar文件,我正在执行hadoop命令并告诉hadoop执行声明的jar文件。库路径后面的其余部分是与hadoop-streaming.jar相关的参数,并与执行的MapReduce操作相关。是的,jar文件是java程序 –