我正在Spark应用程序和Mongo控制台上运行相同的聚合管道。在控制台上,数据在一眨眼之间就被提取出来了,只需要第二次使用“it”来检索所有预期的数据。 根据Spark WebUI,Spark应用程序需要将近两分钟的时间。MongoDB Spark连接器 - 聚合速度慢
正如你所看到的,242级的任务正在推出,以获取结果。我不确定为什么会启动这么大量的任务,而MongoDB汇总只返回40个文档。它看起来有很高的开销。
我的Mongos控制台上运行查询:
db.data.aggregate([
{
$match:{
signals:{
$elemMatch:{
signal:"SomeSignal",
value:{
$gt:0,
$lte:100
}
}
}
}
},
{
$group:{
_id:"$root_document",
firstTimestamp:{
$min:"$ts"
},
lastTimestamp:{
$max:"$ts"
},
count:{
$sum:1
}
}
}
])
的Spark应用程序代码
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
Document.parse(
"{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
Document.parse(
"{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));
JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
@Override
public String call(Document arg0) throws Exception {
String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
arg0.get("count").toString());
return output;
}
});
outputRdd.saveAsTextFile("/user/spark/output");
在那之后,我用hdfs dfs -getmerge /user/spark/output/ output.csv
并比较结果。
聚合为什么这么慢?是不是拨打withPipeline
意味着减少需要传输到Spark的数据量?它看起来不像Mongo控制台那样进行相同的聚合。在Mongos控制台上,它正在快速发展。我正在使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0。
编辑:我想知道的另一件事是两个执行程序启动(因为我使用默认执行设置atm),但只有一个执行程序完成所有工作。为什么不是第二个执行者做任何工作?
编辑2:当使用不同的聚合管道,并呼吁.count()
代替saveAsTextFile(..)
,也有正在创建242个任务。这次将返回65.000个文件。
我会更多地关注用户界面,试图了解242个任务是什么。有了40个文件,我想可以将它们放在一个分区中。 – Ross
@Ross当我运行一个不同的查询和'.count()''aggregatedRdd'而不是将其保存到hdfs时,还会创建242个任务。不同的查询返回几百万个文档。我的收集统计数据是:'数据:15.01GiB文档:45141000大块:443'。我怀疑写入HDFS是个问题。这只是我的Spark应用程序中调用的唯一操作,这就是为什么它被列为Web UI中唯一的阶段。还是我误会了? – j9dy
@Ross我总觉得没有执行聚合管道。我是否必须专门执行聚合管道? – j9dy