Paginating multi-field aggregations in es, with a Java utility class
The product needed statistics over quite a lot of data. To keep the user experience smooth, the product manager designed every statistic that might return more than a few dozen rows as a paginated query.
As luck would have it, the backend work for this requirement landed squarely on me.
At first I didn't think much of it. Pulling data out of es? Surely the official API would make short work of it, and surely there would be a ready-made aggregate-then-paginate API.
The PM allowed 10 days for development. Ten days? I felt slightly insulted. Three days, three days would be plenty: day 1, build the API (and no lingering at the office after hours); day 2, let the frontend integrate; day 3, testing. To his credit, the PM agreed: three days it is. (This paragraph is pure fiction; do not imitate.)
The reality, of course, turned out to be "too young, too simple".
Common ways to paginate after aggregating in es
When I went through the official Elasticsearch documentation, I found no ready-made query for paginating a multi-field aggregation, but the terms aggregation docs did say this:
If you want to retrieve all terms or all combinations of terms in a nested terms aggregation you should use the Composite aggregation which allows to paginate over all possible terms rather than setting a size greater than the cardinality of the field in the terms aggregation. The terms aggregation is meant to return the top terms and does not allow pagination.
Following the docs, I looked at the composite aggregation and, just as the terms docs suggest, it can indeed paginate. But it cannot jump to an arbitrary page number; it only pages forward in a scroll-like fashion, which felt rather different from the database-style pagination I had in mind.
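As a rough sketch of that scroll-like pagination (index and field names here are made up for illustration), a composite aggregation that walks two-field buckets 10 at a time looks like this:

```json
POST /my_index/_search
{
  "size": 0,
  "aggs": {
    "group_by": {
      "composite": {
        "size": 10,
        "sources": [
          { "price": { "terms": { "field": "product_price" } } },
          { "name":  { "terms": { "field": "product_name" } } }
        ]
      }
    }
  }
}
```

Each response carries an `after_key`; you feed it back as an `"after"` parameter in the next request to get the next 10 buckets. That is exactly why you can only move forward, never jump straight to page N.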
Unlike the other multi-bucket aggregations, you can use the composite aggregation to paginate all buckets from a multi-level aggregation efficiently. This aggregation provides a way to stream all buckets of a specific aggregation, similar to what scroll does for documents.
The quote above mentions multi-bucket aggregations, which led me to the multi_terms aggregation. That looked like exactly what I wanted: multi-field aggregation with pagination.
As most bucket aggregations the multi_term supports sub aggregations and ordering the buckets by metrics sub-aggregation
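For reference, a multi_terms request is quite compact; the sketch below assumes an es version that ships the aggregation (it was added in 7.12), and the index and field names are again made up:

```json
GET /my_index/_search
{
  "size": 0,
  "aggs": {
    "group_by": {
      "multi_terms": {
        "terms": [
          { "field": "product_price" },
          { "field": "product_name" }
        ]
      }
    }
  }
}
```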
So I set out to implement several of these aggregate-then-paginate requirements with multi terms, only to discover at implementation time that our es was not the official 7.X but 6.X, and 6.X does not support the multi terms aggregation at all.
Time was tight, and a boast is a boast, so I had to roll up my sleeves and build it myself using the ever-versatile script combined with bucket sort.
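In query DSL terms, the approach boils down to something like the following (index and field names are made up; this asks for page 3 at 10 buckets per page, so `from` = 20 and the terms `size` must cover at least from + size = 30 buckets):

```json
GET /my_index/_search
{
  "size": 0,
  "aggs": {
    "group_by_field": {
      "terms": {
        "script": {
          "lang": "painless",
          "source": "doc['product_price'].value + '@#@#@' + doc['product_name'].value"
        },
        "size": 30
      },
      "aggs": {
        "paging": {
          "bucket_sort": { "from": 20, "size": 10 }
        }
      }
    }
  }
}
```

The script concatenates the grouped fields into one composite key, the terms aggregation buckets on that key, and the bucket_sort pipeline aggregation truncates the bucket list to the requested page.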
A custom Java utility class for multi-field aggregation with pagination
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * es paginated aggregation query runner
 *
 * @author puhaiyang
 * @date 2021/4/6 15:16
 */
public class EsAggPaginationQueryRunner {
    private final RestHighLevelClient restHighLevelClient;
    /**
     * the SearchRequest object
     */
    private final SearchRequest searchRequest = new SearchRequest();
    /**
     * no documents need to be returned, so size is set to 0
     */
    private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);
    /**
     * the fields to group by
     */
    private String[] groupByFields;
    /**
     * the script used for querying
     */
    private Script queryScript;
    /**
     * separator between field values in the composite group key
     */
    private final String splitSymbol = "@#@#@";

    private EsAggPaginationQueryRunner(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    public static EsAggPaginationQueryRunner client(RestHighLevelClient restHighLevelClient) {
        //create a new runner
        return new EsAggPaginationQueryRunner(restHighLevelClient);
    }

    public EsAggPaginationQueryRunner build(String esIndexName, BoolQueryBuilder boolQueryBuilder, String... groupByFields) {
        //add the query conditions
        searchSourceBuilder.query(boolQueryBuilder);
        //specify the index
        searchRequest.indices(esIndexName);
        //specify the group-by fields
        this.groupByFields = groupByFields;
        //initialize the query script
        queryScript = initScript();
        return this;
    }

    public Script initScript() {
        //build the script: join each field's doc value with the separator,
        //e.g. doc['a'].value+'@#@#@'+doc['b'].value
        StringBuilder scriptBuilder = new StringBuilder();
        for (int i = 0; i < groupByFields.length; i++) {
            if (i != 0) {
                scriptBuilder.append("+'").append(splitSymbol).append("'+");
            }
            scriptBuilder.append(String.format("doc['%s'].value", groupByFields[i]));
        }
        return new Script(ScriptType.INLINE, "painless", scriptBuilder.toString(), new HashMap<>(0));
    }

    /**
     * run the aggregation and return the totals
     */
    public ResultTotal getResultTotal() throws Exception {
        //reuse the same separator script for the cardinality agg; a plain space separator
        //could make distinct key combinations collide (e.g. "a b"+"c" vs "a"+"b c")
        SearchSourceBuilder distinctByFieldAgg = searchSourceBuilder.aggregation(
                AggregationBuilders.cardinality("distinct_by_field").script(queryScript));
        //set the source
        searchRequest.source(distinctByFieldAgg);
        //execute the query
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        //extract the results
        long groupTotal = ((Cardinality) searchResponse.getAggregations().get("distinct_by_field")).getValue();
        long totalHits = searchResponse.getHits().totalHits;
        return new ResultTotal(groupTotal, totalHits);
    }

    public List<KeyValueDTO<List<String>, Long>> getListByPagination(int offset, int limit) throws Exception {
        List<KeyValueDTO<List<String>, Long>> resultList = new ArrayList<>();
        //the terms agg must return at least offset + limit buckets for bucket_sort to page into
        int size = offset + limit;
        TermsAggregationBuilder groupByFieldAgg = AggregationBuilders.terms("group_by_field").script(queryScript).size(size);
        //define the pagination via a bucket_sort pipeline aggregation
        BucketSortPipelineAggregationBuilder bucketSort = new BucketSortPipelineAggregationBuilder("bucket_sort", null)
                .from(offset)
                .size(limit);
        //attach the pagination
        groupByFieldAgg.subAggregation(bucketSort);
        //specify the agg
        searchSourceBuilder.aggregation(groupByFieldAgg);
        //set the source here too, so this method also works when called on its own
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        Terms terms = searchResponse.getAggregations().get("group_by_field");
        List<? extends Terms.Bucket> buckets = terms.getBuckets();
        for (Terms.Bucket bucketItem : buckets) {
            //split the composite key back into the individual field values
            String[] keyArray = bucketItem.getKeyAsString().split(splitSymbol);
            resultList.add(new KeyValueDTO<>(Arrays.asList(keyArray), bucketItem.getDocCount()));
        }
        return resultList;
    }
}
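As a quick sanity check of the key-building logic, here is a minimal standalone sketch (no es dependency; class and method names are mine, not part of the utility) of the painless script source that the runner's initScript() produces:

```java
public class ScriptSourceDemo {
    // Mirrors the loop in initScript(): join each field's doc-value access
    // with the '@#@#@' separator literal inside the painless expression.
    static String buildScriptSource(String... groupByFields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < groupByFields.length; i++) {
            if (i != 0) {
                sb.append("+'@#@#@'+");
            }
            sb.append(String.format("doc['%s'].value", groupByFields[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildScriptSource("product_price", "product_name"));
        // prints: doc['product_price'].value+'@#@#@'+doc['product_name'].value
    }
}
```

Because the separator never appears in the field values themselves, splitting a bucket key on it recovers the original per-field values on the way back out.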
/**
 * KeyValueDTO
 *
 * @author puhaiyang
 * @date 2021/4/6 15:31
 */
public class KeyValueDTO<T, V> {
    private T key;
    private V value;

    public KeyValueDTO(T key, V value) {
        setKey(key);
        setValue(value);
    }
    /** getters and setters omitted **/
}
/**
 * query result totals
 *
 * @author puhaiyang
 * @date 2021/4/5 11:46
 */
public class ResultTotal {
    /**
     * number of records after aggregation (distinct group count)
     */
    private Long recordTotal;
    /**
     * total number of documents matching the query
     */
    private Long sumTotal;

    public ResultTotal(Long recordTotal, Long sumTotal) {
        this.recordTotal = recordTotal;
        this.sumTotal = sumTotal;
    }
    /** getters and setters omitted **/
}
Usage example
//point EsAggPaginationQueryRunner at the client and build a runner
EsAggPaginationQueryRunner esAggPaginationQueryRunner = EsAggPaginationQueryRunner.client(restHighLevelClient)
        .build("esIndexName", new BoolQueryBuilder(), "product_price", "product_name");
//fetch the totals
ResultTotal resultTotal = esAggPaginationQueryRunner.getResultTotal();
System.out.println("total hits: " + resultTotal.getSumTotal() + " group count: " + resultTotal.getRecordTotal());
if (resultTotal.getRecordTotal().intValue() > 0) {
    //there is data, run the paginated query
    List<KeyValueDTO<List<String>, Long>> pageResultList = esAggPaginationQueryRunner.getListByPagination(pageOffset, pageLimit);
}
With a utility class that supports paginating multi-field aggregations, everything suddenly felt smooth, and knocking out several similar requirements in one go felt genuinely good. (Thanks to company leadership and the product manager for assigning this task to me.)
Summary
For aggregate-then-paginate in es, a recent es version can consider the multi terms aggregation.
On a somewhat older es, the composite aggregation works, using scroll-like forward-only pagination.
If the data volume is modest, the traditional terms + script approach can group on multiple fields, with bucket sort handling the ordering and pagination.
If the data volume is really too large and you still want paginated aggregation results, I personally think the requirement itself needs tweaking: add something like a "last refreshed at" timestamp, use scroll to gradually load the aggregated results into a relational database, then serve pages straight from SQL.
Most important of all, of course, is the concrete business scenario: pick whichever approach fits it best.