2011-03-27 46 views
12

Accessing a mapper's counter from the reducer

I need to access the counters from my mapper in my reducer. Is this possible? If so, how?

As an example, my mapper is:

public class CounterMapper extends Mapper<Text,Text,Text,Text> { 

    static enum TestCounters { TEST } 

    @Override 
    protected void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException { 
     context.getCounter(TestCounters.TEST).increment(1); 
     context.write(key, value); 
    } 
} 

My reducer is:

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> { 

    @Override 
    protected void reduce(Text key, Iterable<Text> values, Context context) 
         throws IOException, InterruptedException { 
     Counter counter = context.getCounter(CounterMapper.TestCounters.TEST); 
     long counterValue = counter.getValue(); 
     context.write(key, new LongWritable(counterValue)); 
    } 
} 

The value of the counter is always 0. Am I doing something wrong, or is this just not possible?

Answers

2

The whole point of map/reduce is to parallelize the job. There will be many unique mappers/reducers, so the value wouldn't be correct anyway except for that single run of the map/reduce pair.

They have a word count example:

http://wiki.apache.org/hadoop/WordCount

You could change context.write(word, one) to context.write(line, one).
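For illustration, that change amounts to emitting one record per line instead of one per token. A self-contained sketch of the difference (plain Java, no Hadoop dependencies; the input strings are made up):

```java
import java.util.StringTokenizer;

public class WordVsLine {
    public static void main(String[] args) {
        String[] lines = {"hello world", "foo bar baz"};

        // Emitting per token (word count): one record per token.
        long words = 0;
        for (String line : lines) {
            StringTokenizer tok = new StringTokenizer(line);
            while (tok.hasMoreTokens()) {
                tok.nextToken();
                words++;
            }
        }

        // Emitting per line (line count): one record per map() call.
        long lineCount = lines.length;

        System.out.println(words + " words, " + lineCount + " lines"); // prints "5 words, 2 lines"
    }
}
```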

1

The global counter values are never broadcast back to each mapper or reducer. If you want the number of mapper records to be available to the reducer, you'll need to rely on some external mechanism to do this.

+0

The JobTracker will keep track of the counters. – 2011-03-28 04:26:07

9

In the Reducer's configure (JobConf), you can use the JobConf object to look up the Reducer's own job ID. With that, your Reducer can create its own JobClient - i.e. a connection to the JobTracker - and query the counters for this job (or any job, for that matter).

// in the Reducer class... 
private long mapperCounter; 

@Override 
public void configure(JobConf conf) { 
    JobClient client = new JobClient(conf); 
    RunningJob parentJob = 
     client.getJob(JobID.forName(conf.get("mapred.job.id"))); 
    mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME); 
} 

Now you can use mapperCounter inside the reduce() method itself.

You actually need a try/catch here. I'm using the old API, but it shouldn't be hard to adapt to the new one.

Note that the mappers' counters should all be finalized before any reducer starts, so contrary to Justin Thomas's comment, I believe you should get accurate values (as long as the reducers aren't incrementing the same counter!).
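The try/catch the answer mentions could be filled in along these lines (a sketch only, old API; MAP_COUNTER_NAME stands in for the mapper's counter enum and is a placeholder, not part of the original answer):

```java
// In the Reducer class (old API). JobClient, getJob, and getCounters all
// throw IOException, so the lookup is wrapped and rethrown as unchecked.
private long mapperCounter;

@Override
public void configure(JobConf conf) {
    try {
        JobClient client = new JobClient(conf);
        RunningJob parentJob =
            client.getJob(JobID.forName(conf.get("mapred.job.id")));
        mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
    } catch (IOException e) {
        throw new RuntimeException("Could not read mapper counter", e);
    }
}
```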

+0

It seems counterintuitive that counters from mappers aren't available in the reducer, but in Hadoop a reducer can start executing before all the mappers have finished. In that case, the value of the counter may be read differently at different times. To learn more about how a reducer can start before all mappers finish executing, see the following post: http://stackoverflow.com/questions/11672676/when-do-reduce-tasks-start-in-hadoop – abhinavkulkarni 2013-10-10 19:09:34

+2

@abhinavkulkarni Actually, **only** the shuffle phase of a reducer can start before all the mappers have finished, and that is irrelevant for counters. So, when the reduce phase of a reducer starts, all the map counters are correct. From the same post: "On the other hand, sort and reduce can only start once all the mappers are done." – vefthym 2014-05-12 14:22:47

8

An implementation of Jeff G's solution on the new API:

@Override
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}

+2

I tried this, but I ran into a problem at the line mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME); I replaced COUNTER_NAME with my custom counter. – 2015-12-09 05:32:51

+0

It seems that 'cluster.getJob(context.getJobID());' doesn't work in Hadoop's standalone operation. It worked for me when running in single-node cluster mode. – dauer 2016-11-23 17:08:11

1

I asked this question, but I hadn't solved my problem. However, another solution came to mind: in the mapper the number of words is counted, and in the cleanup function that runs at the end of the mapper it can be written to the intermediate output with a minimal key (so that this value lands at the head of the output). In the reducer, the number of words is then obtained by summing the values at the head. Sample code and part of its output are provided below.

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

import java.io.IOException; 
import java.util.StringTokenizer; 

/** 
* Created by tolga on 1/26/16. 
*/ 
public class WordCount { 
    static enum TestCounters { TEST } 
    public static class Map extends Mapper<Object, Text, Text, LongWritable> { 
     private final static LongWritable one = new LongWritable(1); 
     private Text word = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String line = value.toString(); 
      StringTokenizer tokenizer = new StringTokenizer(line); 
      while (tokenizer.hasMoreTokens()) { 
       word.set(tokenizer.nextToken()); 
       context.write(word, one); 
       context.getCounter(TestCounters.TEST).increment(1); 
      } 
     } 

     @Override 
     protected void cleanup(Context context) throws IOException, InterruptedException { 
      context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue())); 
     } 
    } 

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> { 

     public void reduce(Text key, Iterable<LongWritable> values, Context context) 
       throws IOException, InterruptedException { 
      int sum = 0; 
      for (LongWritable val : values) { 
       sum += val.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 

     Job job = new Job(conf, "WordCount"); 
     job.setJarByClass(WordCount.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(LongWritable.class); 

     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.waitForCompletion(true); 
    } 
} 

Text file:

Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal. 

Intermediate output:

**! \t 33**
2008 \t 1
Action \t 1
Ankara, \t 1
Foundation \t 1
It \t 1
Thought \t 1
Turgut \t 1
Turgut \t 1
Turgut \t 1

Final output:

**! \t 33**
2008 \t 1
Action \t 1
Ankara, \t 1
Foundation \t 1
It \t 1
Thought \t 1
Turgut \t 3
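A note on why the header trick above works: the shuffle sorts mapper output by key before it reaches the reducer, and "!" (ASCII 0x21) compares lower than any digit or letter, so the cleanup record is guaranteed to arrive first. A self-contained sketch (plain Java; a TreeMap stands in for the shuffle's sort order, and the key/value pairs are illustrative):

```java
import java.util.TreeMap;

public class HeaderKeyDemo {
    public static void main(String[] args) {
        // MapReduce sorts mapper output by key before the reducer sees it.
        // A TreeMap over String keys mimics that sort order.
        TreeMap<String, Long> shuffled = new TreeMap<>();
        shuffled.put("Turgut", 3L);
        shuffled.put("2008", 1L);
        shuffled.put("!", 33L);   // header record written in cleanup()
        shuffled.put("Action", 1L);

        // "!" sorts before digits and letters, so the header record is
        // the first key the reducer sees.
        System.out.println(shuffled.firstKey()); // prints "!"
    }
}
```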

0

Improving on Itzhaki's answer above:

findCounter(COUNTER_NAME) is no longer supported - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

@Override 
public void setup(Context context) throws IOException, InterruptedException{ 
    Configuration conf = context.getConfiguration(); 
    Cluster cluster = new Cluster(conf); 
    Job currentJob = cluster.getJob(context.getJobID()); 
    mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue(); 
} 

GROUP_NAME is specified when the counter is incremented, e.g.:

context.getCounter("com.example.mycode", "MY_COUNTER").increment(1); 

Then:

mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue(); 

Also, one important point: if the counter doesn't exist, findCounter will initialize one with a value of 0.
