2012-05-06 129 views
4

我看到很多SO关于MongoDB聚合的问题,但是,我还没有找到一个完整的解决方案。通过映射的MongoDB独特的价值聚合reduce

这里是我的数据的例子:

{ 
    "fruits" : { 
     "apple" : "red", 
     "orange" : "orange", 
     "plum" : "purple" 
    } 
} 
{ 
    "fruits" : { 
     "apple" : "green", 
     "plum" : "purple" 
    } 
} 
{ 
    "fruits" : { 
     "apple" : "red", 
     "orange" : "yellow", 
     "plum" : "purple" 
    } 
} 

现在,我的目标是确定每种颜色的每个水果的普及,所以像这将是输出集合:

{ 
    "_id" : "apple" 
    "values" : { 
     "red" : 2, 
     "green" : 1 
    } 
} 
{ 
    "_id" : "orange" 
    "values" : { 
     "orange" : 1, 
     "yellow" : 1 
    } 
} 
{ 
    "_id" : "plum" 
    "values" : { 
     "purple" : 3 
    } 
} 

我已经尝试了各种M/R功能,最后他们要么不工作,要么以指数级长。在这个例子(水果)的背景下,我有大约1,000,000个总文件,大约有1000种不同的水果和100,000种颜色。我目前的工作M/R是这样的:

map = function() { 
    if (!this.fruits) return; 
    for (var fruit in this.fruits) { 
     emit(fruit, { 
      val_array: [ 
       {value: this.fruits[fruit], count: 1} 
      ] 
     }); 
    } 
}; 

reduce = function(key, values) { 
    var collection = { 
     val_array: [] 
    }; 
    var found = false; 
    values.forEach(function(map_obj) { 
     map_obj.val_array.forEach(function(value_obj) { 
      found = false; 
      // if exists in collection, inc, else add 
      collection.val_array.forEach(function(coll_obj) { 
       if (coll_obj.value == value_obj.value) { 
        // the collection already has this object, increment it 
        coll_obj.count += value_obj.count; 
        found = true; 
        return; 
       } 
      }); 
      if (!found) { 
       // the collection doesn't have this obj yet, push it 
       collection.val_array.push(value_obj); 
      } 
     }); 
    }); 
    return collection; 
}; 

现在,这样做的工作,并为100个记录,它只需一秒钟左右,但时间非线性增加,所以100M记录将采取很长时间,很。问题是我在collection数组中使用reduce函数进行穷人子聚合,因此需要我迭代collection和我的map函数中的值。现在我只需要弄清楚如何有效地做到这一点(即使它需要多次减少)。欢迎任何建议!


编辑缺少一个更好的地方发布它,这是我的解决方案。
首先,我创建了一个名为 mr.js文件:在我的整个集合

map = function() { 
    if (!this.fruits) return; 
    var skip_fruits = { 
     'Watermelon':1, 
     'Grapefruit':1, 
     'Tomato':1 // yes, a tomato is a fruit 
    } 
    for (var fruit in this.fruits) { 
     if (skip_fruits[fruit]) continue; 
     var obj = {}; 
     obj[this.fruits[fruit]] = 1; 
     emit(fruit, obj); 
    } 
}; 

reduce = function(key, values) { 
    var out_values = {}; 
    values.forEach(function(v) { 
     for(var k in v) { // iterate values 
      if (!out_values[k]) { 
       out_values[k] = v[k]; // init missing counter 
      } else { 
       out_values[k] += v[k]; 
      } 
     } 
    }); 
    return out_values; 
}; 

var in_coll = "fruit_repo"; 
var out_coll = "fruit_agg_so"; 
var total_docs = db[in_coll].count(); 
var page_size = 100000; 
var pages = Math.floor(total_docs/page_size); 
print('Starting incremental MR job with '+pages+' pages'); 
db[out_coll].drop(); 
for (var i=0; i<pages; i++) { 
    var skip = page_size * i; 
    print("Calculating page limits for "+skip+" - "+(skip+page_size-1)+"..."); 
    var start_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip).limit(1)[0].date; 
    var end_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip+page_size-1).limit(1)[0].date; 
    var mr_command = { 
     mapreduce: in_coll, 
     map: map, 
     reduce: reduce, 
     out: {reduce: out_coll}, 
     sort: {date: 1}, 
     query: { 
      date: { 
       $gte: start_date, 
       $lt: end_date 
      } 
     }, 
     limit: (page_size - 1) 
    }; 
    print("Running mapreduce for "+skip+" - "+(skip+page_size-1)); 
    db[in_coll].runCommand(mr_command); 
} 

该文件迭代,逐步映射/减少100K文档(由date排序其中必须有索引!)的时间,并减少他们成为一个单一的输出集合。它的用法如下:mongo db_name mr.js

然后,几个小时后,我收集了所有的信息。为了弄清楚哪些水果有大多数颜色,我用这个从蒙戈外壳打印出前25名:

// Show number of number of possible values per key 
var keys = []; 
for (var c = db.fruit_agg_so.find(); c.hasNext();) { 
    var obj = c.next(); 
    if (!obj.value) break; 
    var len=0;for(var l in obj.value){len++;} 
    keys.push({key: obj['_id'], value: len}); 
} 
keys.sort(function(a, b){ 
    if (a.value == b.value) return 0; 
    return (a.value > b.value)? -1: 1; 
}); 
for (var i=0; i<20; i++) { 
    print(keys[i].key+':'+keys[i].value); 
} 

这个方法的很酷的事情是,因为它是渐进的,我可以与输出工作数据,而mapreduce正在运行。

回答

8

看来你并不真的需要val_array。为什么不使用简单的哈希?试试这个:

map = function() { 
    if (!this.fruits) return; 
    for (var fruit in this.fruits) { 
     emit(fruit, 
      {this.fruits[fruit]: 1}); 
    } 
}; 

reduce = function(key, values) { 
    var colors = {}; 

    values.forEach(function(v) { 
    for(var k in v) { // iterate colors 
     if(!colors[k]) // init missing counter 
     colors[k] = 0 

     color[k] += v[k]; 
    } 
    }); 

    return colors; 
} 
+0

哇,我真的在想那个,不是我!这确实做到了我想要的。我用100,1000和100,000条记录对它进行了测试,每个集合的运行速度约为20k/sec(在这些大小上显然是线性的)。我现在正在运行完整的10M记录,我可以看到,随着映射数据的批量变大,减少它们需要相当长的时间('colors'对象必须增长):''secs_running“:488,”msg “:”m/r:(1/3)排出383999/10752083 3%“'。 – SteveK

+0

顺便说一下,我不能使用'emit(fruit,{this.fruits [fruit]:1});'因为密钥是动态生成的,所以我用这个JS hack代替:'var obj = {}; obj [this.fruits [水果]] = 1;散发(水果,obj);'。 – SteveK

+0

我会建议尝试部分工作。也就是说,批量处理文件100k(或其他),然后在最后的工作中减少它。这可能很难实现,所以如果是一次性的,我不会打扰。 :) –

0

我很遗憾地告诉你这一点,但MongoDB的MapReduce框架是非常缓慢的,并可能会继续如此的“相当长的一段”(我不会期望的改善是在他们的路线图上)。

简单地说,我的回答将是我不会做,与蒙戈-MapReduce的,而是注重与新的聚合框架的帮助下实现它: http://docs.mongodb.org/manual/reference/aggregation/

或跑在最前面的Hadoop: http://www.slideshare.net/spf13/mongodb-and-hadoop(很好,简单的介绍)

我也有问题,使用实施MapReduce功能时,MongoDB缓慢,我的结论是,即使在做最简单的任务,它甚至不会接近上述两个解决方案涉及到性能。使用新的汇总框架,您可以轻松地在商品硬件上处理> 1M文档/秒(甚至更多)。