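Using Hadoop DistributedCache with archives
Hadoop's DistributedCache documentation doesn't seem to sufficiently describe how to use the distributed cache. Here is the example it gives: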
// Setting up the cache for the application
1. Copy the requisite files to the FileSystem:
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. Setup the application's JobConf:
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
3. Use the cached files in the Mapper or Reducer:
public static class MapClass extends MapReduceBase
    implements Mapper<K, V, K, V> {
    private Path[] localArchives;
    private Path[] localFiles;

    public void configure(JobConf job) {
        // Get the cached archives/files
        File f = new File("./map.zip/some/file/in/zip.txt");
    }

    public void map(K key, V value,
                    OutputCollector<K, V> output, Reporter reporter)
        throws IOException {
        // Use data from the cached archives/files here
        // ...
        // ...
        output.collect(k, v);
    }
}
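One detail in the documentation example that is easy to miss: the `#lookup.dat` suffix on the first URI is a URI fragment, and DistributedCache uses the fragment as the name of the symlink in the task's working directory. The sketch below uses only `java.net.URI` (no Hadoop classes) to show how such a URI splits into a path and a fragment. `CacheUriDemo` and `linkName` are hypothetical names made up for illustration, and falling back to the file name when there is no fragment is an assumption based on the `./map.zip/...` path used in the example above.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper class, for illustration only; not part of Hadoop.
class CacheUriDemo {
    // Returns the link name a cache URI asks for: the fragment if present,
    // otherwise (assumption) the last component of the path.
    static String linkName(URI uri) {
        String fragment = uri.getFragment();
        if (fragment != null) {
            return fragment;
        }
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // new URI(String) throws the checked URISyntaxException,
    // so the caller has to declare or handle it.
    public static void main(String[] args) throws URISyntaxException {
        URI withFragment = new URI("/myapp/lookup.dat#lookup.dat");
        URI withoutFragment = new URI("/myapp/map.zip");
        System.out.println(withFragment.getPath() + " -> " + linkName(withFragment));
        System.out.println(withoutFragment.getPath() + " -> " + linkName(withoutFragment));
    }
}
```

Note that `URI` lives in `java.net` and has to be imported explicitly, and that the closing parenthesis of `new URI(...)` comes before the configuration argument.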
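I've been looking around for an hour trying to figure out how to use this. After piecing together a few other SO questions, here's what I came up with: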
public static void main(String[] args) throws Exception {
    Job job = new Job(new JobConf(), "Job Name");
    JobConf conf = job.getConfiguration();
    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheArchive(new URI("/ProjectDir/LookupTable.zip"), job);
    // *Rest of configuration code*
}

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable>
{
    private Path[] localArchives;

    public void configure(JobConf job)
    {
        // Get the cached archive
        File file1 = new File("./LookupTable.zip/file1.dat");
        BufferedReader br1index = new BufferedReader(new FileInputStream(file1));
    }

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        // *Map code*
    }
}
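- Where should I call the void configure(JobConf job) function?
- Where do I use the private Path[] localArchives object?
- Is the code in my configure() function correct for accessing files within an archive and linking a file to a BufferedReader?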
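Do the 'setup' and 'configure' functions go in the Mapper class? 'JobConf job' would be out of scope. Also, I'm getting undefined symbol errors for 'DistributedCache' and 'URI'. Any idea what I'm missing? Thanks for all your help! – LeonardBlunderbuss
setup is from the new API and is used in the Mapper class. I suggest you start with the new API here: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Mapper.html – rVr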
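On the BufferedReader part of the question: `new BufferedReader(new FileInputStream(file1))` does not compile, because `BufferedReader` wraps a `Reader`, not an `InputStream`; an `InputStreamReader` (or a `FileReader`) is needed in between. Below is a minimal sketch in plain Java, with no Hadoop classes, of the load-once-then-look-up pattern that a setup-style hook would follow. `LookupLoader`, its methods, the tab-separated file format, and the temp directory standing in for the unpacked `./LookupTable.zip/` are all assumptions made for illustration; in a real job the framework, not your code, would invoke the hook.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a mapper; no Hadoop classes are used here.
class LookupLoader {
    private final Map<String, String> lookup = new HashMap<>();

    // Plays the role of setup(): runs once, before any record is processed.
    void setup(File lookupFile) throws IOException {
        // FileInputStream -> InputStreamReader -> BufferedReader
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(lookupFile), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Assumed format: key<TAB>value, one pair per line.
                String[] parts = line.split("\t", 2);
                if (parts.length == 2) {
                    lookup.put(parts[0], parts[1]);
                }
            }
        }
    }

    // Plays the role of map(): uses the table that setup() already loaded.
    String map(String key) {
        return lookup.getOrDefault(key, "UNKNOWN");
    }

    public static void main(String[] args) throws IOException {
        // Simulate the unpacked archive directory containing file1.dat.
        Path archiveDir = Files.createTempDirectory("LookupTable.zip");
        Path dataFile = archiveDir.resolve("file1.dat");
        Files.write(dataFile, "a\tapple\nb\tbanana\n".getBytes(StandardCharsets.UTF_8));

        LookupLoader mapper = new LookupLoader();
        mapper.setup(dataFile.toFile());
        System.out.println(mapper.map("a"));
        System.out.println(mapper.map("z"));
    }
}
```

In an actual new-API mapper, the body of `setup(File)` would presumably move into an override of `Mapper.setup(Context)`, opening `new File("./LookupTable.zip/file1.dat")` relative to the task's working directory.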