目录

桶(Buckets)

指标(Metrics)

将两者结合起来——聚合

常见的聚合查询

聚合查询的使用

1、简单的词频统计

2、数据按时间划分

3、数据按某个字段进行聚合后,再按时间排序


为了掌握聚合,要先了解两个主要概念:

Buckets(桶):满足某个条件的文档集合。

Metrics(指标):为某个桶中的文档计算得到的统计信息。

就是这样!每个聚合只是简单地由一个或者多个桶,零个或者多个指标组合而成。可以将它粗略地转换为SQL:

SELECT COUNT(field) FROM table GROUP BY field

以上的COUNT(field)就相当于一个指标。GROUP BY field则相当于一个桶。

桶和SQL中的组(Grouping)拥有相似的概念,而指标则与COUNT(),SUM(),MAX()等相似。

桶(Buckets)

一个桶就是满足特定条件的一个文档集合,例如:

  • 一名员工要么属于男性桶,或者女性桶。
  • 长宁区属于上海市这个桶。
  • 日期2021-08-28属于八月份这个桶。

随着聚合被执行,每份文档中的值会被计算来决定它们是否匹配了桶的条件。如果匹配成功,那么该文档会被置入该桶中,同时聚合会继续执行。

桶也能够嵌套在其它桶中,能让你完成层次或者条件划分这些需求。比如,娄山关地铁站可以被放置在长宁区这个桶中,而整个长宁区则能够被放置在上海市这个桶中。

ES中有很多类型的桶,让你可以将文档通过多种方式进行划分(按日期,关键词,标签等),从根本上来说,都是根据相同的原理运作:按照条件对文档进行划分。



指标(Metrics)

分桶是达到最终目的的手段:提供了对文档进行划分的方法,从而让你能够计算需要的指标。

多数指标仅仅是简单的数学运算(比如,min,mean,max以及sum),它们使用文档中的值进行计算。

在实际应用中,指标能够让你计算例如平均价格,最高出售价格等。



将两者结合起来——聚合

一个聚合就是一些桶和指标的组合。一个聚合可以只有一个桶,或者一个指标,或者每样一个。在桶中甚至可以有多个嵌套的桶。比如,我们可以将文档按照其所属国家进行分桶,然后对每个桶计算其平均薪资(一个指标)。

因为桶是可以嵌套的,我们能够实现一个更加复杂的聚合操作:

  1. 将文档按照国家进行分桶。(桶)
  2. 然后将每个国家的桶再按照性别分桶。(桶)
  3. 然后将每个性别的桶按照年龄区间进行分桶。(桶)
  4. 最后,为每个年龄区间计算平均薪资。(指标)

常见的聚合查询

聚合查询都是由AggregationBuilders创建的,一些常见的聚合查询如下

​​​​​​(1)统计某个字段的数量

ValueCountBuilder vcb= AggregationBuilders.count("count_uid").field("uid");

(2)去重统计某个字段的数量(有少量误差)

CardinalityBuilder cb= AggregationBuilders.cardinality("distinct_count_uid").field("uid");

(3)聚合过滤

FilterAggregationBuilder fab= AggregationBuilders.filter("uid_filter").filter(QueryBuilders.queryStringQuery("uid:001"));

(4)按某个字段分组

TermsBuilder tb= AggregationBuilders.terms("group_name").field("name");

(5)求和

SumBuilder sumBuilder= AggregationBuilders.sum("sum_price").field("price");

(6)求平均

AvgBuilder ab= AggregationBuilders.avg("avg_price").field("price");

(7)求最大值

MaxBuilder mb= AggregationBuilders.max("max_price").field("price");

(8)求最小值

MinBuilder min= AggregationBuilders.min("min_price").field("price");

(9)按日期间隔分组

DateHistogramBuilder dhb= AggregationBuilders.dateHistogram("dh").field("date");

(10)获取聚合里面的结果

TopHitsBuilder thb= AggregationBuilders.topHits("top_result");

(11)嵌套的聚合

NestedBuilder nb= AggregationBuilders.nested("negsted_path").path("quests");

(12)反转嵌套

AggregationBuilders.reverseNested("res_negsted").path("kps ");

聚合查询的使用

1、简单的词频统计

public void test(){
        //构建数据筛选条件
        BoolQueryBuilder root = QueryBuilders.boolQuery();
        root.must(QueryBuilders.rangeQuery("publish_time")
                    .gte(startTime)
                    .lte(endTime));
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.from(0).size(0)
                .timeout(new TimeValue(60, TimeUnit.SECONDS))
                .query(root);

        //构建聚合桶
        TermsAggregationBuilder termsAggregationBuilder = 
                AggregationBuilders.terms("terms_aggr")
                .field("word_cloud")
                .includeExclude(new IncludeExclude(".{2,}", null))
                .size(50);
        builder.aggregation(termsAggregationBuilder);
        //构建查询对象
        SearchRequest searchRequest = new SearchRequest(ES_INDEX_NAME);
        searchRequest.source(builder);
        //查询,获取查询结果
        SearchResponse response;
        try {
            response = ES.getHighLevelClient().search(searchRequest);
        } catch (IOException e) {
            LOGGER.warn("Search ES Error", e);
            return Collections.emptyList();
        }

        //取出查询结果
        List<Map<String, Object>> results = new ArrayList<>();
        Terms aggr = response.getAggregations().get("terms_aggr");
        for (Terms.Bucket bucket : aggr.getBuckets()) {
            String key = bucket.getKeyAsString().toLowerCase().replaceFirst("'s$", "");
             Map<String, Object> map = new HashMap<>(2);
             map.put("name", key);
             map.put("value", bucket.getDocCount());
             results.add(map);
        }
        System.out.println("results:" + results);

}

2、数据按时间划分

public void test(){
        //构建数据筛选条件,本测试仅进行时间筛选
        BoolQueryBuilder root = QueryBuilders.boolQuery();
        root.must(QueryBuilders.rangeQuery("publish_time")
                    .gte(startTime)
                    .lte(endTime));

        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.from(0).size(0)
                .timeout(new TimeValue(60, TimeUnit.SECONDS))
                .query(root);

        //构建聚合对象
        DateHistogramAggregationBuilder histogramBuilder =
                AggregationBuilders.dateHistogram("date_hist")
                .field("publish_time")
                .dateHistogramInterval(DateHistogramInterval.DAY)
                .extendedBounds(new ExtendedBounds(startTime,endTime))
                .minDocCount(0);
        builder.aggregation(histogramBuilder );
        //构建查询对象
        SearchRequest searchRequest = new SearchRequest(ES_INDEX_NAME);
        searchRequest.source(builder);
        //查询,获取查询结果
        SearchResponse response;
        try {
            response = ES.getHighLevelClient().search(searchRequest);
        } catch (IOException e) {
            LOGGER.warn("Search ES Error", e);
            return Collections.emptyList();
        }

        //取出查询结果
        List<Map<String, Object>> results = new ArrayList<>();
        Terms aggr = response.getAggregations().get("date_hist");
        for (Terms.Bucket bucket : aggr.getBuckets()) {
            String key = bucket.getKeyAsString();
             Map<String, Object> map = new HashMap<>(2);
             map.put("name", key);
             map.put("value", bucket.getDocCount());
             results.add(map);
        }
        System.out.println("results:" + results);

}

3、数据按某个字段进行聚合后,再按时间排序

//排序,倒序:false,正序:true
boolean sortASC =  false ;

public void test(){
        //构建数据筛选条件,本测试仅进行时间筛选
        BoolQueryBuilder root = QueryBuilders.boolQuery();
        root.must(QueryBuilders.rangeQuery("publish_time")
                    .gte(startTime)
                    .lte(endTime));

        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.from(0).size(0)
                .timeout(new TimeValue(60, TimeUnit.SECONDS))
                .query(root);

        
        //构建聚合的Order对象
        boolean sortASC = ValidateUtils.isEmpty(order) ? false : order.contains("asc")||order.contains("ASC");

       Terms.Order sortOrder = Terms.Order.aggregation("sort_aggs_publish_time",sortASC);
        
        //构建排序子聚合,实现排序,正序就用min函数,倒序就用max函数      
        AggregationBuilder sortAggregationBuilder = 
                sortASC?AggregationBuilders.min("sort_aggs_publish_time").field("publish_time"):
                    AggregationBuilders.max("sort_aggs_publish_time").field("publish_time");
       
        //构建聚合对象
        TermsAggregationBuilder termsAggregationBuilder =
                AggregationBuilders.terms("aggr_cluster_id")
                        .field("cluster_id")
                        .size(50)
                        .order(sortOrder)
                        .subAggregation(sortAggregationBuilder)
        ;
        builder.aggregation(termsAggregationBuilder);
        //构建查询对象
        SearchRequest searchRequest = new SearchRequest(ES_INDEX_NAME);
        searchRequest.source(builder);
        //查询,获取查询结果
        SearchResponse response;
        try {
            response = ES.getHighLevelClient().search(searchRequest);
        } catch (IOException e) {
            LOGGER.warn("Search ES Error", e);
            return Collections.emptyList();
        }

        //取出查询结果
        List<Map<String, Object>> results = new ArrayList<>();
        Terms aggr = response.getAggregations().get("date_hist");
        for (Terms.Bucket bucket : aggr.getBuckets()) {
            String key = bucket.getKeyAsString();
             Map<String, Object> map = new HashMap<>(2);
             map.put("name", key);
             map.put("value", bucket.getDocCount());
             results.add(map);
        }
        System.out.println("results:" + results);

}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐