聚合
聚合在MySQL中通常通过group by配合sum等函数来实现。MongoDB也提供了聚合操作,但实现方式不一样,主要有三种:group()、mapReduce和聚合框架(aggregation framework)。
group()
这次需要的数据比较多,所以直接用js脚本来准备数据。
// Seed mygroup with 29 documents: name1..name29, each with count = i % 5.
for (var n = 1; n < 30; n++) {
  db.mygroup.insert({ name: 'name' + n, count: n % 5 });
}
从MongoDB 3.4开始,这个方法已被官方弃用(4.2版本中已被移除),推荐用db.collection.aggregate()替代,不过这里的例子仍然以db.collection.group()进行讲解。
实例-计数
我们需要按照count字段进行分组然后统计每个分组的数量
// Group by `count` and count the documents in each group.
db.mygroup.group({
  key: { count: true },
  initial: { totalCount: 0 },
  reduce: function (current, aggregator) {
    aggregator.totalCount++;
  }
});
// Output:
// [
//   { "count" : 1, "totalCount" : 6 },
//   { "count" : 2, "totalCount" : 6 },
//   { "count" : 3, "totalCount" : 6 },
//   { "count" : 4, "totalCount" : 6 },
//   { "count" : 0, "totalCount" : 5 }
// ]
参数解释:
- key: 需要分组的字段
- initial:每一组初始化的值
- reduce:计算
- keyf:一个函数,根据文档计算出分组用的key,可以代替key参数
- cond:查询条件
- finalize:完成器,在返回之前对结果进行计算
reduce函数负责对分组后的数据进行处理,它有两个参数:current表示当前遍历到的文档,aggregator是每一组共享的结果对象(由initial初始化)。所以在这个例子里,每一组开始遍历时aggregator.totalCount都是0,遍历结束后就得到了每一组的文档总数。
关于这些的详细介绍,官网文档也给了非常好的介绍:https://docs.mongodb.com/manual/reference/method/db.collection.group/index.html
实例-最大值
重新准备数据:
// Recreate mygroup; this time every document also carries age = i.
db.mygroup.drop();
for (var n = 1; n < 30; n++) {
  db.mygroup.insert({ name: 'name' + n, age: n, count: n % 5 });
}
以count作为分组,取出每一组里age最大值。
// Group by `count` and keep the largest age seen in each group.
db.mygroup.group({
  key: { count: true },
  initial: { maxAge: -1 },
  reduce: function (current, aggregator) {
    var isLarger = current.age > aggregator.maxAge;
    if (isLarger) {
      aggregator.maxAge = current.age;
    }
  }
});
// Output:
// [
//   { "count" : 1, "maxAge" : 26 },
//   { "count" : 2, "maxAge" : 27 },
//   { "count" : 3, "maxAge" : 28 },
//   { "count" : 4, "maxAge" : 29 },
//   { "count" : 0, "maxAge" : 25 }
// ]
实例-找最小值
找出每一组中年龄的最小值。这次不预设一个足够大的初始值,而是用每组遍历到的第一条文档的age来初始化minAge。注意:initial中的字段不要和分组的key同名(例如这里的count),否则结果中分组字段的值会被覆盖。
// Group by `count` and keep the smallest age in each group.
db.mygroup.group({
  key: { count: true },
  // null marks "no document seen yet", so the first document seeds minAge.
  // Do not add a field named `count` here: fields in `initial` end up in the
  // result and would overwrite the grouping key of the same name.
  initial: { minAge: null },
  reduce: function (current, aggregator) {
    if (aggregator.minAge === null || current.age < aggregator.minAge) {
      aggregator.minAge = current.age;
    }
  }
});
// Output:
// [
//   { "count" : 1, "minAge" : 1 },
//   { "count" : 2, "minAge" : 2 },
//   { "count" : 3, "minAge" : 3 },
//   { "count" : 4, "minAge" : 4 },
//   { "count" : 0, "minAge" : 5 }
// ]
实例-平均值
这个例子需要使用finalize完成器来做,先用reduce来完成计数和求和的操作,然后用finalize完成器来求平均值。
// Group by `count`; reduce tracks min age, count and sum, finalize derives the average.
db.mygroup.group({
  key: { count: true },
  // No `count` field here: it would overwrite the grouping key in the result.
  initial: { minAge: null, totalAge: 0, totalCount: 0 },
  reduce: function (current, aggregator) {
    if (aggregator.minAge === null || current.age < aggregator.minAge) {
      aggregator.minAge = current.age;
    }
    aggregator.totalCount++;
    aggregator.totalAge += current.age;
  },
  // Runs once per group after all documents were reduced.
  finalize: function (aggregator) {
    aggregator.avgAge = aggregator.totalAge / aggregator.totalCount;
  }
});
// Output:
// [
//   { "count" : 1, "minAge" : 1, "totalAge" : 81, "totalCount" : 6, "avgAge" : 13.5 },
//   { "count" : 2, "minAge" : 2, "totalAge" : 87, "totalCount" : 6, "avgAge" : 14.5 },
//   { "count" : 3, "minAge" : 3, "totalAge" : 93, "totalCount" : 6, "avgAge" : 15.5 },
//   { "count" : 4, "minAge" : 4, "totalAge" : 99, "totalCount" : 6, "avgAge" : 16.5 },
//   { "count" : 0, "minAge" : 5, "totalAge" : 75, "totalCount" : 5, "avgAge" : 15 }
// ]
mapReduce
mapReduce主要分为map和reduce两个阶段。与group不同,mapReduce可以在分片环境中运行。(注:从MongoDB 5.0开始,mapReduce也已被弃用,官方推荐改用聚合管道。)
- map阶段
处理数据,例如分组,转换等
- reduce阶段
根据map的输出计算数据,得到想要的结果
https://docs.mongodb.com/manual/reference/command/mapReduce/index.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| db.runCommand( { mapReduce: <collection>, map: <function>, reduce: <function>, finalize: <function>, out: <output>, query: <document>, sort: <document>, limit: <number>, scope: <document>, jsMode: <boolean>, verbose: <boolean>, bypassDocumentValidation: <boolean>, collation: <document> } )
|
对于有多个value的key,MongoDB才会执行reduce阶段,对这些值进行收集和归约。
也就是说,如果某个key在map阶段只emit了一个值,就不会运行reduce,这个值会原样作为结果输出。另外,同一个key的reduce可能被调用多次(上一次reduce的返回值会作为values中的一个元素再次传入),所以reduce的返回值必须和emit的值类型相同,并且多次归约与一次归约的结果要一致。
实例1-求长度
数据准备:
// Seed student with 9 documents: age = i and count = i % 3 (three groups 0/1/2).
for (var n = 1; n < 10; n++) {
  db.student.insert({ name: 'name' + n, age: n, count: n % 3 });
}
现在用MapReduce来统计每一组的文档数量。
map函数中必须调用emit(key, value)来输出键值对,函数内的this指向当前文档。
// Count documents per `count` value: map emits 1 per document, reduce adds them up.
db.getCollection('student').mapReduce(
  function () {
    emit(this.count, 1);
  },
  // reduce may be re-invoked with its own earlier result inside `values`,
  // so sum the values instead of counting array elements.
  function (key, values) {
    return Array.sum(values);
  },
  { out: "length" }
);
// Output:
// {
//   "result" : "length",
//   "timeMillis" : 58,
//   "counts" : { "input" : 9, "emit" : 9, "reduce" : 3, "output" : 3 },
//   "ok" : 1
// }
注意out指定的是一个集合名,MapReduce会把结果写入当前数据库中out所指定的集合里(如果该集合已存在,其内容会被替换)。
// Read the mapReduce output: each key becomes _id, the reduced result is stored in value.
db.length.find();
// { "_id" : 0, "value" : 3 }
// { "_id" : 1, "value" : 3 }
// { "_id" : 2, "value" : 3 }
实例2-求和
// Sum ages per `count` value and store the result in the totalAge collection.
db.getCollection('student').mapReduce(
  function () {
    emit(this.count, this.age);
  },
  function (key, values) {
    var totalAge = 0;
    // Declared with var so the loop index does not leak into the global JS scope.
    for (var i = 0; i < values.length; i++) {
      totalAge += values[i];
    }
    return totalAge;
  },
  { out: "totalAge" }
);
// Output:
// {
//   "result" : "totalAge",
//   "timeMillis" : 53,
//   "counts" : { "input" : 9, "emit" : 9, "reduce" : 3, "output" : 3 },
//   "ok" : 1
// }
db.totalAge.find();
// { "_id" : 0, "value" : 18 }
// { "_id" : 1, "value" : 12 }
// { "_id" : 2, "value" : 15 }
实例3-求最大值
// Reduce function: the maximum of the emitted ages for one key.
var maxFun = function (key, values) {
  // Seed with the first value instead of 0 so the result is also correct for
  // zero or negative numbers (reduce is only called with at least two values).
  var maxAge = values[0];
  values.forEach(function (current) {
    if (current > maxAge) {
      maxAge = current;
    }
  });
  return maxAge;
};
db.getCollection('student').mapReduce(function () { emit(this.count, this.age); }, maxFun, { out: "maxAge" });
db.getCollection('maxAge').find({});
// { "_id" : 0, "value" : 9 }
// { "_id" : 1, "value" : 7 }
// { "_id" : 2, "value" : 8 }
实例4-求平均数
求平均数就是把和除以个数。但要注意,不能直接在reduce中计算平均值:reduce可能被多次调用,“部分平均值的平均值”一般不等于总体平均值。稳妥的做法是在reduce中累加总和与个数,最后在finalize中相除。
// Reduce function: accumulate {sum, num} partials. The shape matches what map emits,
// so reduce can safely be applied again to its own output.
var avgFun = function (key, values) {
  var acc = { sum: 0, num: 0 };
  for (var i = 0; i < values.length; i++) {
    acc.sum += values[i].sum;
    acc.num += values[i].num;
  }
  return acc;
};
db.getCollection('student').mapReduce(
  function () {
    emit(this.count, { sum: this.age, num: 1 });
  },
  avgFun,
  {
    out: "avgAge",
    // finalize runs once per key (including keys reduce never saw), so the division happens exactly once.
    finalize: function (key, reduced) {
      return reduced.sum / reduced.num;
    }
  }
);
db.avgAge.find();
// { "_id" : 0, "value" : 6 }
// { "_id" : 1, "value" : 4 }
// { "_id" : 2, "value" : 5 }
实例5-标签统计
数据准备:
// Seed five articles, each with a list of tags.
[
  { name: "article1", tags: ['java', 'python', 'mongodb', 'ruby'] },
  { name: "article2", tags: ['perl', 'scala', 'mongodb', 'ruby'] },
  { name: "article3", tags: ['perl', 'kotlin', 'mongodb', 'ruby'] },
  { name: "article4", tags: ['perl', 'kotlin', 'mongodb', 'groovy'] },
  { name: "article5", tags: ['perl', 'kotlin', 'redis', 'groovy'] }
].forEach(function (doc) {
  db.getCollection('article').insert(doc);
});
每篇文章都有若干标签,现在要统计每一个标签出现的次数。
思路是:在map阶段遍历每篇文章的tags,对每一个tag都emit一次,这样就按tag分好了组,再在reduce阶段汇总计数即可。这种需求用MongoDB来做非常方便,如果用传统的SQL,可能还要好好想想怎么实现。
// Count how often each tag occurs: map emits (tag, 1) for every tag, reduce sums.
db.article.mapReduce(
  function () {
    this.tags.forEach(function (tag) {
      emit(tag, 1);
    });
  },
  // Sum rather than values.length: reduce may receive its own earlier partial counts.
  function (key, values) {
    return Array.sum(values);
  },
  { out: "aa" }
);
db.aa.find();
// { "_id" : "groovy", "value" : 2 }
// { "_id" : "java", "value" : 1 }
// { "_id" : "kotlin", "value" : 3 }
// { "_id" : "mongodb", "value" : 4 }
// { "_id" : "perl", "value" : 4 }
// { "_id" : "python", "value" : 1 }
// { "_id" : "redis", "value" : 1 }
// { "_id" : "ruby", "value" : 3 }
// { "_id" : "scala", "value" : 1 }
finalize使用
// Deliberately inconsistent version: map emits an object, but reduce returns a number.
// Keys emitted only once never reach reduce and keep the {count: 1} object.
db.article.mapReduce(
  function () {
    this.tags.forEach(function (tag) {
      emit(tag, { count: 1 });
    });
  },
  function (key, values) {
    return values.length;
  },
  { out: "aa" }
);
db.getCollection('aa').find({});
// { "_id" : "groovy", "value" : 2 }
// { "_id" : "java", "value" : { "count" : 1 } }
// { "_id" : "kotlin", "value" : 3 }
// { "_id" : "mongodb", "value" : 4 }
// { "_id" : "perl", "value" : 4 }
// { "_id" : "python", "value" : { "count" : 1 } }
// { "_id" : "redis", "value" : { "count" : 1 } }
// { "_id" : "ruby", "value" : 3 }
// { "_id" : "scala", "value" : { "count" : 1 } }
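为什么会出现这样的结果?有的是对象,有的是数字。这是因为map阶段emit的是{count:1}这样的对象,而reduce返回的是数字。之前说过,如果某个key在map阶段只产出一个值,是不会执行reduce阶段的,所以只出现过一次的标签(java、python、redis、scala)保留了emit时的对象,其余标签得到的则是reduce返回的数字。这里可以用finalize来保证结果的统一。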
// Count tags with a consistent value shape: map emits {count: 1}, reduce returns
// {count: N}, and finalize unwraps every key's value to a plain number.
db.article.mapReduce(
  function () {
    this.tags.forEach(function (tag) {
      emit(tag, { count: 1 });
    });
  },
  // Returning the same shape that map emits keeps reduce safe to re-apply to its own output.
  function (key, values) {
    var total = 0;
    values.forEach(function (v) {
      total += v.count;
    });
    return { count: total };
  },
  {
    out: "aa",
    // Runs for every key, reduced or not, so all results become numbers.
    finalize: function (key, reduced) {
      return reduced.count;
    }
  }
);
db.getCollection('aa').find({});
// { "_id" : "groovy", "value" : 2 }
// { "_id" : "java", "value" : 1 }
// { "_id" : "kotlin", "value" : 3 }
// { "_id" : "mongodb", "value" : 4 }
// { "_id" : "perl", "value" : 4 }
// { "_id" : "python", "value" : 1 }
// { "_id" : "redis", "value" : 1 }
// { "_id" : "ruby", "value" : 3 }
// { "_id" : "scala", "value" : 1 }
进阶,mapReduce过滤
现在我们只统计打了java标签的文章,可以用query参数来过滤,也可以在map阶段用代码过滤。
// Count tags, but only across articles tagged 'java' (filtered inside map;
// the `query` option, e.g. query: {tags: 'java'}, would filter before map instead).
db.article.mapReduce(
  function () {
    // Skip documents without a tags array or without the 'java' tag.
    if (!Array.isArray(this.tags) || this.tags.indexOf('java') === -1) {
      return;
    }
    this.tags.forEach(function (tag) {
      emit(tag, { count: 1 });
    });
  },
  // Same shape in and out, so reduce can be re-applied to partial results.
  function (key, values) {
    var total = 0;
    values.forEach(function (v) {
      total += v.count;
    });
    return { count: total };
  },
  {
    out: "aa",
    finalize: function (key, reduced) {
      return reduced.count;
    }
  }
);
db.getCollection('aa').find({});
// { "_id" : "java", "value" : 1 }
// { "_id" : "mongodb", "value" : 1 }
// { "_id" : "python", "value" : 1 }
// { "_id" : "ruby", "value" : 1 }
聚合框架(aggregation framework)
聚合框架是MongoDB聚合操作的第三种方式,它与group()、MapReduce解决的是同一类问题。group()和MapReduce都需要自己编写函数,为此MongoDB提供了聚合框架来简化聚合操作。
注意:只有MapReduce和聚合框架能在分片环境下使用,group()不行。
聚合框架是不能自定义函数的,它帮我们实现了一些函数。
https://docs.mongodb.com/manual/core/aggregation-pipeline/index.html
聚合框架为我们提供了很多操作,aggregate的参数是一个pipeline(管道),管道中的每一个操作按顺序依次执行。
https://docs.mongodb.com/manual/reference/operator/aggregation/
常用的几个操作:$match(过滤文档)、$group(分组)、$sort(排序)、$skip与$limit(跳过/限制条数)、$project(投影和字段计算)、$unwind(把数组字段拆分成多条文档)。
实例1-长度
数据准备:
// Seed five commodities across three categories.
[
  { category: 1, price: 200, name: 'name1' },
  { category: 1, price: 300, name: 'name2' },
  { category: 2, price: 100, name: 'name3' },
  { category: 2, price: 500, name: 'name4' },
  { category: 3, price: 200, name: 'name5' }
].forEach(function (doc) {
  db.commodity.insert(doc);
});
按照category分组并统计每一组的文档数量
// Count documents per category ($sum: 1 adds one per document).
db.commodity.aggregate([
  { $group: { _id: '$category', totalCount: { $sum: 1 } } }
]);
// { "_id" : 3, "totalCount" : 1 }
// { "_id" : 2, "totalCount" : 2 }
// { "_id" : 1, "totalCount" : 2 }
实例2-求和
// Per category: number of documents and total price.
db.commodity.aggregate([
  {
    $group: {
      _id: '$category',
      totalCount: { $sum: 1 },
      totalPrice: { $sum: '$price' }
    }
  }
]);
// { "_id" : 3, "totalCount" : 1, "totalPrice" : 200 }
// { "_id" : 2, "totalCount" : 2, "totalPrice" : 600 }
// { "_id" : 1, "totalCount" : 2, "totalPrice" : 500 }
在没有指定排序规则的时候,aggregate不保证结果的顺序,需要用$sort指定排序。
// Same totals as before, ordered by totalPrice descending.
db.commodity.aggregate([
  {
    $group: {
      _id: '$category',
      totalCount: { $sum: 1 },
      totalPrice: { $sum: '$price' }
    }
  },
  { $sort: { totalPrice: -1 } }
]);
// { "_id" : 2, "totalCount" : 2, "totalPrice" : 600 }
// { "_id" : 1, "totalCount" : 2, "totalPrice" : 500 }
// { "_id" : 3, "totalCount" : 1, "totalPrice" : 200 }
实例3-求平均值
// Per category: count, total and average price, ordered by total price descending.
db.commodity.aggregate([
  {
    $group: {
      _id: '$category',
      totalCount: { $sum: 1 },
      totalPrice: { $sum: '$price' },
      avgPrice: { $avg: '$price' }
    }
  },
  { $sort: { totalPrice: -1 } }
]);
// { "_id" : 2, "totalCount" : 2, "totalPrice" : 600, "avgPrice" : 300 }
// { "_id" : 1, "totalCount" : 2, "totalPrice" : 500, "avgPrice" : 250 }
// { "_id" : 3, "totalCount" : 1, "totalPrice" : 200, "avgPrice" : 200 }
实例4-标签统计
之前演示MapReduce的时候,我们统计了article中每个tag出现的次数,这里用聚合框架实现同样的效果,需要用到$unwind操作:它把数组字段拆开,数组中的每个元素各生成一条文档。
// Unwind the tags array (one document per tag), then count documents per tag.
db.article.aggregate([
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } }
]);
// { "_id" : "redis", "total" : 1 }
// { "_id" : "groovy", "total" : 2 }
// { "_id" : "kotlin", "total" : 3 }
// { "_id" : "scala", "total" : 1 }
// { "_id" : "perl", "total" : 4 }
// { "_id" : "python", "total" : 1 }
// { "_id" : "mongodb", "total" : 4 }
// { "_id" : "java", "total" : 1 }
// { "_id" : "ruby", "total" : 3 }
实例5-标签过滤
找出打了java标签的文章并进行统计
// Keep only articles tagged 'java', then count tags across them.
db.article.aggregate([
  { $match: { tags: { $in: ['java'] } } },
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } }
]);
// { "_id" : "ruby", "total" : 1 }
// { "_id" : "mongodb", "total" : 1 }
// { "_id" : "python", "total" : 1 }
// { "_id" : "java", "total" : 1 }
实例6-各种操作
// Count tags over articles NOT tagged 'java', most frequent first.
db.article.aggregate([
  { $match: { tags: { $nin: ['java'] } } },
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } },
  // _id is unique after $group, so it fixes the order of tags with equal totals.
  { $sort: { total: -1, _id: 1 } }
]);
// { "_id" : "perl", "total" : 4 }
// { "_id" : "kotlin", "total" : 3 }
// { "_id" : "mongodb", "total" : 3 }
// { "_id" : "groovy", "total" : 2 }
// { "_id" : "ruby", "total" : 2 }
// { "_id" : "redis", "total" : 1 }
// { "_id" : "scala", "total" : 1 }
取前三。注意:total相同的文档在$sort之后的先后顺序是不确定的,配合$skip、$limit分页时,最好再加上一个唯一字段(如_id)作为第二排序键,保证结果稳定:
// Top three tags (articles without 'java'); _id breaks ties so the cut-off is stable.
db.article.aggregate([
  { $match: { tags: { $nin: ['java'] } } },
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } },
  { $sort: { total: -1, _id: 1 } },
  { $limit: 3 }
]);
// { "_id" : "perl", "total" : 4 }
// { "_id" : "kotlin", "total" : 3 }
// { "_id" : "mongodb", "total" : 3 }
取第二到第四:
// Ranks 2-4 (skip one, take three); the _id tie-break keeps the page deterministic.
db.article.aggregate([
  { $match: { tags: { $nin: ['java'] } } },
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } },
  { $sort: { total: -1, _id: 1 } },
  { $skip: 1 },
  { $limit: 3 }
]);
// { "_id" : "kotlin", "total" : 3 }
// { "_id" : "mongodb", "total" : 3 }
// { "_id" : "groovy", "total" : 2 }
只显示total:
// Same page as above, with _id projected away so only total remains.
db.article.aggregate([
  { $match: { tags: { $nin: ['java'] } } },
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } },
  { $sort: { total: -1, _id: 1 } },
  { $skip: 1 },
  { $limit: 3 },
  { $project: { _id: 0 } }
]);
// { "total" : 3 }
// { "total" : 3 }
// { "total" : 2 }
把total加100,并以hello字段输出:
// Same page as above, projecting a computed field hello = total + 100.
db.article.aggregate([
  { $match: { tags: { $nin: ['java'] } } },
  { $unwind: '$tags' },
  { $group: { _id: '$tags', total: { $sum: 1 } } },
  { $sort: { total: -1, _id: 1 } },
  { $skip: 1 },
  { $limit: 3 },
  { $project: { _id: 0, hello: { $add: ['$total', 100] } } }
]);
// { "hello" : 103 }
// { "hello" : 103 }
// { "hello" : 102 }