2017-08-24 89 views
1

我需要从Spark中的MongoDB带来一些数据。我使用了mongo-spark-connector_2.11的spark mongo连接器。下面的书面代码 并运行它在火花壳测试Spark Mongo连接需要很长的时间,然后预计

def createReadConfig(topic: String): ReadConfig = { 
    val user =UserId 
    val pass = Password 
    val host = Host 
    val db = Database 
    val coll = Collection 
    val partitioner = MongoPaginateBySizePartitioner 
    ReadConfig(Map("uri" -> ("mongodb://" + user + ":" + pass + "@" + host + "/" + 
    db), "database" -> db, "collection" -> coll, "partitioner" -> partitioner)) 
} 


val collectionRDD= MongoSpark.load(sc,admissionConfig) 

collectionRDD.filter(doc=>doc.getObjectId("_id")==new ObjectId("objectId")).count 

花更多然后20​​秒,得到的结果,而相同的查询了小于在蒙戈控制台秒。

为什么会发生这种情况,以及如何降低速度差异?

回答

1

为什么会发生这种情况,以及如何降低速度差异?

所不同的是,在执行RDD.filter()负载从MongoDB的数据到火花工人和然后执行filter操作。根据您的网络,数据大小,MongoDB服务器和Spark吸收器,与通过mongo shell执行查询匹配相比,这可能需要更多时间。

你可以利用用于星火MongoDB的连接器的withPipeline功能来利用这一点,例如:

val rdd = MongoSpark.load(sc) 

val aggregatedRDD = rdd.withPipeline(Seq(Document.parse("{ $match: { '_id' : 'some id' } }"))) 

以上将过滤数据,传递文件,星火之前执行MongoDB中的聚合。这减少了从MongoDB服务器到Spark工作人员的数据传输,并增加了利用数据库索引的能力。请参阅MongoDB Spark Connector: Filters and Aggregation

相关问题