数据聚合

聚合(aggregations) 可以让我们极其方便的实现对数据的统计、分析、运算。例如:

  • 什么品牌的手机最受欢迎?
  • 这些手机的平均价格、最高价格、最低价格?
  • 这些手机每月的销售情况如何?

实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近实时搜索效果。

聚合分类

名称作用对比Mysql
分桶类型 (Bucket)满足特定条件的文档的集合类似GROUP BY语法
指标分析类型(Metric)计算最大值,最小值,平均值等类似 COUNTSUM()MAX() 等统计方法
管道分析类型(Pipeline)对聚合结果进行二次分析
矩阵分析类型(Matrix)支持对多个字段的操作并提供一个结果矩阵

Bucket聚合分析

按照Bucket的分桶策略,常见的Bucket聚合分析如下:

策略描述
Terms最简单策略,如果是text类型,则按照分词后的结果分桶
Range按照指定数值的范围来设定分桶规则
Date Range通过指定日期的范围来设定分桶规则
Histogram直方图,以固定间隔的策略来分割数据
Date Histogram针对日期的直方图或者柱状图,是时序数据分析中常用的聚合分析类型

Terms

最简单策略,如果是text类型,则按照分词后的结果分桶

语法

1
2
3
4
5
6
7
8
9
10
11
{
"size":0,//hits返回的数量,默认20条
"aggs": {
"可自定义分组名": {
"terms": {
"field": "指定字段",
"size": 3// 查询的数量
}
}
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
# 根据名称分组
curl -XGET "http://elasticsearch:9200/test_db/_search" -H 'Content-Type: application/json' -d'{
"aggs": {
"my_name_group": {
"terms": {
"field": "name",
"size": 3
}
}
}
}'

Range

按照指定数值的范围来设定分桶规则

语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"size":0,//hits返回的数量,默认20条
"aggs": {
"可自定义分组名": {
"range": {
"field": "字段",
"ranges": [
{
"from": 10, // 大于等于
"to": 20 // 小于
},
{
"from": 20, // 大于等于
"to": 30 // 小于
},
...
]
}
}
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#统计年龄分布在 20 <= x < 30 的人数
curl -XGET "http://elasticsearch:9200/test_db/_search" -H 'Content-Type: application/json' -d'{
"aggs": {
"age_range": {
"range": {
"field": "age",
"ranges": [
{
"from": 10,
"to": 20
},
{
"from": 20,
"to": 30
},
{
"from": 30,
"to": 40
}
]
}
}
}
}'

Date Range

通过指定日期的范围来设定分桶规则

语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"size":0,//hits返回的数量,默认20条
"aggs": {
"可自定义分组名": {
"date_range": {
"field": "字段",
"ranges": [
{
"from": "begin", //格式为:yyyy-MM-dd HH:mm:ss
"to": "end" // 格式为:yyyy-MM-dd HH:mm:ss
},
{
"from": "now-10d/d", //当前时间减去10天,格式为:yyyy-MM-dd HH:mm:ss
"to": "now" // 当前时间,格式为:yyyy-MM-dd HH:mm:ss
}
]
}
}
}
}

使用

Histogram

直方图,以固定间隔的策略来分割数据

语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"size": 0, //hits返回的数量,默认20条
"aggs": {
"NAME": {
"histogram": {
"field": "字段",
"interval": 10, // 步长
"extended_bounds": {
"min": 10,// 最小值 x >= 10
"max": 40 // 最大值 x <= 40
}
}
}
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
curl -XGET "http://elasticsearch:9200/test_db/_search" -H 'Content-Type: application/json' -d'{
"size": 0,
"aggs": {
"NAME": {
"histogram": {
"field": "age",
"interval": 10,
"extended_bounds": {
"min": 10,
"max": 40
}
}
}
}
}'

Date Histogram

针对日期的直方图或者柱状图,是时序数据分析中常用的聚合分析类型

语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"size": 0,
"aggs": {
"NAME": {
"date_histogram": {
"field": "字段",
"interval": "day",// 维度
"format": "yyyy-MM-dd",//格式化日期
"extended_bounds": {
"min": "最小值",
"max": "最大值"
}
}
}
}
}

interval的值有: year、month、week、day、hour、minute

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#使用日期直方图(date_histogram),统计记录
curl -XGET "http://elasticsearch:9200/test_db/_search" -H 'Content-Type: application/json' -d'{
"size": 0,
"aggs": {
"NAME": {
"date_histogram": {
"field": "create",
"interval": "day",
"format": "yyyy-MM-dd",
"extended_bounds": {
"min": "2021-03-24",
"max": "2021-03-29"
}
}
}
}
}'

Metric指标分析

计算最大值,最小值,平均值等,类似 COUNTSUM()MAX() 等统计方法。

单值分析

最大和最小

  • max:返回数值类字段的最大值
  • min: 返回数值类字段的最小值

语法

1
2
3
4
5
6
7
8
9
10
{
"size": 0,
"aggs": {
"自定义分组名": {
"max/min": {
"field": "字段"
}
}
}
}

使用

平均值、求和

  • avg: 返回数值类字段的平均值
  • sum: 返回数值类字段的总和

使用

去重

cardinality: 是指不同数值的个数,类似SQL中的distinct count概念

使用

多值分析

Stats

返回一系列数值类型的统计值,包含min,max,avg,sum和count,被分析的字段只能是数字类型

Extended Stats

stats的扩展,包含了更多的统计数据,如方差,标准差等

多值度量聚合计算从汇总文档中提取的数值的统计数据。这些值可以从文档中的特定数值字段中提取,也可以由提供的脚本生成。

扩展统计聚合是统计聚合(stats aggregation)的扩展版本,其中额外添加如sum_of_squares, variance, std_deviation and std_deviation_bounds。

假设数据由学生的考试成绩(0到100)组成:

1
2
3
4
5
6
7
8
9
{ 
"aggs" : {
"grades_stats" : {
"extended_stats" : {
"field" : "grade"
}
}
}
}

上述聚合计算所有文档的分数统计信息。聚合类型为extended_stats,设置文档的数字字段为需要统计的字段为grade,执行上面的语句将返回如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
...
"aggregations": {
"grade_stats": {
"count": 9,
"min": 72,
"max": 99,
"avg": 86,
"sum": 774,
"sum_of_squares": 67028,
"variance": 51.55555555555556,
"std_deviation": 7.180219742846005,
"std_deviation_bounds": {
"upper": 100.36043948569201,
"lower": 71.63956051430799
}
}
}
}

聚合的名称(上面语句中的grades_stats)作为key,通过该key可以从返回的结果中检索出聚集的结果。

Standard Deviation Bounds

默认情况下,扩展统计度量将返回一个对象称为std_deviation_bounds,它提供了平均值加/减两个标准差的区间。这可以成为一个用来方式来可视化数据的方差。如果你想要一个不同的边界,例如三个标准偏差,你可以在请求中设置:

1
2
3
4
5
6
7
8
9
10
{
"aggs" : {
"grades_stats" : {
"extended_stats" : {
"field" : "grade",
"sigma" : 3 # 1
}
}
}
}

控制应显示多少标准偏差+/-平均值。

sigma可以是任何非负double类型数字,这意味着你可以要求非整数值,如1.5。值为0也是有效的,但只会返回上下限的平均值。

提示:默认情况下显示标准偏差和其边界,但它们并不总是适用于所有的数据集。您的数据必须是正常分布的度量才有意义。标准偏差背后的统计数据假设为正常分布的数据,因此如果数据偏斜向左或向右,返回的值将是误导性的。

Script

使用下面的脚本计算成绩的统计信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
...,
"aggs" : {
"grades_stats" : {
"extended_stats" : {
"script" : {
"inline" : "doc['grade'].value",
"lang" : "painless"
}
}
}
}
}

用下面的语法来使用脚本文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
...,
"aggs" : {
"grades_stats" : {
"extended_stats" : {
"script" : {
"file": "my_script",
"params": {
"field": "grade"
}
}
}
}
}
}

提示:可以使用id参数代替file参数来使用index的脚本。

Value Script

当考试的难度是高于学生的水平,需要校正学生的成绩,我们可以使用value script获得新的统计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"aggs" : {
...
"aggs" : {
"grades_stats" : {
"extended_stats" : {
"field" : "grade",
"script" : {
"lang" : "painless",
"inline": "_value * params.correction",
"params" : {
"correction" : 1.2
}
}
}
}
}
}
}
Missing Value

missing参数定义了如何处理缺少值的文档。 默认情况下如果没有指定的字段,这种文档将被忽略,但也可以认为它们具有指定的值:

1
2
3
4
5
6
7
8
9
10
{
"aggs" : {
"grades_stats" : {
"extended_stats" : {
"field" : "grade",
"missing": 0 # 1
}
}
}
}

文档中如果没有grade这个字段,则认为该字段的值是0

top_hits

一般用于分桶后获取桶内最匹配的顶部文档列表,即详情数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 根据年龄分组,并倒序
curl -XGET "http://elasticsearch:9200/test_db/_search" -H 'Content-Type: application/json' -d'{
"size":0,
"aggs":{
"teterms_demo":{
"terms":{
"field":"age",
"size":10
},
"aggs":{
"top_hits_demo":{
"top_hits":{
"size":10,
"sort":[
{
"age":{
"order":"desc"
}
}
]
}
}
}
}
}
}'

聚合附加条件

设置过滤条件

filter: 为聚合分析设定过滤条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 查询家庭在北京,已名字分组的记录
curl -XGET "http://elasticsearch:9200/test_db/_search" -H 'Content-Type: application/json' -d'{
"size": 0,
"aggs": {
"filter_demo": {
"filter": {
"match_phrase":{
"home":"北京"
}
},
"aggs": {
"name_group_demo": {
"terms": {
"field": "name",
"size": 10
}
}
}
}
}
}'

注意这里的数据结构嵌套。

聚合后排序

可以使用自带的关键数据进行排序,如下:

  • _count: 文档数
  • _key: 按照key值排序

RestAPI实现聚合

代码地址:https://github.com/behappy-java-study/heima-distributed-study

API语法

聚合条件与query条件同级别,因此需要使用request.source()来指定聚合条件。

聚合条件的语法:

聚合的结果也与查询结果不同,API也比较特殊。不过同样是JSON逐层解析:

业务需求

需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:

分析:

目前,页面的城市列表、星级列表、品牌列表都是写死的,并不会随着搜索结果的变化而变化。但是用户搜索条件改变时,搜索结果会跟着变化。

例如:用户搜索“东方明珠”,那搜索的酒店肯定是在上海东方明珠附近,因此,城市只能是上海,此时城市列表中就不应该显示北京、深圳、杭州这些信息了。

也就是说,搜索结果中包含哪些城市,页面就应该列出哪些城市;搜索结果中包含哪些品牌,页面就应该列出哪些品牌。

如何得知搜索结果中包含哪些品牌?如何得知搜索结果中包含哪些城市?

使用聚合功能,利用Bucket聚合,对搜索结果中的文档基于品牌分组、基于城市分组,就能得知包含哪些品牌、哪些城市了。

因为是对搜索结果聚合,因此聚合是限定范围的聚合,也就是说聚合的限定条件跟搜索文档的条件一致。

查看浏览器可以发现,前端其实已经发出了这样的一个请求:

请求参数与搜索文档的参数完全一致

返回值类型就是页面要展示的最终结果:

结果是一个Map结构:

  • key是字符串,城市、星级、品牌、价格
  • value是集合,例如多个城市的名称

业务实现

cn.itcast.hotel.web包的HotelController中添加一个方法,遵循下面的要求:

  • 请求方式:POST
  • 请求路径:/hotel/filters
  • 请求参数:RequestParams,与搜索文档的参数一致
  • 返回值类型:Map<String, List<String>>

代码:

1
2
3
4
@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
return hotelService.getFilters(params);
}

这里调用了IHotelService中的getFilters方法,尚未实现。

cn.itcast.hotel.service.IHotelService中定义新方法:

1
Map<String, List<String>> filters(RequestParams params);

cn.itcast.hotel.service.impl.HotelService中实现该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Override
public Map<String, List<String>> filters(RequestParams params) {
try {
// 1.准备Request
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
// 2.1.query
buildBasicQuery(params, request);
// 2.2.设置size
request.source().size(0);
// 2.3.聚合
buildAggregation(request);
// 3.发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 4.1.根据品牌名称,获取品牌结果
List<String> brandList = getAggByName(aggregations, "brandAgg");
result.put("品牌", brandList);
// 4.2.根据品牌名称,获取品牌结果
List<String> cityList = getAggByName(aggregations, "cityAgg");
result.put("城市", cityList);
// 4.3.根据品牌名称,获取品牌结果
List<String> starList = getAggByName(aggregations, "starAgg");
result.put("星级", starList);

return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void buildAggregation(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100)
);
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
// 4.1.根据聚合名称获取聚合结果
Terms brandTerms = aggregations.get(aggName);
// 4.2.获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3.遍历
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 4.4.获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
return brandList;
}