2015-06-18 55 views
0

我刚开始学习Hadoop。我试图将流式接口与处理文件的Python脚本一起使用:对于每个输入文件,我都会创建一个输出文件并提供关于它的一些信息,所以这是一个没有缩减器的映射作业。我发现的是,文件正在一次处理一个,这不是我想要的。Hadoop Streaming作业中并行映射器任务的数量

我会解释我所做的事情,但之后我还会发布一些代码以防万一我在那里丢失了某些东西。

我有一个输入格式和记录阅读器,它读取整个文件并将其内容用作值和文件名作为键。 (这些文件并不是很大。)另一方面,我有一个输出格式和记录写入器,它将值写入基于密钥名称的文件中。我正在使用-io rawbytes,我的Python脚本知道如何读取和写入键/值对。

这一切工作正常,在生产我期待的产出方面。如果我用例如10个输入文件运行,我会得到10个分割。这意味着每次我的脚本运行时只会得到一个键/值对 - 这并不理想,但这不是什么大问题,我可以看出这可能是不可避免的。更不好的是它在任何时候只有一个脚本正在运行的实例。设置mapreduce.job.maps并没有什么区别(尽管我依稀记得看到这个值只是一个建议,所以也许Hadoop做出了不同的决定)。我错过了什么?

这里是我的代码: -

#!/bin/bash 

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ 
    -libjars mimi.jar \ 
    -D mapreduce.job.reduces=0 \ 
    -files rawbytes_mapper.py,irrelevant.py \ 
    -inputformat "mimi.WholeFileInputFormat" \ 
    -outputformat "mimi.NamedFileOutputFormat" \ 
    -io rawbytes \ 
    -mapper "rawbytes_mapper.py irrelevant blah blah blah" \ 
    -input "input/*.xml" \ 
    -output output 
#!/usr/bin/python 

def read_raw_bytes(input): 
    length_bytes = input.read(4) 
    if len(length_bytes) < 4: 
     return None 
    length = 0 
    for b in length_bytes: 
     length = (length << 8) + ord(b) 
    return input.read(length) 

def write_raw_bytes(output, s): 
    length = len(s) 
    length_bytes = [] 
    for _ in range(4): 
     length_bytes.append(chr(length & 0xff)) 
     length = length >> 8 
    length_bytes.reverse() 
    for b in length_bytes: 
     output.write(b) 
    output.write(s) 

def read_keys_and_values(input): 
    d = {} 
    while True: 
     key = read_raw_bytes(input) 
     if key is None: break 
     value = read_raw_bytes(input) 
     d[key] = value 
    return d 

def write_keys_and_values(output, d): 
    for key in d: 
     write_raw_bytes(output, key) 
     write_raw_bytes(output, d[key]) 

if __name__ == "__main__": 
    import sys 
    module = __import__(sys.argv[1]) 
    before = read_keys_and_values(sys.stdin) 
    module.init(sys.argv[2:]) 
    after = module.process(before) 
    write_keys_and_values(sys.stdout, after) 
package mimi; 

import java.io.IOException; 
import java.nio.charset.StandardCharsets; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 

public class WholeFileInputFormat extends FileInputFormat<BytesWritable, BytesWritable> 
{ 
    private static class WholeFileRecordReader implements RecordReader<BytesWritable, BytesWritable> 
    { 
     private FileSplit split; 
     private JobConf conf; 
     private boolean processed = false; 

     public WholeFileRecordReader(FileSplit split, JobConf conf) 
     { 
      this.split = split; 
      this.conf = conf; 
     } 

     @Override 
     public BytesWritable createKey() 
     { 
      return new BytesWritable(); 
     } 

     @Override 
     public BytesWritable createValue() 
     { 
      return new BytesWritable(); 
     } 

     @Override 
     public boolean next(BytesWritable key, BytesWritable value) throws IOException 
     { 
      if (processed) 
      { 
       return false; 
      } 

      byte[] contents = new byte[(int) split.getLength()]; 
      Path file = split.getPath(); 
      String name = file.getName(); 
      byte[] bytes = name.getBytes(StandardCharsets.UTF_8); 
      key.set(bytes, 0, bytes.length); 
      FileSystem fs = file.getFileSystem(conf); 
      FSDataInputStream in = null; 
      try 
      { 
       in = fs.open(file); 
       IOUtils.readFully(in, contents, 0, contents.length); 
       value.set(contents, 0, contents.length); 
      } 
      finally 
      { 
       IOUtils.closeStream(in); 
      } 

      processed = true; 
      return true; 
     } 

     @Override 
     public float getProgress() throws IOException 
     { 
      return processed ? 1.0f : 0.0f; 
     } 

     @Override 
     public long getPos() throws IOException 
     { 
      return processed ? 0l : split.getLength(); 
     } 

     @Override 
     public void close() throws IOException 
     { 
      // do nothing 
     } 
    } 

    @Override 
    protected boolean isSplitable(FileSystem fs, Path file) 
    { 
     return false; 
    } 

    @Override 
    public RecordReader<BytesWritable, BytesWritable> getRecordReader(InputSplit split, 
                     JobConf conf, 
                     Reporter reporter) 
    throws IOException 
    { 
     return new WholeFileRecordReader((FileSplit) split, conf); 
    } 
} 
package mimi; 

import java.io.IOException; 
import java.nio.charset.StandardCharsets; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.mapred.lib.MultipleOutputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.RecordWriter; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.util.Progressable; 

public class NamedFileOutputFormat extends MultipleOutputFormat<BytesWritable, BytesWritable> 
{ 
    private static class BytesValueWriter implements RecordWriter<BytesWritable, BytesWritable> 
    { 
     FSDataOutputStream out; 

     BytesValueWriter(FSDataOutputStream out) 
     { 
      this.out = out; 
     } 

     @Override 
     public synchronized void write(BytesWritable key, BytesWritable value) throws IOException 
     { 
      out.write(value.getBytes(), 0, value.getLength()); 
     } 

     @Override 
     public void close(Reporter reporter) throws IOException 
     { 
      out.close(); 
     } 
    } 

    @Override 
    protected String generateFileNameForKeyValue(BytesWritable key, BytesWritable value, String name) 
    { 
     return new String(key.getBytes(), 0, key.getLength(), StandardCharsets.UTF_8); 
    } 

    @Override 
    public RecordWriter<BytesWritable, BytesWritable> getBaseRecordWriter(FileSystem ignored, 
                      JobConf conf, 
                      String name, 
                      Progressable progress) 
    throws IOException 
    { 
     Path file = FileOutputFormat.getTaskOutputPath(conf, name); 
     FileSystem fs = file.getFileSystem(conf); 
     FSDataOutputStream out = fs.create(file, progress); 
     return new BytesValueWriter(out); 
    } 
} 

回答

0

我想我可以帮你解决问题的这部分:

每次我的脚本运行它只有一个键/值对 - 这不是理想的

如果 isSplitable方法返回false,每个映射器只处理一个文件。因此,如果您不覆盖 isSplitable方法并将其保留返回true您应该在一个映射器中具有多个键/值对。在你的情况下,每个文件都是一个键/值对,因此即使 isSplitable返回true时也不能拆分它们。

我不明白为什么只有一个映射器在一次启动,但我仍在考虑它:)

相关问题