
Today I started looking into the rhdfs and rmr2 packages and debugging the mapreduce() function in R.

The mapreduce() function on a one-dimensional vector works as expected. A piece of code:

a1 <- to.dfs(1:20) 
a2 <- mapreduce(input=a1, map=function(k,v) keyval(v, v^2)) 
a3 <- as.data.frame(from.dfs(a2)) 

It returns the following data frame:

   Key Val
1    1   1
2   10 100
3   11 121
4   12 144
5   13 169
6   14 196
7   15 225
8   16 256
9   17 289
10  18 324
11  19 361
12   2   4
13  20 400
14   3   9
15   4  16
16   5  25
17   6  36
18   7  49
19   8  64
20   9  81

So far, so good.

However, when I use the mapreduce() function on the mtcars dataset, I get the error message below and cannot debug it any further. Please give me some pointers on how to proceed.

My piece of code:

rs1 <- mapreduce(input=mtcars, 
        map=function(k, v) { 
         if (mtcars$hp > 150) keyval("Bigger", 1) }, 
        reduce=function(k, v) keyval(k, sum(v)) 
       ) 

The error message from the above piece of code:

13/09/21 07:24:49 ERROR streaming.StreamJob: Missing required option: input 
Usage: $HADOOP_HOME/bin/hadoop jar \ 
      $HADOOP_HOME/hadoop-streaming.jar [options] 
Options: 
    -input <path>  DFS input file(s) for the Map step 
    -output <path>  DFS output directory for the Reduce step 
    -mapper <cmd|JavaClassName>  The streaming command to run 
    -combiner <cmd|JavaClassName> The streaming command to run 
    -reducer <cmd|JavaClassName>  The streaming command to run 
    -file  <file>  File/dir to be shipped in the Job jar file 
    -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional. 
    -outputformat TextOutputFormat(default)|JavaClassName Optional. 
    -partitioner JavaClassName Optional. 
    -numReduceTasks <num> Optional. 
    -inputreader <spec> Optional. 
    -cmdenv <n>=<v> Optional. Pass env.var to streaming commands 
    -mapdebug <path> Optional. To run this script when a map task fails 
    -reducedebug <path> Optional. To run this script when a reduce task fails 
    -io <identifier> Optional. 
    -verbose 

Generic options supported are 
-conf <configuration file>  specify an application configuration file 
-D <property=value>   use value for given property 
-fs <local|namenode:port>  specify a namenode 
-jt <local|jobtracker:port> specify a job tracker 
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster 
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath. 
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines. 

The general command line syntax is 
bin/hadoop command [genericOptions] [commandOptions] 


For more details about these options: 
Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info 

Streaming Command Failed! 
Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, : 
    hadoop streaming failed with error code 1 

A quick and detailed reply would be highly appreciated...

Answers

The data you pass to keyval() should be thought of as a vector, not as a single entity. The code below should make this clear.
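
As a rough sketch of that point (illustration only, reusing the hp > 150 condition from the question): inside map() the value v arrives as a whole vector, so both the keys and the matching values have to be built vector-wise:

    library(rmr2)   # for keyval()
    # keyval() pairs equal-length vectors element by element,
    # so this emits one ("Bigger", hp) pair per qualifying row
    v <- mtcars$hp
    keyval(rep("Bigger", sum(v > 150)), v[v > 150])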

Locally

  • Load the data

    data(mtcars) 
    
  • Look at a few rows of the data

    head(mtcars) 
    hpTest=mtcars$hp # taking required data 
    print(hpTest) 
    
  • Final sum

    # try: 
    sum(hpTest[which(hpTest>150)]) # 2804 
    

MapReduce on Hadoop

  • Export the environment variables

    # required 
    Sys.setenv(HADOOP_HOME="/home/trendwise/apache/hadoop-1.0.4"); 
    Sys.setenv(HADOOP_CMD="/home/trendwise/apache/hadoop-1.0.4/bin/hadoop"); 
    
    #optional 
    Sys.setenv(HADOOP_BIN="/home/trendwise/apache/hadoop-1.0.4/bin"); 
    Sys.setenv(HADOOP_CONF_DIR="/home/trendwise/apache/hadoop-1.0.4/conf"); 
    Sys.setenv(HADOOP_STREAMING='/home/trendwise/apache/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar') 
    Sys.setenv(LD_LIBRARY_PATH="/lib:/lib/x86_64-linux-gnu:/lib64:/usr/lib:/usr/lib64:/usr/local/lib:/usr/local/lib64:/usr/lib/jvm/jdk1.7.0_10/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64/server"); 
    
  • Load the libraries

    library(rmr2) 
    library(rhdfs) 
    
  • Initialize

    hdfs.init() 
    
  • Put the input into HDFS

    hpInput = to.dfs(mtcars$hp) 
    
  • Run the MapReduce job

    mapReduceResult <- mapreduce(input=hpInput, 
          map=function(k, v) { keyval(rep(1, length(which(v > 150))), v[which(v > 150)]) }, 
          reduce=function(k2, v2) { keyval(k2, sum(v2)) } 
    ) 
    
  • View the MR output

    from.dfs(mapReduceResult) 
    
  • Output

    $key 
    [1] 1 
    
    $val 
    [1] 2804  
    
You can use the debugging facilities built into the latest RStudio. Just rewrite your code to run in local MR mode.

Actually, there is no need to rewrite the code; RHadoop has a local mode.
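
For reference, a minimal sketch of that local mode in rmr2 (assuming rmr2 is installed; the object names small and res are made up for this sketch). With the local backend, to.dfs(), mapreduce() and from.dfs() run in-process, so no Hadoop cluster or streaming jar is needed:

    library(rmr2) 
    rmr.options(backend = "local")   # switch rmr2 to its local, in-process backend
    small <- to.dfs(mtcars$hp)       # with the local backend this is just a temp file
    res <- mapreduce(input = small, 
                     map    = function(k, v) keyval(rep(1, sum(v > 150)), v[v > 150]), 
                     reduce = function(k, v) keyval(k, sum(v))) 
    from.dfs(res)                    # $val should again be 2804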