
I am trying to use the MongoDB Connector for Hadoop with Spark to query one collection in MongoDB and insert every document retrieved into another collection. The MongoUpdateWritable class is used as the value type of an RDD to update a collection in MongoDB, and it has an upsert flag. Unfortunately, the upsert flag seems to have no effect on execution: the code runs without errors, exactly as if the upsert flag were set to false.

The (Scala) code below connects to a local mongod process, writes some data with the mongo client, then attempts to read that data back with Spark and write it to another collection in the same database. After that write fails to happen, the code inserts a document with the same _id into the target collection through the mongo client and runs the same Spark job again, to show that the update half of the upsert works correctly. In short: mongo-hadoop upserts with Spark do not appear to work.

Spark version: 1.6.0-cdh5.7.0

Hadoop version: 2.6.0-cdh5.4.7

Mongo version: 3.2.0

mongo-hadoop-core version: 2.0.2

import com.mongodb.client.{MongoCollection, MongoDatabase}
import com.mongodb.MongoClient
import com.mongodb.hadoop.io.MongoUpdateWritable 
import org.apache.hadoop.conf.Configuration 
import org.bson.{BSONObject, BasicBSONObject, Document} 
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkConf, SparkContext} 

object sparkTest extends App { 

    //setting up mongo 
    val mongo: MongoDatabase = new MongoClient("localhost", 27017).getDatabase("test")
    val source: MongoCollection[Document] = mongo.getCollection("source")
    val target: MongoCollection[Document] = mongo.getCollection("target")
    source.drop() 
    target.drop() 
    //inserting document 
    val sourceDoc = new Document() 
    sourceDoc.put("unchanged","this field should not be changed") 
    sourceDoc.put("_id","1") 
    source.insertOne(sourceDoc) 

    //setting up spark 
    val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local") 
    val mongoConfig = new Configuration() 
    val sc = new SparkContext(conf) 
    mongoConfig.set("mongo.input.uri", 
    "mongodb://localhost:27017/test.source") 
    mongoConfig.set("mongo.output.uri", 
    "mongodb://localhost:27017/test.target") 

    //setting up read 
    val documents = sc.newAPIHadoopRDD(
      mongoConfig,               // Configuration
      classOf[MongoInputFormat], // InputFormat
      classOf[Object],           // Key type
      classOf[BSONObject])       // Value type

    //building updates with no document matching the query in the target collection 
    val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
      (value: BSONObject) => {

        val query = new BasicBSONObject
        query.append("_id", value.get("_id").toString)

        val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
        update.append("added", "this data will be added")

        println("val:" + value.toString)
        println("query:" + query.toString)
        println("update:" + update.toString)

        new MongoUpdateWritable(
          query,  // Query
          update, // Update
          true,   // Upsert flag
          false,  // Update multiple documents flag
          true    // Replace flag
        )
      }
    )
    //saving updates 
    upsert_insert_rdd.saveAsNewAPIHadoopFile(
      "",
      classOf[Object],
      classOf[MongoUpdateWritable],
      classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
      mongoConfig)

    // At this point, there should be a new document in the target database, but there is not. 
    val count = target.count() 
    println("count after insert: "+count+", expected: 1") 

    //adding a doc to demonstrate the working update path. insertOne will throw an exception if a
    //document with a matching _id already exists in the collection, so if this line breaks, the upsert worked!
    val targetDoc = new Document() 
    targetDoc.put("overwritten","this field should not be changed") 
    targetDoc.put("_id","1") 
    target.insertOne(targetDoc) 

    //building updates when a document matching the query exists in the target collection 
    val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
      (value: BSONObject) => {

        val query = new BasicBSONObject
        query.append("_id", value.get("_id").toString)

        val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
        update.append("added", "this data will be added")

        println("val:" + value.toString)
        println("query:" + query.toString)
        println("update:" + update.toString)

        new MongoUpdateWritable(
          query,  // Query
          update, // Update
          true,   // Upsert flag
          false,  // Update multiple documents flag
          true    // Replace flag
        )
      }
    )
    //saving updates 
    upsert_update_rdd.saveAsNewAPIHadoopFile(
      "",
      classOf[Object],
      classOf[MongoUpdateWritable],
      classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
      mongoConfig)

    //checking that the update succeeded. should print: 
    //contains new field:true, contains overwritten field:false 
    val ret = target.find().first
    if (ret != null)
      println("contains new field:" + ret.containsKey("added") + ", contains overwritten field:" + ret.containsKey("overwritten"))
    else
      println("no documents found in target")
} 

Any insight into what I am missing would be helpful. I have tried changing the output format to MongoUpdateWritable, but that had no effect on the behavior. I am aware this is probably a configuration problem, but it feels like a bug in the mongo-hadoop adapter, since reading and writing plain documents through its input and output formats, and reading and writing MongoUpdateWritable values, both succeed; only the upsert is ignored.

The POM, for convenience:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>test</groupId> 
    <artifactId>spark_mongo_upsert_test</artifactId> 
    <version>1.0-SNAPSHOT</version> 

    <properties> 
     <spark.version>1.6.0-cdh5.7.0</spark.version> 
     <mongo.version>3.2.0</mongo.version> 
     <mongo.hadoop.version>2.0.2</mongo.hadoop.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>${spark.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.10</artifactId> 
      <version>${spark.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.mongodb.mongo-hadoop</groupId> 
      <artifactId>mongo-hadoop-core</artifactId> 
      <version>${mongo.hadoop.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.mongodb</groupId> 
      <artifactId>mongo-java-driver</artifactId> 
      <version>${mongo.version}</version> 
     </dependency> 

    </dependencies> 

    <build> 
     <plugins> 
      <!-- Plugin to compile Scala code --> 
      <plugin> 
       <groupId>net.alchim31.maven</groupId> 
       <artifactId>scala-maven-plugin</artifactId> 
       <version>3.2.1</version> 
      </plugin> 
     </plugins> 
    </build> 

</project> 

Answer


Saving a dataset to MongoDB that contains an _id field will replace and upsert any existing documents.