2014-02-07 78 views
0

Hadoop的DistributedCache文档似乎并未充分描述如何使用分布式缓存。这里给出的例子:将Hadoop DistributedCache与存档结合使用

// Setting up the cache for the application 

1. Copy the requisite files to the FileSystem: 

$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat 
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip 
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar 
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar 
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz 
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz 

2. Setup the application's JobConf: 

JobConf job = new JobConf(); 
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
           job); 
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); 
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job); 

3. Use the cached files in the Mapper 
or Reducer: 

public static class MapClass extends MapReduceBase 
implements Mapper<K, V, K, V> { 

    private Path[] localArchives; 
    private Path[] localFiles; 

    public void configure(JobConf job) { 
    // Get the cached archives/files 
    File f = new File("./map.zip/some/file/in/zip.txt"); 
    } 

    public void map(K key, V value, 
        OutputCollector<K, V> output, Reporter reporter) 
    throws IOException { 
    // Use data from the cached archives/files here 
    // ... 
    // ... 
    output.collect(k, v); 
    } 
} 

我一直在寻找一个小时试图找出如何使用这个。拼凑几个其他SO问题在一起后,这里就是我想出了:

public static void main(String[] args) throws Exception { 
    Job job = new Job(new JobConf(), "Job Name"); 
    JobConf conf = job.getConfiguration(); 
    DistributedCache.createSymlink(conf); 
    DistributedCache.addCacheArchive(new URI("/ProjectDir/LookupTable.zip", job); 
    // *Rest of configuration code* 
} 

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> 
{ 
    private Path[] localArchives; 

    public void configure(JobConf job) 
    { 
     // Get the cached archive 
     File file1 = new File("./LookupTable.zip/file1.dat"); 
     BufferedReader br1index = new BufferedReader(new FileInputStream(file1)); 
    } 

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
    { // *Map code* } 
} 
  • 我应该在哪里调用void configure(JobConf job)功能?
  • 我在哪里可以使用private Path[] localArchives对象?
  • 我的configure()函数中的代码是否正确地访问档案中的文件并将文件链接到BufferedReader?

回答

1

我会回答你的问题w.r.t新的API,并使用共同的规范,分布式缓存

  • 我应该在哪里调用无效配置(JobConf工作)功能?

框架将调用保护无效的设置(上下文的背景下),在每一个地图的任务开始方法一次,使用缓存文件通常这里处理相关的逻辑。例如,读取文件,并存储在变量数据在地图中()函数可以使用被称为设置后()

  • 我在哪里使用专用路径[] localArchives对象?

它将通常用于setup()方法来检索缓存文件的路径。像这样的东西。

Path[] localArchive =DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
  • 是我在配置()函数来访问存档内 文件,使文件和一个BufferedReader链接的正确方法的代码?

它缺少一个调用方法来检索存储缓存文件的路径(如上所示)。一旦路径被检索到,文件可以被读取如下。

FSDataInputStream in = fs.open(localArchive); 
BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
+0

'''setup'和'configure'函数是否在Mapper类中? “JobConf作业”将超出范围。另外,我收到了'DistributedCache'和'URI'的未定义符号错误。任何想法我失踪?感谢您所有的帮助! – LeonardBlunderbuss

+0

安装程序来自新API并在映射程序类中使用。我建议您在此处开始使用新API http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Mapper.html – rVr