2015-05-27 33 views
0

我正在使用Hadoop流式传输作业。在bash脚本中使用expr时hadoop流式传输错误

我的映射器是用bash编写的。它使用job_id。

mapred_job_id=`echo "$mapred_job_id" | awk -F "_" '{print $NF}'` 

它工作正常(空空的debuggin目的的愚蠢wordcound映射器操作),直到我有以下行,使作业崩溃:

mapred_job_id=`expr $mapred_job_id \* 2` 

的错误是:

INFO mapreduce.Job: Task Id : attempt_1432766867252_0019_m_000007_0, Status : FAILED 
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2 
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330) 
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543) 
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130) 
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:81) 
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34) 
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) 
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:415) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) 
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:170) 

任何想法为什么它不工作?直接在命令行上执行。我也尝试使用名称为dummy的另一个变量。同样的错误。

铌:这条线是因为应该在映射器中的其他作业中需要获取ID并且不能与之前作业的ID相混淆。我正在尝试使用适用于allreduce的vowpal wabbit集群实现。

+0

您为什么要将工作ID加倍?我没有按照你想要达到的目标.. – ShellFish

+0

我认为它与allreduce的vowpalwabbit实现有关。有一个守护进程(生成树)监听所有节点并用ID标识它们。但是由于两个具有两个不同结果的工作是由一个映射器产生的,所以我明白这就是为什么你需要这样做。他们的实现在github上:https://github.com/JohnLangford/vowpal_wabbit/blob/master/cluster/runvw-yarn.sh – bosonfute

+0

其实,它很奇怪。我之前误读了。他们实际上统计了mapred_job_id中的字段数......我不明白,这与我的理解相矛盾。 – bosonfute

回答

1

我发现问题是什么。在bash脚本是不是能够得到Hadoop的数据流的环境变量用命令:

mapper=`printenv mapred_task_id' 

或直接致电例如

$mapreduce_output_fileoutputformat_outputdir 

,因为它是在脚本中做了Vowpal Wabbit集群目录发现github上。 我通过编写一个可以使用os.environ得到这些变量的python脚本来解决这个问题。 os.environ返回一个包含Hadoop流式传输的所有环境变量的字典。这是非常有用的,因为各种版本的mapreduce之间变量的名称已经改变。 Cloudera提供的字典与我自己的版本不一致。这里是得到了我所需要的变量的脚本:

#!/usr/bin/env python 

import sys 
import os 
import subprocess 

nmappers=os.environ["mapreduce_job_maps"] 
submit_host=os.environ["mapreduce_job_submithostname"] 
output_dir=os.environ["mapreduce_output_fileoutputformat_outputdir"] 
mapred_job_id=os.environ["mapreduce_job_id"].split("_")[-1] 
mapper=os.environ["mapreduce_task_id"].split("_")[4] 

print nmappers, submit_host, output_dir, mapred_job_id, mapper 

调用从写在bash映射器这个剧本,然后我就可以使用命令:

mapred_job_id=`expr $mapred_job_id \* 2` 

根据需要改变作业ID由John Langford实现的wowbit的并行化(见https://github.com/JohnLangford/vowpal_wabbit/blob/master/cluster/)。