MongoDB Aggregation实践

内容为依据实验室项目需求来使用MongoDB聚合操作来将处理的数据存到集合中。

  1. 数据聚合

    mongoDB的数据聚合使用了聚合管道(Aggregation Pipeline)和MapReduce。MapReduce将数据分两部分进行处理。首先是map步骤,以key/vaule的形式转换输入的文档并以key值来进行分组。然后是reduce步骤,在这个阶段中以key来获取最终结果。MapReduce比较难理解,不过它的功能是强劲的。而今天我们使用的是聚合管道的方法来处理数据。

    Pipeline以一种“链式模型”来串接不同的程序或者不同的组件,让它们组成一条直线的工作流。给定一个完整的输入,经过各个组件的先后协同处理,得到唯一的最终输出。[1]

    mongoDB提供了一个聚合管道来转换和整合文档,将每一个环节称之为stage。其实聚合在SQL中也有简单应用,就是 'group by'操作。MongoDB的聚合操作提供了一种MapReduce的替代方法,其拥有固定的接口(也就是mongoDB的操作符)在使用时比较简单。

    在mongoDB shell中,我们有aggregate辅助类来执行管道操作比如,使用$group来对文档进行分组,这与上述的Group by一致。官网介绍上的一张图可以进一步解释聚合操作的流程。首先是$match stage将记录为 'A' 的status字段获得,并将不同cust_id的amount数量累加起来。

在接下来的实战中还会介绍其他的操作符,官网上有全部的操作符。

  1. 实战

    预设的实验场景是这样的:以一个假象的用户快递单为单元文档,其中有快递单号(ExpressID)、寄件人(Sendername)、寄件人电话(Sendermobile)、寄件日期(Sender_date)等(如上图)。我们可以思考一些关于用户的简单问题,比如到底一个时间段内有多少人寄/收件,其寄/收了多少快件。那么久需要规定用户的唯一性怎么判断,这里我们简单地假设用户名和电话号码为唯一用户的区分凭证。那么我们在处理数据时,就需要使用聚合操作。第一步是,过滤固定时间内的文档,这样能减少处理的文档数;第二步则是通过名字和电话号码字段来匹配目标用户并记录器出现次数;第三步则是可选的操作,比如排序、限制数量。

    首先进行第一步,我们通过$match来过滤文档,该操作将符合要求的文档进入到下一个stage。需要注意的是,$match应尽可能地在聚合操作的开始进行,这样能减少下一步操作的处理量。其详细的解释如下图。

    如我们在mongoDB shell中操作,应输入下列代码来进行收件时间范围内的过滤。

    $match: {
      'Receiver_date': {
        $gte: ISODate('2015-09-01T04:00:00Z'), 
        $lt: ISODate('2015-10-01T04:00:00Z')
      }
    }
    

    接下来继续进行分组计数的操作,$group操作符在上文中有所提及,其通过强制使用_id来匹配分组字段,并且可以结合一些累加操作符(accumulator expression)来进一步操作分组后的数据。具体的解释如下图。

    还记得我们的需求吗?需要依据姓名和号码两个字段来对文档划分独立的用户数据,并且记录数量。其代码就应该是下列这样。

    $group: {
      '_id': {
        'Receiver_name': '$Receiver_name',
        'Receiver_mobile': '$Receiver_mobile'
      },
      'count': {
        $sum: 1
      }
    }
    

    上述$group的解释(图)中提到一句,说:$group不会堆砌输出数据进行排序,所以我们需要使用$sort操作符来进行排序。

    排序依据不同的需求变化,在我们实验中需要将文档倒序排列。

    $sort: {
      "count": -1
    }
    

    最后我们将这三个stage在aggregate类中如下调用即可。

    db.user.aggregate([
      {
        $match: {
          'Receiver_date': {
            $gte: ISODate('2015-09-01T04:00:00Z'), 
            $lt: ISODate('2015-10-01T04:00:00Z')
          }
        }
      },
      {
        $group: {
          '_id': {
            'Receiver_name': '$Receiver_name',
            'Receiver_mobile': '$Receiver_mobile'
          },
          'count': {
            $sum: 1
          }
        }
      },
      {
        $sort: {
          "count": -1
        }
      }
    ], { allowDiskUse: true});
    

    最后我们需要注意的是$group操作默认限制在100mb的内存,当数据量大时(我们这样使用了150万条数据),需要设置allowDiskUse来讲操作写入临时文件。结果图如下,distinctArrar就是处理结果。


mongoDB的聚合管道还是比较简单易用的,但是针对大数据量时还是有点力不从心。在我们的实验环境中,mongoDB采用了12节点的集群,处理能力只有10万/秒。进一步的优化方法还在思考中。