2014-10-30
4

I am new to Hadoop MapReduce and I am using Java to process JSON with MapReduce.

I have an input text file in which the data is stored as follows. Here are just a few of the tuples (data.txt):

{"author":"Sharīf Qāsim","book":"al- Rabīʻ al-manshūd"} 
{"author":"Nāṣir Nimrī","book":"Adīb ʻAbbāsī"} 
{"author":"Muẓaffar ʻAbd al-Majīd Kammūnah","book":"Asmāʼ Allāh al-ḥusná al-wāridah fī muḥkam kitābih"} 
{"author":"Ḥasan Muṣṭafá Aḥmad","book":"al- Jabhah al-sharqīyah wa-maʻārikuhā fī ḥarb Ramaḍān"} 
{"author":"Rafīqah Salīm Ḥammūd","book":"Taʻlīm fī al-Baḥrayn"} 

And this is the Java file (CombineBooks.java) in which I am supposed to write my code:

package org.hwone; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.util.GenericOptionsParser; 

//TODO import necessary components 

/* 
* Modify this file to combine books from the same author into a 
* single JSON object. 
* i.e. {"author": "Tobias Wells", "books": [{"book":"A die in the country"},{"book": "Dinky died"}]} 
* Be aware that this may run on any number of nodes! 
* 
*/ 

public class CombineBooks { 

    //TODO define variables and implement necessary components 

    public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args) 
       .getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: CombineBooks <in> <out>"); 
     System.exit(2); 
    } 

    //TODO implement CombineBooks 

    Job job = new Job(conf, "CombineBooks"); 

    //TODO implement CombineBooks 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

My task is to create a Hadoop program in "CombineBooks.java", to be returned in the "question-2" directory. The program should do the following: given the input author-book tuples, the map-reduce program should produce a JSON object which contains all the books from the same author in a JSON array, i.e.

{"author": "Tobias Wells", "books":[{"book":"A die in the country"},{"book": "Dinky died"}]} 

Any idea how this can be done?

+0

How about using Apache Drill and SQL? – 2016-10-07 10:34:24

Answer

11

First of all, the JSON classes you are trying to work with are not available to you. To fix this:

  1. Go here and download the ZIP: https://github.com/douglascrockford/JSON-java
  2. Extract it into the org/json/* subfolder of your source code tree (example shell commands below)
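For example, from a Unix shell (these exact commands are an assumption; at the time, that repository kept its .java files at the top level of the archive):

wget https://github.com/douglascrockford/JSON-java/archive/master.zip 
unzip master.zip 
mkdir -p src/org/json 
cp -r JSON-java-master/* src/org/json/ 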

Next, the first line of your code makes the package "org.json", which is incorrect; you should instead create a separate package, for instance "my.books".

Third, using a combiner here is useless: the reducer emits (NullWritable, Text) pairs rather than the (Text, Text) pairs the mapper outputs, so its logic cannot simply be reapplied between the map and reduce phases.

Here is my final code that works and solves your problem:

package my.books; 
import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.json.*; 

public class CombineBooks { 

    public static class Map extends Mapper<LongWritable, Text, Text, Text>{ 

     @Override 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 

      // Each value is one line of input: a single JSON object such as 
      // {"author":"...","book":"..."}. (TextInputFormat delivers one line 
      // per call, so the split below yields a single element.) 
      // Emit (author, book) so the shuffle groups books by author. 
      String author; 
      String book; 
      String line = value.toString(); 
      String[] tuple = line.split("\\n"); 
      try{ 
       for(int i=0;i<tuple.length; i++){ 
        JSONObject obj = new JSONObject(tuple[i]); 
        author = obj.getString("author"); 
        book = obj.getString("book"); 
        context.write(new Text(author), new Text(book)); 
       } 
      }catch(JSONException e){ 
       e.printStackTrace(); 
      } 
     } 
    } 

    public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{ 

     @Override 
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ 

      // All books of one author arrive in a single reduce call. Wrap each 
      // book in its own {"book": ...} object, collect them into a JSON 
      // array, and emit one combined JSON object per author. 
      try{ 
       JSONObject obj = new JSONObject(); 
       JSONArray ja = new JSONArray(); 
       for(Text val : values){ 
        JSONObject jo = new JSONObject().put("book", val.toString()); 
        ja.put(jo); 
       } 
       obj.put("books", ja); 
       obj.put("author", key.toString()); 
       context.write(NullWritable.get(), new Text(obj.toString())); 
      }catch(JSONException e){ 
       e.printStackTrace(); 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     if (args.length != 2) { 
      System.err.println("Usage: CombineBooks <in> <out>"); 
      System.exit(2); 
     } 

     Job job = new Job(conf, "CombineBooks"); 
     job.setJarByClass(CombineBooks.class); 
     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(NullWritable.class); 
     job.setOutputValueClass(Text.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

Here is the folder structure of my project:

src 
src/my 
src/my/books 
src/my/books/CombineBooks.java 
src/org 
src/org/json 
src/org/json/zip 
src/org/json/zip/BitReader.java 
... 
src/org/json/zip/None.java 
src/org/json/JSONStringer.java 
src/org/json/JSONML.java 
... 
src/org/json/JSONException.java 
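For reference, I compiled everything into a single jar. A minimal sketch of the build, assuming a Unix shell and a working hadoop command (the classes directory and the bookparse.jar name are just the ones used in the run command below):

mkdir classes 
javac -cp $(hadoop classpath) -d classes $(find src -name '*.java') 
jar cf bookparse.jar -C classes . 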

Here is the input:

[localhost:CombineBooks]$ hdfs dfs -cat /example.txt 
{"author":"author1", "book":"book1"} 
{"author":"author1", "book":"book2"} 
{"author":"author1", "book":"book3"} 
{"author":"author2", "book":"book4"} 
{"author":"author2", "book":"book5"} 
{"author":"author3", "book":"book6"} 

The command to run it:

hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output 

And here is the output:

[pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000 
{"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"} 
{"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"} 
{"books":[{"book":"book6"}],"author":"author3"} 

You have three options for putting the org.json.* classes onto your cluster:

  1. Package the org.json.* classes into your jar file (this can easily be done using a GUI IDE). This is the option I used in this answer.
  2. Put a jar file containing the org.json.* classes into one of the CLASSPATH directories on each cluster node (see yarn.application.classpath).
  3. Put a jar file containing org.json.* into HDFS (hdfs dfs -put <org.json jar> <hdfs path>) and call job.addFileToClassPath so this jar file is available to all of the tasks executing your job on the cluster. With the code from my answer, you would have to add job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>")); to main (a minimal sketch follows this list).
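A minimal sketch of option 3 as applied to the code above; the HDFS location is hypothetical, so substitute the actual path of your org.json jar:

     // Option 3 sketch: ship the org.json jar from HDFS to the classpath 
     // of every task of this job. "/libs/json-java.jar" is a hypothetical path. 
     job.addFileToClassPath(new Path("/libs/json-java.jar")); 

This line goes into main after the Job is created and before job.waitForCompletion is called.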
+0

+1 This solution is incomplete without the 'job.addFileToClassPath' statement. Please add it. – blackSmith 2014-11-04 07:08:21

+1

It depends. You can build the 'org.json' stuff as a separate jar file and manually put it into the 'CLASSPATH' directory on each cluster node (since JSON parsing is a common task, this is the preferred solution). Or you can put both 'org.json' and 'my.books' into a single jar, in which case you don't have to use 'job.addFileToClassPath'. Or you can build it as a separate jar file and ship it to the cluster nodes at execution time with 'job.addFileToClassPath'. You have to choose an option depending on the context of the task; for production I would prefer 1, for development and debugging - 2 or 3 – 0x0FFF 2014-11-05 08:12:10

+0

Adding that same line to the answer will benefit anyone looking up this post. Without that knowledge they will run into yet another exception, if they haven't come across it before. That was my intention. – blackSmith 2014-11-05 08:29:31