2014-11-24 73 views
0

我正在尝试将HDFS数据移入MongoDB。我知道如何使用sqoop将数据导出到mysql中。我不认为我可以使用MongoDb的sqoop。我需要帮助了解如何做到这一点。将HDFS数据移入MongoDB

回答

0

基本问题是mongo以BSON格式(二进制JSON)存储其数据,而您的hdfs数据可能有不同的格式(txt,sequence,avro)。最容易做的事情是使用猪用这个驱动程序加载结果:

https://github.com/mongodb/mongo-hadoop/tree/master/pig

到蒙戈分贝。你必须将你的值映射到你的集合 - 在git hub页面上有一个很好的例子。

1

此配方将使用MongoOutputFormat类将HDFS实例 中的数据加载到MongoDB集合中。

Getting ready 

最简单的方式开始使用Hadoop的蒙哥适配器是克隆从GitHub蒙戈 - Hadoop的 项目,并建立配置Hadoop的特定版本的项目。必须安装Git 客户端才能克隆此项目。 这个配方假设你使用的是Hadoop的CDH3发行版。 官方Git客户端可在http://git-scm.com/downloads找到。

Mongo Hadoop适配器可以在GitHub上找到https://github.com/mongodb/ mongo-hadoop。该项目需要为特定版本的Hadoop构建。生成的 JAR文件必须安装在$ HADOOP_HOME/lib文件夹中的每个节点上。 需要在$ HADOOP_HOME/ lib文件夹中的每个节点上安装Mongo Java驱动程序。它可以在https://github.com/mongodb/mongo-java-driver/ 下载找到。

怎么办呢?

Complete the following steps to copy data form HDFS into MongoDB: 
    1. Clone the mongo-hadoop repository with the following command line: 
    git clone https://github.com/mongodb/mongo-hadoop.git 


    2. Switch to the stable release 1.0 branch: 
    git checkout release-1.0 


    3. Set the Hadoop version which mongo-hadoop should target. In the folder 
    that mongo-hadoop was cloned to, open the build.sbt file with a text editor. 
    Change the following line: 
    hadoopRelease in ThisBuild := "default" 
    to 
    hadoopRelease in ThisBuild := "cdh3" 


    4. Build mongo-hadoop : 
    ./sbt package 
    This will create a file named mongo-hadoop-core_cdh3u3-1.0.0.jar in the 
    core/target folder. 


    5. Download the MongoDB Java Driver Version 2.8.0 from https://github.com/ 
    mongodb/mongo-java-driver/downloads . 


    6. Copy mongo-hadoop and the MongoDB Java Driver to $HADOOP_HOME/lib on 
    each node: 


    cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_ 
    HOME/lib 


    7. Create a Java MapReduce program that will read the weblog_entries.txt file 
    from HDFS and write them to MongoDB using the MongoOutputFormat class: 


import java.io.*; 
import org.apache.commons.logging.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.*; 
import org.bson.*; 
import org.bson.types.ObjectId; 
import com.mongodb.hadoop.*; 
import com.mongodb.hadoop.util.*; 
public class ExportToMongoDBFromHDFS { 
private static final Log log = LogFactory.getLog(ExportToMongoDBFromHDFS.class); 
public static class ReadWeblogs extends Mapper<LongWritable, Text, ObjectId, BSONObject>{ 
public void map(Text key, Text value, Context context) 
throws IOException, InterruptedException{ 
      System.out.println("Key: " + key); 
      System.out.println("Value: " + value); 
      String[] fields = value.toString().split("\t"); 
      String md5 = fields[0]; 
      String url = fields[1]; 
      String date = fields[2]; 
      String time = fields[3]; 
      String ip = fields[4]; 
      BSONObject b = new BasicBSONObject(); 
      b.put("md5", md5); 
      b.put("url", url); 
      b.put("date", date); 
      b.put("time", time); 
       b.put("ip", ip); 
      context.write(new ObjectId(), b); 
     } 
} 
public static void main(String[] args) throws Exception{ 
     final Configuration conf = new Configuration(); 
      MongoConfigUtil.setOutputURI(conf,"mongodb://<HOST>:<PORT>/test.  weblogs"); 
System.out.println("Configuration: " + conf); 
     final Job job = new Job(conf, "Export to Mongo"); 
     Path in = new Path("/data/weblogs/weblog_entries.txt"); 
      FileInputFormat.setInputPaths(job, in); 
      job.setJarByClass(ExportToMongoDBFromHDFS.class); 
      job.setMapperClass(ReadWeblogs.class); 
      job.setOutputKeyClass(ObjectId.class); 
      job.setOutputValueClass(BSONObject.class); 
      job.setInputFormatClass(TextInputFormat.class); 
      job.setOutputFormatClass(MongoOutputFormat.class); 
      job.setNumReduceTasks(0); 
      System.exit(job.waitForCompletion(true) ? 0 : 1); 
     } 
} 

8. Export as a runnable JAR file and run the job: 
hadoop jar ExportToMongoDBFromHDFS.jar 
9. Verify that the weblogs MongoDB collection was populated from the Mongo shell: 
db.weblogs.find(); 
+0

能否请您发布的map-reduce代码从MongoDB中收集数据并将其存储在HDFS。 – MChirukuri 2015-07-14 11:23:16