这篇我们介绍一下ES的聚合功能(aggregation)。聚合是把索引数据可视化处理成可读有用数据的主要工具。聚合由bucket桶和metrics度量两部分组成。
所谓bucket就相当于SQL的GROUP BY,如下:
GET /cartxns/_search
{
"size" : 2,
"aggs": {
"color": {
"terms": {"field": "color.keyword"}
}
}
}
...
"aggregations": {
"color": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "red",
"doc_count" : 4},
{
"key" : "blue",
"doc_count" : 2},
{
"key" : "green",
"doc_count" : 2}
]
}
}
上面这个例子是以color.keyword的值来划分bucket的。用elastic4s可以如下表达(这里把聚合命名为colors,并用include只保留red和green两个bucket):
// Terms aggregation "colors" over color.keyword, limited to the exact
// values "red" and "green"; also returns up to 3 hits with only the
// "color" and "make" source fields.
val aggTerms = search("cartxns")
  .aggregations(
    termsAgg("colors", "color.keyword").includeExactValues("red", "green")
  )
  .sourceInclude("color", "make")
  .size(3)

// Print the generated request before sending it.
println(aggTerms.show)

val termsResult = client.execute(aggTerms).await

// Dump the source of every returned hit.
for (hit <- termsResult.result.hits.hits)
  println(hit.sourceAsMap)

// One line per bucket: key and document count.
for (bucket <- termsResult.result.aggregations.terms("colors").buckets)
  println(s"${bucket.key},${bucket.docCount}")
输出为:
POST:/cartxns/_search?StringEntity({"size":3,"_source":{"includes":["color","make"]},"aggs":{"colors":{"terms":{"field":"color.keyword","include":["red","green"]}}}},Some(application/json))
Map(color -> red, make ->honda)
Map(color -> red, make ->honda)
Map(color -> green, make ->ford)
red,4
green,2
下面的avg_price是个简单的度量:
POST /cartxns/_search
{
"aggs":{
"colors":{
"terms":{"field":"color.keyword"},
"aggs":{
"avg_price":{
"avg":{"field":"price"}
}
}
}
}
}
...
"aggregations": {
"colors": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "red",
"doc_count" : 4,
"avg_price": {
"value" : 32500.0}
},
{
"key" : "blue",
"doc_count" : 2,
"avg_price": {
"value" : 20000.0}
},
{
"key" : "green",
"doc_count" : 2,
"avg_price": {
"value" : 21000.0}
}
]
}
}
terms定义bucket。在terms下加上aggs-avg表示对符合某个bucket条件的文档计算平均定价avg_price。elastic4s是如下表达的:
// Terms aggregation "colors" over color.keyword with an "avg_price"
// average of the price field nested inside every bucket.
val aggTermsAvg = search("cartxns")
  .aggregations(
    termsAgg("colors", "color.keyword")
      .subAggregations(avgAgg("avg_price", "price"))
  )
  .sourceInclude("color", "make")
  .size(3)

// Print the generated request before sending it.
println(aggTermsAvg.show)

val avgResult = client.execute(aggTermsAvg).await

// Dump the source of every returned hit.
for (hit <- avgResult.result.hits.hits)
  println(hit.sourceAsMap)

// One line per color bucket: key, document count, average price.
for (bucket <- avgResult.result.aggregations.terms("colors").buckets) {
  val avgPrice = bucket.avg("avg_price").value
  println(s"${bucket.key},${bucket.docCount},${avgPrice}")
}
...
POST:/cartxns/_search?StringEntity({"size":3,"_source":{"includes":["color","make"]},"aggs":{"colors":{"terms":{"field":"color.keyword"},"aggs":{"avg_price":{"avg":{"field":"price"}}}}}},Some(application/json))
Map(color -> red, make ->honda)
Map(color -> red, make ->honda)
Map(color -> green, make ->ford)
red,4,32500.0
blue,2,20000.0
green,2,21000.0
然后,我们可以在bucket里再增加bucket,如下:
POST /cartxns/_search
{
"aggs":{
"colors":{
"terms":{"field":"color.keyword"},
"aggs":{
"avg_price":{"avg":{"field":"price"}},
"makes":{"terms":{"field":"make.keyword"}}
}
}
}
}
...
"aggregations": {
"colors": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "red",
"doc_count" : 4,
"makes": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "honda",
"doc_count" : 3},
{
"key" : "bmw",
"doc_count" : 1}
]
},
"avg_price": {
"value" : 32500.0}
},
{
"key" : "blue",
"doc_count" : 2,
"makes": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "ford",
"doc_count" : 1},
{
"key" : "toyota",
"doc_count" : 1}
]
},
"avg_price": {
"value" : 20000.0}
},
{
"key" : "green",
"doc_count" : 2,
"makes": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "ford",
"doc_count" : 1},
{
"key" : "toyota",
"doc_count" : 1}
]
},
"avg_price": {
"value" : 21000.0}
}
]
}
}
elastic4s示范:
// Terms aggregation "colors" over color.keyword; each color bucket carries
// an "avg_price" metric and a nested "makes" terms aggregation.
val aggTAvgT = search("cartxns")
  .aggregations(
    termsAgg("colors", "color.keyword").subAggregations(
      avgAgg("avg_price", "price"),
      termsAgg("makes", "make.keyword")
    )
  )
  .size(3)

// Print the generated request before sending it.
println(aggTAvgT.show)

val avgTTResult = client.execute(aggTAvgT).await

// Dump the source of every returned hit.
for (hit <- avgTTResult.result.hits.hits)
  println(hit.sourceAsMap)

// For each color bucket print its summary line, then one line per make.
for (colorBucket <- avgTTResult.result.aggregations.terms("colors").buckets) {
  val avgPrice = colorBucket.avg("avg_price").value
  println(s"${colorBucket.key},${colorBucket.docCount},${avgPrice}")
  for (makeBucket <- colorBucket.terms("makes").buckets)
    println(s"${makeBucket.key},${makeBucket.docCount}")
}
...
POST:/cartxns/_search?StringEntity({"size":3,"aggs":{"colors":{"terms":{"field":"color.keyword"},"aggs":{"avg_price":{"avg":{"field":"price"}},"makes":{"terms":{"field":"make.keyword"}}}}}},Some(application/json))
Map(price -> 10000, color -> red, make -> honda, sold -> 2014-10-28)
Map(price -> 20000, color -> red, make -> honda, sold -> 2014-11-05)
Map(price -> 30000, color -> green, make -> ford, sold -> 2014-05-18)
red,4,32500.0
honda,3
bmw,1
blue,2,20000.0
ford,1
toyota,1
green,2,21000.0
ford,1
toyota,1
最后,我们再在最内层的bucket增加min,max两个metrics:
POST /cartxns/_search
{
"size":3,
"aggs":{
"colors":{
"terms":{"field":"color.keyword"},
"aggs":{
"avg_price":{"avg":{"field":"price"}},
"makes":{"terms":{"field":"make.keyword"},
"aggs":{
"max_price":{"max":{"field":"price"}},
"min_price":{"min":{"field":"price"}}
}
}
}
}
}
}
...
"aggregations": {
"colors": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "red",
"doc_count" : 4,
"makes": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "honda",
"doc_count" : 3,
"max_price": {
"value" : 20000.0},
"min_price": {
"value" : 10000.0}
},
{
"key" : "bmw",
"doc_count" : 1,
"max_price": {
"value" : 80000.0},
"min_price": {
"value" : 80000.0}
}
]
},
"avg_price": {
"value" : 32500.0}
},
{
"key" : "blue",
"doc_count" : 2,
"makes": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "ford",
"doc_count" : 1,
"max_price": {
"value" : 25000.0},
"min_price": {
"value" : 25000.0}
},
{
"key" : "toyota",
"doc_count" : 1,
"max_price": {
"value" : 15000.0},
"min_price": {
"value" : 15000.0}
}
]
},
"avg_price": {
"value" : 20000.0}
},
{
"key" : "green",
"doc_count" : 2,
"makes": {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets": [
{
"key" : "ford",
"doc_count" : 1,
"max_price": {
"value" : 30000.0},
"min_price": {
"value" : 30000.0}
},
{
"key" : "toyota",
"doc_count" : 1,
"max_price": {
"value" : 12000.0},
"min_price": {
"value" : 12000.0}
}
]
},
"avg_price": {
"value" : 21000.0}
}
]
}
}
elastic4s示范:
// Terms aggregation "colors" over color.keyword; each color bucket carries
// an "avg_price" metric and a nested "makes" terms aggregation, and each
// make bucket in turn carries "max_price" and "min_price" metrics.
val aggTAvgTMM = search("cartxns")
  .aggregations(
    termsAgg("colors", "color.keyword").subAggregations(
      avgAgg("avg_price", "price"),
      termsAgg("makes", "make.keyword").subAggregations(
        maxAgg("max_price", "price"),
        minAgg("min_price", "price")
      )
    )
  )
  .size(3)

// Print the generated request before sending it.
println(aggTAvgTMM.show)

val avgTTMMResult = client.execute(aggTAvgTMM).await

// Dump the source of every returned hit.
for (hit <- avgTTMMResult.result.hits.hits)
  println(hit.sourceAsMap)

// For each color bucket print its summary line, then one line per make
// with the minimum and maximum price (in that order).
for (colorBucket <- avgTTMMResult.result.aggregations.terms("colors").buckets) {
  val avgPrice = colorBucket.avg("avg_price").value
  println(s"${colorBucket.key},${colorBucket.docCount},${avgPrice}")

  for (makeBucket <- colorBucket.terms("makes").buckets) {
    // Read min/max through their own accessors instead of avg(...): the
    // avg accessor only happened to work because all single-value metrics
    // share the same "value" response shape.
    // NOTE(review): assumes min(...)/max(...) expose `value` as
    // Option[Double] in the elastic4s version in use — confirm.
    val minPrice = makeBucket.min("min_price").value.fold("n/a")(_.toString)
    val maxPrice = makeBucket.max("max_price").value.fold("n/a")(_.toString)
    println(s"${makeBucket.key},${makeBucket.docCount},${minPrice},${maxPrice}")
  }
}
...
POST:/cartxns/_search?StringEntity({"size":3,"aggs":{"colors":{"terms":{"field":"color.keyword"},"aggs":{"avg_price":{"avg":{"field":"price"}},"makes":{"terms":{"field":"make.keyword"},"aggs":{"max_price":{"max":{"field":"price"}},"min_price":{"min":{"field":"price"}}}}}}}},Some(application/json))
Map(price -> 10000, color -> red, make -> honda, sold -> 2014-10-28)
Map(price -> 20000, color -> red, make -> honda, sold -> 2014-11-05)
Map(price -> 30000, color -> green, make -> ford, sold -> 2014-05-18)
red,4,32500.0
honda,3,10000.0,20000.0
bmw,1,80000.0,80000.0
blue,2,20000.0
ford,1,25000.0,25000.0
toyota,1,15000.0,15000.0
green,2,21000.0
ford,1,30000.0,30000.0
toyota,1,12000.0,12000.0