2016-11-04 123 views
3

我正在Spark应用程序和Mongo控制台上运行相同的聚合管道。在控制台上,数据在一眨眼之间就被提取出来了,只需要第二次使用“it”来检索所有预期的数据。 根据Spark WebUI,Spark应用程序需要将近两分钟的时间。MongoDB Spark连接器 - 聚合速度慢

enter image description here

正如你所看到的,242级的任务正在推出,以获取结果。我不确定为什么会启动这么大量的任务,而MongoDB汇总只返回40个文档。它看起来有很高的开销。

我的Mongos控制台上运行查询:

db.data.aggregate([ 
    { 
     $match:{ 
     signals:{ 
      $elemMatch:{ 
       signal:"SomeSignal", 
       value:{ 
        $gt:0, 
        $lte:100 
       } 
      } 
     } 
     } 
    }, 
    { 
     $group:{ 
     _id:"$root_document", 
     firstTimestamp:{ 
      $min:"$ts" 
     }, 
     lastTimestamp:{ 
      $max:"$ts" 
     }, 
     count:{ 
      $sum:1 
     } 
     } 
    } 
]) 

的Spark应用程序代码

JavaMongoRDD<Document> rdd = MongoSpark.load(sc); 

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
      Document.parse(
        "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"), 
      Document.parse(
        "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }"))); 

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() { 
     @Override 
     public String call(Document arg0) throws Exception { 
      String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(), 
        arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(), 
        arg0.get("count").toString()); 
      return output; 
     } 
    }); 

    outputRdd.saveAsTextFile("/user/spark/output"); 

在那之后,我用hdfs dfs -getmerge /user/spark/output/ output.csv并比较结果。

聚合为什么这么慢?是不是拨打withPipeline意味着减少需要传输到Spark的数据量?它看起来不像Mongo控制台那样进行相同的聚合。在Mongos控制台上,它正在快速发展。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。

编辑:我想知道的另一件事是两个执行程序启动(因为我使用默认执行设置atm),但只有一个执行程序完成所有工作。为什么不是第二个执行者做任何工作?

enter image description here

编辑2:当使用不同的聚合管道,并呼吁.count()代替saveAsTextFile(..),也有正在创建242个任务。这次将返回65.000个文件。 enter image description here

+1

我会更多地关注用户界面,试图了解242个任务是什么。有了40个文件,我想可以将它们放在一个分区中。 – Ross

+0

@Ross当我运行一个不同的查询和'.count()''aggregatedRdd'而不是将其保存到hdfs时,还会创建242个任务。不同的查询返回几百万个文档。我的收集统计数据是:'数据:15.01GiB文档:45141000大块:443'。我怀疑写入HDFS是个问题。这只是我的Spark应用程序中调用的唯一操作,这就是为什么它被列为Web UI中唯一的阶段。还是我误会了? – j9dy

+0

@Ross我总觉得没有执行聚合管道。我是否必须专门执行聚合管道? – j9dy

回答

2

大量的任务是由默认的Mongo Spark分区策略引起的。计算分区时它忽略聚合管道,主要有两个原因:

  1. 它减少了计算分区的成本
  2. 确保对所有分片和非碎片化partitioners

相同的行为。然而,因为你发现他们可以生成空的分区,在你的情况是昂贵的。

用于固定的选择可能是:

  1. 更改分区策略

    对于选择其他分区,以减少分区的数量。例如,PaginateByCount会将数据库拆分成一组数量的分区。

    创建您自己的分区器 - 只需实现特性,您就可以应用聚合管道并对结果进行分区。举例来说,请参阅HalfwayPartitionercustom partitioner test

  2. Pre使用$ out将结果集合到一个集合中并从那里读取。

  3. 使用coalesce(N)将分区合并在一起并减少分区数量。
  4. 增加spark.mongodb.input.partitionerOptions.partitionSizeMB配置以产生更少的分区。

自定义分区程序应该会产生最好的解决方案,但是有办法更好地使用可用的默认分区程序。

如果您认为应该有一个使用聚合管道计算分区的默认分区程序,那么请向MongoDB Spark Jira project添加一张标签。

+0

我可以使用'MongoShardedPartitioner'作为具有哈希分片的集合吗?文档中说'shardkey - 该字段应该被索引并包含唯一值.'在我的情况下,我的字段log_file_name:day_of_timestamp:hour_of_timestamp有一个合并的分区键,从而将相关数据保存在一起 - 至少是I希望这样做。但预值散列值不是唯一的。文档是否讨论了哈希值?另外,我还有一个关于如何在聊天中使用MongoSpark进行多个查询的小问题 - 如果你介意看看它。 – j9dy