2010-03-23 171 views
110

在应用MapReduce的许多实际情况中,最终的算法最终都是几个MapReduce步骤。在Hadoop中链接多个MapReduce作业

即Map1,Reduce1,Map2,Reduce2等等。

因此,您需要将最后一次减少的输出作为下一个地图的输入。

中间数据是您(一般情况下)在管道成功完成后不想保留的内容。另外,因为这个中间数据通常是一些数据结构(如'map'或'set'),所以您不希望在编写和读取这些键值对时付出太多努力。

在Hadoop中这样做的推荐方式是什么?

是否有一个(简单)示例说明如何以正确的方式处理这些中间数据,包括之后的清理?

+2

使用其MapReduce框架? – skaffman 2010-03-23 12:03:03

+1

我编辑了这个问题来澄清我正在谈论的Hadoop。 – 2010-03-23 13:11:05

+0

我推荐这个swineherd gem:https://github.com/Ganglion/swineherd best,Tobias – Tobias 2011-06-15 09:33:36

回答

52

我觉得这个教程雅虎开发者网络将帮助你这样的:Chaining Jobs

您使用JobClient.runJob()。来自第一份工作的数据的输出路径成为您的第二份工作的输入路径。这些需要作为参数传递给您的作业,并使用适当的代码来解析它们并为作业设置参数。

我认为上述方法可能是现在较老的mapred API的做法,但它应该仍然有效。在新的mapreduce API中会有类似的方法,但我不确定它是什么。

只要在作业完成后删除中间数据,您可以在代码中执行此操作。我之前已经做了它的方式是使用类似:

FileSystem.delete(Path f, boolean recursive); 

所在路径是数据的HDFS的位置。您需要确保只有在没有其他工作需要时才删除此数据。

+2

感谢您的雅虎教程链接。链接工作确实是你想要的,如果两者都在同一个运行。我一直在寻找的是如果你想能够单独运行它们的简单方法。在上述教程中,我发现SequenceFileOutputFormat“将适合读取的二进制文件写入后续的MapReduce作业”以及匹配的SequenceFileInputFormat,这使得它非常容易完成。谢谢。 – 2010-05-17 09:14:40

7

实际上有很多方法可以做到这一点。我将专注于两个。

一个是通过Riffle(http://github.com/cwensel/riffle)一个注释库来识别相关事物并以依赖(拓扑)顺序“执行”它们。

或者您可以在级联(http://www.cascading.org/)中使用级联(和MapReduceFlow)。未来版本将支持Riffle注释,但现在使用原始MR JobConf作业效果很好。

一个变种就是不用手工管理MR作业,而是使用Cascading API开发应用程序。然后,JobConf和作业链通过Cascading planner和Flow类在内部处理。

这样你就可以专注于自己的问题,而不是管理Hadoop作业的机制等等。甚至可以在顶层(如clojure或jruby)上层叠不同的语言,以进一步简化开发和应用程序。 http://www.cascading.org/modules.html

17

有很多方法可以做到这一点。

(1)层叠作业

创建针对第一作业的JobConf对象“JOB1”,并设置所有的参数以“输入”作为inputdirectory和“温度”作为输出目录。执行此任务:

JobClient.run(job1). 

紧下方,创建第二个作业的JobConf对象“作业2”,并设置所有的参数与“温度”为inputdirectory和“输出”的输出目录。执行此任务:

JobClient.run(job2). 

(2)创建两个JobConf对象和设置的所有参数在他们就像(1),只是你不使用JobClient.run。

然后创建两个工作与jobconfs对象作为参数:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2); 

使用JobControl作业控制对象时,你指定作业依赖关系,然后运行的作业:

JobControl jbcntrl=new JobControl("jbcntrl"); 
jbcntrl.addJob(job1); 
jbcntrl.addJob(job2); 
job2.addDependingJob(job1); 
jbcntrl.run(); 

(3)如果你需要一个像Map + |的结构减少| Map *,您可以使用Hadoop版本0.19及之后的ChainMapper和ChainReducer类。

干杯

1

虽然有基于复杂的服务器Hadoop的工作流引擎如Oozie的,我有一个简单的Java库,使多个的Hadoop执行作为工作流程的工作。定义内部作业依赖关系的作业配置和工作流程在JSON文件中配置。所有内容都可以在外部进行配置,并且不需要对现有地图缩减实施进行任何更改即可成为工作流程的一部分。

详情可以在这里找到。源代码和jar在github中可用。

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

普拉纳布·

1

我觉得Oozie的帮助随之而来的就业机会,直接从以前的任务接收输入。这可以避免使用jobcontrol执行的I/O操作。

3

我们可以利用Job的waitForCompletion(true)方法来定义作业之间的依赖关系。

在我的场景中,我有3个互相依赖的工作。在驱动程序类中,我使用了下面的代码,它按预期工作。

public static void main(String[] args) throws Exception { 
     // TODO Auto-generated method stub 

     CCJobExecution ccJobExecution = new CCJobExecution(); 

     Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]); 
     Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]); 
     Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]); 

     System.out.println("****************Started Executing distanceTimeFraudJob ================"); 
     distanceTimeFraudJob.submit(); 
     if(distanceTimeFraudJob.waitForCompletion(true)) 
     { 
      System.out.println("=================Completed DistanceTimeFraudJob================= "); 
      System.out.println("=================Started Executing spendingFraudJob ================"); 
      spendingFraudJob.submit(); 
      if(spendingFraudJob.waitForCompletion(true)) 
      { 
       System.out.println("=================Completed spendingFraudJob================= "); 
       System.out.println("=================Started locationFraudJob================= "); 
       locationFraudJob.submit(); 
       if(locationFraudJob.waitForCompletion(true)) 
       { 
        System.out.println("=================Completed locationFraudJob================="); 
       } 
      } 
     } 
    } 
+0

你的答案是关于如何在执行方面加入这些工作。最初的问题是关于最好的数据结构。所以你的答案与这个具体问题无关。 – 2013-01-28 21:03:45

1

如果您想以编程方式链接您的作业,您将无法使用JobControl。用法很简单:

JobControl jobControl = new JobControl(name); 

然后添加ControlledJob实例。 ControlledJob使用它的依赖性定义作业,从而自动插入输入和输出以适应作业的“链”。

jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2)); 

    jobControl.run(); 

启动链。你会想要把它放在一个敏捷的线程中。这允许检查链条的状况得到控制而它运行:

while (!jobControl.allFinished()) { 
     System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size()); 
     System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size()); 
     System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size()); 
     List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList(); 
     System.out.println("Jobs in success state: " + successfulJobList.size()); 
     List<ControlledJob> failedJobList = jobControl.getFailedJobList(); 
     System.out.println("Jobs in failed state: " + failedJobList.size()); 
    } 
5

我已经做的工作​​与使用对象JobConf一前一后链接。我以WordCount为例链接工作。一份工作计算出在给定输出中一个词重复了多少次。第二份工作将第一份工作输出作为输入,并计算出给定输入中的总字数。以下是需要放置在Driver类中的代码。

//First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class); 
    job1.setJobName("WordCount"); 

    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    job1.setMapperClass(WordCountMapper.class); 
    job1.setCombinerClass(WordCountReducer.class); 
    job1.setReducerClass(WordCountReducer.class); 

    job1.setInputFormat(TextInputFormat.class); 
    job1.setOutputFormat(TextOutputFormat.class); 

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files 
    FileInputFormat.setInputPaths(job1, new Path("input_data")); 

    //"first_job_output" contains data that how many times a word occurred in the given file 
    //This will be the input to the second job. For second job, input data name should be 
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output")); 

    JobClient.runJob(job1); 


    //Second Job - Counts total number of words in a given file 

    JobConf job2 = new JobConf(TotalWords.class); 
    job2.setJobName("TotalWords"); 

    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    job2.setMapperClass(TotalWordsMapper.class); 
    job2.setCombinerClass(TotalWordsReducer.class); 
    job2.setReducerClass(TotalWordsReducer.class); 

    job2.setInputFormat(TextInputFormat.class); 
    job2.setOutputFormat(TextOutputFormat.class); 

    //Path name for this job should match first job's output path name 
    FileInputFormat.setInputPaths(job2, new Path("first_job_output")); 

    //This will contain the final output. If you want to send this jobs output 
    //as input to third job, then third jobs input path name should be "second_job_output" 
    //In this way, jobs can be chained, sending output one to other as input and get the 
    //final output 
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output")); 

    JobClient.runJob(job2); 

命令来运行这些作业是:

斌/ Hadoop的罐子TotalWords。

我们需要给出命令的最终作业名称。在上述情况下,它是TotalWords。

4

您可以按照代码中给出的方式运行MR链。

请注意:只有驱动程序代码已经提供

public class WordCountSorting { 
// here the word keys shall be sorted 
     //let us write the wordcount logic first 

     public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException { 
      //THE DRIVER CODE FOR MR CHAIN 
      Configuration conf1=new Configuration(); 
      Job j1=Job.getInstance(conf1); 
      j1.setJarByClass(WordCountSorting.class); 
      j1.setMapperClass(MyMapper.class); 
      j1.setReducerClass(MyReducer.class); 

      j1.setMapOutputKeyClass(Text.class); 
      j1.setMapOutputValueClass(IntWritable.class); 
      j1.setOutputKeyClass(LongWritable.class); 
      j1.setOutputValueClass(Text.class); 
      Path outputPath=new Path("FirstMapper"); 
      FileInputFormat.addInputPath(j1,new Path(args[0])); 
        FileOutputFormat.setOutputPath(j1,outputPath); 
        outputPath.getFileSystem(conf1).delete(outputPath); 
      j1.waitForCompletion(true); 
        Configuration conf2=new Configuration(); 
        Job j2=Job.getInstance(conf2); 
        j2.setJarByClass(WordCountSorting.class); 
        j2.setMapperClass(MyMapper2.class); 
        j2.setNumReduceTasks(0); 
        j2.setOutputKeyClass(Text.class); 
        j2.setOutputValueClass(IntWritable.class); 
        Path outputPath1=new Path(args[1]); 
        FileInputFormat.addInputPath(j2, outputPath); 
        FileOutputFormat.setOutputPath(j2, outputPath1); 
        outputPath1.getFileSystem(conf2).delete(outputPath1, true); 
        System.exit(j2.waitForCompletion(true)?0:1); 
     } 

} 

序列是

JOB1)MAP-> REDUCE->(JOB2)MAP
这样做为了获得排序的键还有更多的方法,如使用树形图
但是我想把你的注意力集中到乔布斯链接的方式!
谢谢

2

新类org.apache.hadoop.mapreduce.lib.chain.ChainMapper帮助这种情况下

+1

答案是好的 - 但你应该添加一些关于它做些什么的细节,或者至少是一个API参考链接,这样人们才能投票 – 2016-11-09 17:50:50

+0

ChainMapper和ChainReducer用于在Reduce之前有一个或多个映射器,并且有0个或更多Reduce规范之后的映射器。 (Mapper +)减少(Mapper *)。如果我明显错了,请纠正我,但我不认为这种方法能够按照OP的要求完成连续的工作。 – rahul1210 2017-04-12 05:53:31

0

正如您在您的要求已提到想要MRJob1的O/P是我/ p MRJob2等等,你可以考虑使用oozie工作流来处理这个用例。您也可以考虑将您的中间数据写入HDFS,因为它将被下一个MRJob使用。作业完成后,您可以清理中间数据。

<start to="mr-action1"/> 
<action name="mr-action1"> 
    <!-- action for MRJob1--> 
    <!-- set output path = /tmp/intermediate/mr1--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="mr-action2"> 
    <!-- action for MRJob2--> 
    <!-- set input path = /tmp/intermediate/mr1--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="success"> 
     <!-- action for success--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<action name="fail"> 
     <!-- action for fail--> 
    <ok to="end"/> 
    <error to="end"/> 
</action> 

<end name="end"/> 

相关问题