我跟着mongo-hadoop连接器的documentation。如何使用mongo-hadoop连接器使用spark来保存mongo集合中的数据?
我能够将数据从inputCol
收集testDB
数据库传输到outputCol
收集利用:
Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/testDB.inputCol");
JavaSparkContext sc = new JavaSparkContext(sparkClient.sparkContext);
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.output.format",
"com.mongodb.hadoop.MongoOutputFormat");
outputConfig.set("mongo.output.uri",
"mongodb://localhost:27017/testDB.outputCol");
documents.saveAsNewAPIHadoopFile(
"file:///this-is-completely-unused",
Object.class,
BSONObject.class,
MongoOutputFormat.class,
outputConfig
);
我要救一个简单的文件说
{"_id":1, "name":"dev"}
在outputCol
收集testDB
数据库。
我该如何做到这一点?