2014-01-20 24 views
33

我的地图任务需要一些配置数据,我希望通过分布式缓存进行分发。Hadoop DistributedCache已弃用 - 首选API是什么?

Hadoop的MapReduce Tutorial显示DistributedCache类的usage,大致如下:

// In the driver 
JobConf conf = new JobConf(getConf(), WordCount.class); 
... 
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper 
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job); 
... 

然而,DistributedCache是Hadoop中2.2.0 marked as deprecated

什么是实现此目的的首选新方法?是否有涵盖此API的最新示例或教程?

回答

46

分布式缓存的API可以在Job类本身中找到。这里检查文档:http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html 代码应该是这样的

Job job = new Job(); 
... 
job.addCacheFile(new Path(filename).toUri()); 

在你的映射器代码:

Path[] localPaths = context.getLocalCacheFiles(); 
... 
+0

谢谢 - 我假定我需要使用更新的'mapreduce' API而不是'mapred',否则'JobContext'对象不会提供给映射器。 – DNA

+0

是的,你是对的。 – user2371156

+10

我认为'getLocalCacheFiles()'被弃用,但'getCacheFiles()'确定 - 虽然返回的URI不是路径。 – DNA

5

新DistributedCache API纱线/ MR2在org.apache.hadoop.mapreduce.Job类中。

Job.addCacheFile() 

不幸的是,目前还没有很多全面的教程式的例子。

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

+0

我不知道如何检索这些使用'Job.addCacheFile(URI)'添加的缓存文件。它不适合我使用旧的方式('context.getCacheFiles()'),因为这些文件是空的。 – tolgap

15

要扩大@jtravaglini,使用DistributedCache纱线的首选方式/ MapReduce的2如下:

在你的驱动程序,使用Job.addCacheFile()

public int run(String[] args) throws Exception { 
    Configuration conf = getConf(); 

    Job job = Job.getInstance(conf, "MyJob"); 

    job.setMapperClass(MyMapper.class); 

    // ... 

    // Mind the # sign after the absolute file location. 
    // You will be using the name after the # sign as your 
    // file name in your Mapper/Reducer 
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some")); 
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other")); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

而且在您的Mapper/Reducer,覆盖setup(Context context)方法:

@Override 
protected void setup(
     Mapper<LongWritable, Text, Text, Text>.Context context) 
     throws IOException, InterruptedException { 
    if (context.getCacheFiles() != null 
      && context.getCacheFiles().length > 0) { 

     File some_file = new File("./some"); 
     File other_file = new File("./other"); 

     // Do things to these two files, like read them 
     // or parse as JSON or whatever. 
    } 
    super.setup(context); 
} 
0

我有同样的问题。 DistributedCach不仅是弃用的,而且也是getLocalCacheFiles和“new Job”。那么,什么工作对我来说是这样的:

司机:

Configuration conf = getConf(); 
Job job = Job.getInstance(conf); 
... 
job.addCacheFile(new Path(filename).toUri()); 

在映射/减速设置:

@Override 
protected void setup(Context context) throws IOException, InterruptedException 
{ 
    super.setup(context); 

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null 

    Path file1path = new Path(files[0]) 
    ... 
} 
2

我没有用job.addCacheFile()。相反,我使用了像-files /path/to/myfile.txt#myfile这样的-files选项。然后,在映射器或减速器代码我使用下面的方法:

/** 
* This method can be used with local execution or HDFS execution. 
* 
* @param context 
* @param symLink 
* @param throwExceptionIfNotFound 
* @return 
* @throws IOException 
*/ 
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException 
{ 
    URI[] uris = context.getCacheFiles(); 
    if(uris==null||uris.length==0) 
    { 
     if(throwExceptionIfNotFound) 
      throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache"); 
     return null; 
    } 
    URI symlinkUri = null; 
    for(URI uri: uris) 
    { 
     if(symLink.equals(uri.getFragment())) 
     { 
      symlinkUri = uri; 
      break; 
     } 
    } 
    if(symlinkUri==null) 
    { 
     if(throwExceptionIfNotFound) 
      throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache"); 
     return null; 
    } 
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink 
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink); 

} 

然后在映射器/减速器:

@Override 
protected void setup(Context context) throws IOException, InterruptedException 
{ 
    super.setup(context); 

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true); 
    ... do work ... 
} 

注意,如果我使用的“-files /path/to/myfile.txt”直接然后我需要使用“myfile.txt”来访问该文件,因为这是默认的符号链接名称。

1

没有提到的解决方案为我工作的完整性。这可能是因为Hadoop版本不断变化,我正在使用hadoop 2.6.4。本质上,DistributedCache已被弃用,所以我不想使用它。正如一些帖子建议我们使用addCacheFile(),但它已经改变了一点。下面是它是如何工作的

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt")); 

这里X.X.X.X可以是主IP地址或本地主机。 EnglishStop.txt存储在HDFS的/位置。

hadoop fs -ls/

输出是

-rw-r--r-- 3 centos supergroup  1833 2016-03-12 20:24 /EnglishStop.txt 
drwxr-xr-x - centos supergroup   0 2016-03-12 19:46 /test 

滑稽,但方便,现在#EnglishStop.txt意味着我们可以在映射器访问它为 “EnglishStop.txt”。这里是代码相同

public void setup(Context context) throws IOException, InterruptedException  
{ 
    File stopwordFile = new File("EnglishStop.txt"); 
    FileInputStream fis = new FileInputStream(stopwordFile); 
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis)); 

    while ((stopWord = reader.readLine()) != null) { 
     // stopWord is a word read from Cache 
    } 
} 

这只是为我工作。你可以读取存储在HDFS中的文件中的行

相关问题