MongoDB MapReduce实践

上回说到采用mongoDB自带的Aggregation功能来处理数据,但我们发现当数据量大时(1000w records)执行时间非常长。并且处理数据的结果文档超过16MB时就会报错,当然可以使用了$out的操作符来结果集大小的限制。这篇mongoDB系列文章有详细的用法和比较,文末总结并提出:简单固定的聚合操作使用管道,但是对于复杂的、大量数据集的聚合任务还是使用MapReduce。

MapReduce是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(归纳,mongoDB权威指南翻译为化简)”,及他们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。@Wiki

而MongoDB实现的MapReduce采用同样的处理范式。在map函数中,将当前文档中的值映射成key-value对并emit出去;reduce函数则接受参数并将同一key的values合并成一个文档,并等待可能的下一次。(所以我们在写reduce函数时,需要保证参数与返回值为同一格式)。

当然,mongoDB针对自身的特性还有一些其他的参数可以设置,比如设置结果的输出等,下面将例子需要使用的参数讲解一下,其他参数可以看官网文档或者这篇文章。首先是最重要的3个参数:map、reduce、out。map函数不需要参数,数据来源于各个文档,通过this.来获取文档内的数据,其函数原型如下:

function() {
    ...
    emit(key, value);
}

如前所说,当同一key有多个value时,mongoDB会调用,并且同一key可能会被调用多次。所以我们需要保证reduce函数返回的对象与map函数emit的参数一致,同样要使下列等式成立: 'reduce(key, [ C, reduce(key, [ A, B ]) ] ) == reduce( key, [ C, A, B ] )' 。reduce的函数原型如下:

function(key, values) {
    ...
    return result;
}

而out参数则是指定输出设置,我们可以替换(replace)、合并(merge)亦或是归纳(reduce),其原型为:

out: { <action>: <collectionName>
        [, db: <dbName>]
        [, sharded: <boolean> ]
        [, nonAtomic: <boolean> ] }

然后我们还需要使用的参数有:sort、query和limit。sort是指定排序方法、query是过滤数据、limit是限制处理文档数量。

下面我们接着上一篇文章的例子,统计用户的数量并记录最早的时间。其MaprReduce的写法与Aggregation的思路差不多,差别在于map传递给reduce的内容。

我们首先将Receivername、Receivermobile指定为key,其value记录为计数参数count和时间Receiver_date,注意我们需要以数组的方式传递date,这样map和reduce的中间环节——shuffle,可以将时间以数组的方式传递给reduce。具体可看如下代码:

map: function(){
    var key = { 'name': this.Receiver_name, 'mobile': this.Receiver_mobile };
    var values = { 'count': 1, 'dates': [ this.Receiver_date ] };
    emit(key, values);
}

reduce函数将同一key的values合并,并且我们需要返回一个与输入结构一致的对象。记得我们的新需求是Receiver_date的最早时间,所以我们通过使用js的Math.min在时间数组中获得最小的时间,其代码如下。

reduce: function(key, values){
    var result = { count: 0, dates: 0 };
    var dates = [];      
    values.forEach(function(value){
        result.count += value.count;
        dates = dates.concat(value.dates);
    });
    result.dates = new Date(Math.min.apply(Math, dates));
    return result;
}

我们参考Antoine Girbal的方法,依据key来排序以获得更高的运行速度。最后我们只要指定输出集合为 'mrrs' 即可。其结果怎么样呢?接下来我们跟踪mapReduce执行过程。

我们使用的测试数据是在5个分片上总共4500w的数据,其分片的分布大致均匀(其S1只有4.11%的数据,其他4个分片平均占有24%左右的数据)。首先,我们看看currentOp()的结果(其他不太重要的记录隐去)。

{
    "opid" : "s5:138453432",
    "active" : true,
    "secs_running" : 22,
    "microsecs_running" : NumberLong(22696209),
    "op" : "query",
    "ns" : "express.tmp.mr.ord_16_inc",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 373799/11172981 3%"
}, 
{
    "opid" : "s1:1638471032",
    "active" : true,
    "secs_running" : 22,
    "microsecs_running" : NumberLong(22696688),
    "op" : "query",
    "ns" : "express.ord",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 36973/1871499 1%"
}, 
{
    "opid" : "s4:137918099",
    "active" : true,
    "secs_running" : 22,
    "microsecs_running" : NumberLong(22697211),
    "op" : "query",
    "ns" : "express.ord",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 342219/10522354 3%"
}, 
{
    "opid" : "s2:334235462",
    "active" : true,
    "secs_running" : 22,
    "microsecs_running" : NumberLong(22697033),
    "op" : "query",
    "ns" : "express.tmp.mr.ord_20_inc",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 311999/10711728 2%"
}, 
{
    "opid" : "s6:139283903",
    "active" : true,
    "secs_running" : 22,
    "microsecs_running" : NumberLong(22697747),
    "op" : "query",
    "ns" : "express.tmp.mr.ord_16_inc",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "msg" : "m/r: (1/3) emit phase M/R: (1/3) Emit Progress: 378699/11184537 3%"
}

从操作记录可以看到,每个分片都在执行emit phase,即map函数中,progress记录了完成的百分比。然后就是下面的Reduce阶段。

{
    "opid" : "s5:138777487",
    "active" : true,
    "secs_running" : 662,
    "microsecs_running" : NumberLong(662351896),
    "op" : "query",
    "ns" : "express.tmp.mr.ord_17",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "msg" : "m/r: (3/3) final reduce to collection M/R: (3/3) Final Reduce Progress: 133058/746825 17%"
}

在分片条件下,reduce之后还有merge sort and reduce 阶段,这表明mongoDB采用合并排序来整合分片的结果,merge阶段在分片s1上执行。

{
    "opid" : "s1:1639335194",
    "active" : true,
    "secs_running" : 7,
    "microsecs_running" : NumberLong(7445347),
    "op" : "query",
    "ns" : "express.tmp.mr.ord_62",
    "query" : {
        "$msg" : "query not recording (too large)"
    },
    "waitingForLock" : false,
    "msg" : "m/r: merge sort and reduce",
}

最后我统计比较了一下Aggregation和MapReduce之间的性能比较,发现Aggregation比MapReduce快了10~20%的样子。MR的平均执行时间为1054秒,而AF只要848秒。对比代码,可能是时间处理那个地方还有可以优化的空间。


相关的分片优化实验会在下一篇文章分享。