A product feature required implementing a number of data statistics endpoints. To keep the user experience smooth, the product manager designed every view that might return more than a few dozen rows as a paginated query.

As luck would have it, the backend work for this requirement landed squarely on me.

At first I didn't give it much thought: it's just pulling data out of ES, surely a quick look at the official API would settle it in minutes, and surely there would be a ready-made API for paginating aggregation results.

The product manager allowed 10 days for development. 10 days? Was he underestimating me? Three days would be plenty: day 1, build the endpoints without even staying late at the office; day 2, have the frontend integrate; day 3, testing. To his credit, he agreed: three days it was. (This paragraph is pure fiction; do not imitate.)

Reality, however, proved I was too young, too simple.

Common ways to paginate aggregated results in ES
When I combed through the official Elasticsearch documentation, I found no ready-made query for paginating a multi-field aggregation, but the terms aggregation docs did contain this passage:

If you want to retrieve all terms or all combinations of terms in a nested terms aggregation you should use the Composite aggregation which allows to paginate over all possible terms rather than setting a size greater than the cardinality of the field in the terms aggregation. The terms aggregation is meant to return the top terms and does not allow pagination.

Following the docs, I read up on the composite aggregation, and as the terms documentation promised, it does support pagination. However, you cannot jump to an arbitrary page number; it only pages forward with a scroll-like cursor, which is not quite the database-style pagination I was after:

Unlike the other multi-bucket aggregations, you can use the composite aggregation to paginate all buckets from a multi-level aggregation efficiently. This aggregation provides a way to stream all buckets of a specific aggregation, similar to what scroll does for documents.
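A composite request looks roughly like this (the index and field names here are made up for illustration). Each response carries an `after_key`, which you feed back as the `after` clause of the next request, so pagination can only move forward page by page:

```json
GET /product_index/_search
{
  "size": 0,
  "aggs": {
    "group_by": {
      "composite": {
        "size": 10,
        "sources": [
          { "price": { "terms": { "field": "product_price" } } },
          { "name":  { "terms": { "field": "product_name" } } }
        ],
        "after": { "price": 9.9, "name": "apple" }
      }
    }
  }
}
```

The `after` values shown are just a sample cursor; omit the clause entirely to fetch the first page.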

That passage mentioned multi-bucket aggregations, which led me to the multi_terms aggregation. This looked like exactly what I wanted: multi-field grouping with pagination.

As most bucket aggregations the multi_term supports sub aggregations and ordering the buckets by metrics sub-aggregation
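On versions that support it (multi_terms was introduced in 7.12), a request would look roughly like this (index and field names assumed), with pagination handled by a bucket_sort sub-aggregation as with any bucket aggregation:

```json
GET /product_index/_search
{
  "size": 0,
  "aggs": {
    "group_by": {
      "multi_terms": {
        "terms": [
          { "field": "product_price" },
          { "field": "product_name" }
        ],
        "size": 30
      },
      "aggs": {
        "bucket_sort": {
          "bucket_sort": { "from": 20, "size": 10 }
        }
      }
    }
  }
}
```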

So I set out to use multi_terms for the several aggregate-then-paginate requirements. Only during implementation did I discover that our ES was not the official 7.x but 6.x, and 6.x simply does not support the multi_terms aggregation.

With the deadline looming (you can't just brag and not deliver), I had to roll my own: the ever-versatile script combined with bucket_sort.
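In query DSL terms, the idea is shaped roughly like this (index, field names, and separator are illustrative): a painless script concatenates the group-by fields into one composite key, a terms aggregation groups on that key, and bucket_sort slices out the requested page. Note the terms `size` must be at least `from + size` of the page being requested (here page 3 with 10 rows per page):

```json
GET /product_index/_search
{
  "size": 0,
  "aggs": {
    "group_by_field": {
      "terms": {
        "script": {
          "lang": "painless",
          "source": "doc['product_price'].value + '@#@#@' + doc['product_name'].value"
        },
        "size": 30
      },
      "aggs": {
        "bucket_sort": {
          "bucket_sort": { "from": 20, "size": 10 }
        }
      }
    }
  }
}
```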

A Java utility class for multi-field aggregation with pagination
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

/**
 * Executor for paginated ES aggregation queries
 *
 * @author puhaiyang
 * @date 2021/4/6 15:16
 */
public class EsAggPaginationQueryRunner {
    private final RestHighLevelClient restHighLevelClient;
    /**
     * the SearchRequest to execute
     */
    private final SearchRequest searchRequest = new SearchRequest();
    /**
     * no hits are needed in the response, so size is set to 0
     */
    private final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);
    /**
     * fields to group by
     */
    private String[] groupByFields;
    /**
     * script used for grouping
     */
    private Script queryScript;
    /**
     * separator between field values in the concatenated bucket key
     */
    private final String splitSymbol = "@#@#@";

    private EsAggPaginationQueryRunner(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    public EsAggPaginationQueryRunner build(String esIndexName, BoolQueryBuilder boolQueryBuilder, String... groupByFields) {
        // add the query conditions
        searchSourceBuilder.query(boolQueryBuilder);
        // target index
        searchRequest.indices(esIndexName);
        // fields to group by
        this.groupByFields = groupByFields;
        // initialize the grouping script
        queryScript = initScript();
        return this;
    }

    public static EsAggPaginationQueryRunner client(RestHighLevelClient restHighLevelClient) {
        // create a new runner instance
        return new EsAggPaginationQueryRunner(restHighLevelClient);
    }


    public Script initScript() {
        // build a painless script that concatenates the group-by fields into one composite key
        StringBuilder scriptBuilder = new StringBuilder();
        for (int i = 0; i < groupByFields.length; i++) {
            if (i != 0) {
                scriptBuilder.append("+'" + splitSymbol + "'+");
            }
            scriptBuilder.append(String.format("doc['%s'].value", groupByFields[i]));
        }
        return new Script(ScriptType.INLINE, "painless", scriptBuilder.toString(), new HashMap<>(0));
    }

    /**
     * Count the distinct groups and the matching documents.
     * Reuses the same composite-key script as the grouping query, so the count
     * is taken over exactly the same keys. Note that the cardinality
     * aggregation is approximate on high-cardinality data.
     */
    public ResultTotal getResultTotal() throws Exception {
        SearchSourceBuilder distinctByFieldAgg = searchSourceBuilder.aggregation(
                AggregationBuilders.cardinality("distinct_by_field").script(queryScript));
        // set the source
        searchRequest.source(distinctByFieldAgg);
        // execute the query
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        // extract the results
        long groupTotal = ((Cardinality) searchResponse.getAggregations().get("distinct_by_field")).getValue();
        long totalHits = searchResponse.getHits().getTotalHits();
        return new ResultTotal(groupTotal, totalHits);
    }

    public List<KeyValueDTO<List<String>, Long>> getListByPagination(int offset, int limit) throws Exception {
        List<KeyValueDTO<List<String>, Long>> resultList = new ArrayList<>();

        // the terms aggregation must return at least offset + limit buckets for bucket_sort to slice from
        int size = offset + limit;
        // group on the composite-key script; pagination is done by the bucket_sort below
        TermsAggregationBuilder groupByFieldAgg = AggregationBuilders.terms("group_by_field").script(queryScript).size(size);
        // pagination: skip `offset` buckets, then return `limit` buckets
        // (null sorts keeps the default ordering by descending doc count)
        BucketSortPipelineAggregationBuilder bucketSort = new BucketSortPipelineAggregationBuilder("bucket_sort", null)
                .from(offset)
                .size(limit);
        // attach the pagination sub-aggregation
        groupByFieldAgg.subAggregation(bucketSort);
        // set the aggregation and the source (in case getResultTotal was not called first)
        searchSourceBuilder.aggregation(groupByFieldAgg);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        Terms terms = searchResponse.getAggregations().get("group_by_field");
        List<? extends Terms.Bucket> buckets = terms.getBuckets();
        for (Terms.Bucket bucketItem : buckets) {
            // split the composite bucket key back into the individual field values
            String[] keyArray = bucketItem.getKeyAsString().split(splitSymbol);
            resultList.add(new KeyValueDTO<>(Arrays.asList(keyArray), bucketItem.getDocCount()));
        }
        return resultList;
    }
}


/**
 * KeyValueDTO
 *
 * @author puhaiyang
 * @date 2021/4/6 15:31
 */
public class KeyValueDTO<T, V> {
    private T key;
    private V value;

    public KeyValueDTO(T key, V value) {
        setKey(key);
        setValue(value);
    }

    /** getters and setters omitted **/
}

/**
 * Query result totals
 *
 * @author puhaiyang
 * @date 2021/4/5 11:46
 */
public class ResultTotal {
    /**
     * number of records after aggregation (number of groups)
     */
    private Long recordTotal;
    /**
     * total number of documents matching the query
     */
    private Long sumTotal;

    public ResultTotal(Long recordTotal, Long sumTotal) {
        this.recordTotal = recordTotal;
        this.sumTotal = sumTotal;
    }
    
    /** getters and setters omitted **/

}

Usage example
        // create an EsAggPaginationQueryRunner for the given client
        EsAggPaginationQueryRunner esAggPaginationQueryRunner = EsAggPaginationQueryRunner.client(restHighLevelClient)
                .build("esIndexName", new BoolQueryBuilder(), "product_price", "product_name");
        // fetch the totals
        ResultTotal resultTotal = esAggPaginationQueryRunner.getResultTotal();
        System.out.println("total hits: " + resultTotal.getSumTotal() + ", group count: " + resultTotal.getRecordTotal());
        if (resultTotal.getRecordTotal().intValue() > 0) {
            // there is data, run the paginated query
            List<KeyValueDTO<List<String>, Long>> pageResultList = esAggPaginationQueryRunner.getListByPagination(pageOffset, pageLimit);
        }

With a utility class supporting multi-field aggregation plus pagination in hand, everything felt smooth again, and knocking out several similar requirements in one go felt great. (Thanks to my leadership and the product manager for assigning this task to me.)

Summary
If you need to paginate aggregation results in ES and are on a recent version, consider the multi_terms aggregation (available from 7.12).

On somewhat older versions, the composite aggregation works, paginating with a forward-only, scroll-like cursor.

If the data volume is moderate, a traditional terms aggregation with a script can do the multi-field grouping, with bucket_sort handling the sorting and paging.

If the data volume is really too large and you still want paginated aggregate queries, I'd say it's time to renegotiate the requirement: add a "last refreshed at" timestamp, use scroll to dump the aggregated results into a relational database, then query that with plain SQL.

In the end it all comes down to the concrete business scenario: pick whichever approach fits your situation best.