ElasticSearch使用elasticsearchTemplate聚合查询
这两天正好做个需求,需要用到聚合查询。前几篇文章只是简单的提到过,并没有真正的运用到实际产出中,本篇结合实际代码,专项学习ES的聚合查询。1、业务背景有一张地址索引表:hisAddress与formatAddress是一对多的关系。当一条地址进来查找hisAddress,然后对formatAddress做聚合,再根据count筛选聚合中的数据。类似以下SQL:select hisAd......
·
这两天正好做个需求,需要用到聚合查询。前几篇文章只是简单的提到过,并没有真正的运用到实际产出中,本篇结合实际代码,专项学习ES的聚合查询。
1、业务背景
有一张地址索引表:
hisAddress与formatAddress是一对多的关系。
当一条地址进来查找hisAddress,然后对formatAddress做聚合,再根据count筛选聚合中的数据。
类似以下SQL:
select hisAddress,formatAddress,count(*) from addressIndex
where hisAddress = "上海市静安区静安路100号静安寺"
group by formatAddress having count(*) > 10
当然逻辑比这个稍稍复杂,需要使用嵌套聚合筛选数据。
流程如下:
2、elasticsearch的query
{
"size":0,
"query":{
"bool":{
"filter":{
"term":{
"hisAddress":"上海市上海市静安区静安路100号静安寺"
}
}
}
},
"aggs":{
"format_address":{
"terms":{
"field":"formatAddress"
},
"aggs":{
"sign_org":{
"terms":{
"field":"signOrgCode"
}
},
"having":{
"bucket_selector":{
"buckets_path":{
"formatCount":"_count"
},
"script":{
"inline": " formatCount>10"
}
}
},
"stats_sign_bulk":{
"stats_bucket":{
"buckets_path":"sign_org > _count"
}
}
}
}
}
}
返回结果:
{
"took": 19,
"timed_out": false,
"_shards": {
"total": 6,
"successful": 6,
"failed": 0
},
"hits": {
"total": 67,
"max_score": 0,
"hits": []
},
"aggregations": {
"format_address": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "上海市上海市静安区静安寺",
"doc_count": 21,
"sign_org": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "022795",
"doc_count": 21
}
]
},
"stats_sign_bulk": {
"count": 1,
"min": 21,
"max": 21,
"avg": 21,
"sum": 21
}
},
{
"key": "上海市上海市静安区静安路100号",
"doc_count": 21,
"sign_org": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "022795",
"doc_count": 21
}
]
},
"stats_sign_bulk": {
"count": 1,
"min": 21,
"max": 21,
"avg": 21,
"sum": 21
}
},
{
"key": "上海市上海市静安区肯德基",
"doc_count": 16,
"sign_org": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01111",
"doc_count": 16
}
]
},
"stats_sign_bulk": {
"count": 1,
"min": 16,
"max": 16,
"avg": 16,
"sum": 16
}
}
]
}
}
}
3、elasticsearchTemplate的实现
String hisAddress = "上海市上海市静安区静安路100号静安寺";
List<HistoryIndexDocument> prepareList = new ArrayList<HistoryIndexDocument>();
Map<String,String> bucketMap = new HashMap<String, String>();
bucketMap.put("formatCount", "_count");
// 根据全量地址和寄派类型查询数据(此处使用filter过滤,它能缓存数据且不参与计算分值,比query速度快)
QueryBuilder queryBuilder = QueryBuilders
.boolQuery()
.filter(QueryBuilders.termQuery("hisAddress", entity.getHisAddress()))
.filter(QueryBuilders.termQuery("rangeType", entity.getRangeType()));
// 结构化地址聚合桶
TermsBuilder format_address_aggs = AggregationBuilders.terms("format_address_aggs").field("formatAddress");
// 签收网点聚合桶
TermsBuilder sign_org_aggs = AggregationBuilders.terms("sign_org_aggs").field("signOrgCode");
// 管道聚合,类似having count(*) > 10
BucketSelectorBuilder bucketSelectorBuilder = PipelineAggregatorBuilders
.having("having")
.setBucketsPathsMap(bucketMap)
.script(new Script("formatCount>10"));
// 嵌套聚合,类似在group by formatAddress的基础上再group by signOrgCode
format_address_aggs.subAggregation(sign_org_aggs);
// 嵌套聚合,筛选数量大于10的结构化地址
format_address_aggs.subAggregation(bucketSelectorBuilder);
// 嵌套聚合,筛选数量大于10的签收网点
sign_org_aggs.subAggregation(bucketSelectorBuilder);
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withIndices("my_index").withTypes("my_type")
.withQuery(queryBuilder)
.withPageable(new PageRequest(0, 1, null))
.addAggregation(format_address_aggs)
.build();
// 执行语句获取聚合结果
Aggregations aggregations = elasticsearchTemplate.query(searchQuery, new ResultsExtractor<Aggregations>() {
@Override
public Aggregations extract(SearchResponse response) {
return response.getAggregations();
}
});
// 获取聚合结果
StringTerms teamAgg = (StringTerms) aggregations.asMap().get("format_address_aggs");
List<Bucket> bucketList = teamAgg.getBuckets();
for(Bucket bucket:bucketList) {
// 结构化地址
String formatAddress = bucket.getKeyAsString();
System.out.println(formatAddress);
Aggregations signAggs = bucket.getAggregations();
StringTerms signTerms = (StringTerms) signAggs.asMap().get("sign_org_aggs");
List<Bucket> signBucketList = signTerms.getBuckets();
// 签收网点只能一个
if(signBucketList==null || signBucketList.size() >1) {
continue;
}
Bucket signBucket = signBucketList.get(0);
// 签收频次需要5次以上
if(signBucket.getDocCount() >= 5) {
// 满足条件的网点放入prepareList
HistoryIndexDocument entity = new HistoryIndexDocument();
entity.setFormatAddress(formatAddress);
entity.setSignOrgCode(signBucket.getKeyAsString());
prepareList.add(entity);
}
}
System.out.println(FastJsonUtil.toJsonString(prepareList));
4、更多java API聚合的用法
//(1)统计某个字段的数量
ValueCountBuilder vcb= AggregationBuilders.count("count_uid").field("uid");
//(2)去重统计某个字段的数量(有少量误差)
CardinalityBuilder cb= AggregationBuilders.cardinality("distinct_count_uid").field("uid");
//(3)聚合过滤
FilterAggregationBuilder fab= AggregationBuilders.filter("uid_filter").filter(QueryBuilders.queryStringQuery("uid:001"));
//(4)按某个字段分组
TermsBuilder tb= AggregationBuilders.terms("group_name").field("name");
//(5)求和
SumBuilder sumBuilder= AggregationBuilders.sum("sum_price").field("price");
//(6)求平均
AvgBuilder ab= AggregationBuilders.avg("avg_price").field("price");
//(7)求最大值
MaxBuilder mb= AggregationBuilders.max("max_price").field("price");
//(8)求最小值
MinBuilder min= AggregationBuilders.min("min_price").field("price");
//(9)按日期间隔分组
DateHistogramBuilder dhb= AggregationBuilders.dateHistogram("dh").field("date");
//(10)获取聚合里面的结果
TopHitsBuilder thb= AggregationBuilders.topHits("top_result");
//(11)嵌套的聚合
NestedBuilder nb= AggregationBuilders.nested("negsted_path").path("quests");
//(12)反转嵌套
AggregationBuilders.reverseNested("res_negsted").path("kps ");
了解更多详情,请参考官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/6.5/search-aggregations.html
本文参考:https://blog.csdn.net/u010454030/article/details/63266035
更多推荐
已为社区贡献1条内容
所有评论(0)