Scenario:

Group documents in Elasticsearch, sort the groups by their document count, and also return the number of groups, using both the query DSL and the Java API.


Solution:

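Example: in the bill index, find how many bills each person submitted on 2022-01-19, sorted from most to fewest, and also find the total number of people who submitted at least one bill.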

The equivalent SQL:

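select createUser, count(*) as c from userbill where type='bill' and createTime >= '2022-01-19'
and createTime <= '2022-01-19' group by createUser order by c desc

The total number of people corresponds to select count(distinct createUser) with the same where clause. (If createTime is a datetime column rather than a date, the SQL upper bound would need to be createTime < '2022-01-20' to cover the whole day.)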
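To make the intended semantics concrete, here is a minimal in-memory sketch in plain Java (not Elasticsearch code). It assumes a hypothetical Bill record with type, createUser and createTime fields: it keeps one day's bills of type "bill", counts them per createUser, sorts the groups by count descending, and reads the number of groups from the size of the result. Elasticsearch does the equivalent work server-side in the query that follows.

```java
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupCountSketch {

    // Hypothetical bill record; field names mirror the index fields used in the article.
    public record Bill(String type, long createUser, LocalDate createTime) {}

    // WHERE type='bill' AND createTime = day GROUP BY createUser ORDER BY count DESC
    public static Map<Long, Long> countPerUser(List<Bill> bills, LocalDate day) {
        Map<Long, Long> counts = bills.stream()
                .filter(b -> "bill".equals(b.type()) && day.equals(b.createTime()))
                .collect(Collectors.groupingBy(Bill::createUser, Collectors.counting()));
        // Sort groups by count descending and keep that order in a LinkedHashMap
        return counts.entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2022, 1, 19);
        List<Bill> bills = List.of(
                new Bill("bill", 115944L, day),
                new Bill("bill", 115944L, day),
                new Bill("bill", 967145L, day),
                new Bill("other", 967145L, day),          // wrong type, filtered out
                new Bill("bill", 381480L, day.minusDays(1))); // wrong day, filtered out
        Map<Long, Long> perUser = countPerUser(bills, day);
        System.out.println(perUser);        // {115944=2, 967145=1}
        System.out.println(perUser.size()); // 2 (number of groups)
    }
}
```

The map size plays the role that the stats_bucket count plays in the Elasticsearch response.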

1. DSL approach:

GET /userbill/_search
{
  "from": 0,
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "bool": {
            "must": [
              {
                "term": {
                  "type": {
                    "value": "bill",
                    "boost": 1
                  }
                }
              },
              {
                "range": {
                  "createTime": {
                    "from": "2022-01-19",
                    "to": "2022-01-19",
                    "include_lower": true,
                    "include_upper": true,
                    "boost": 1
                  }
                }
              }
            ],
            "adjust_pure_negative": true,
            "boost": 1
          }
        }
      ]
    }
  },
  "_source": false,
  "stored_fields": "_none_",
  "aggs": {
    "group_name": {
      "terms": {
        "field": "createUser",
        "size": 999999
      },
      "aggs": {
        "bucket_field": {
          "bucket_sort": {
            "sort": [
              {
                "_count": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    },
    "stats_monthly_sales": {
      "stats_bucket": {
        "buckets_path": "group_name>_count"
      }
    }
  }
}

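Query result:

key is the user ID of each group and doc_count is that user's number of bills; count under stats_monthly_sales is the number of groups, i.e. the number of distinct users. Because stats_bucket only sees the buckets that group_name actually returns, that count is exact only when the terms size is at least the number of distinct createUser values, which is why size is set to 999999 (very large bucket counts may be rejected by the search.max_buckets setting on newer versions). The terms aggregation already orders buckets by _count descending by default, so the bucket_sort sub-aggregation mainly makes that ordering explicit.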

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 32,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "group_name" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 115944,
          "doc_count" : 8
        },
        {
          "key" : 967145,
          "doc_count" : 7
        },
        {
          "key" : 917175,
          "doc_count" : 4
        },
        {
          "key" : 937800,
          "doc_count" : 3
        },
        {
          "key" : 888831,
          "doc_count" : 2
        },
        {
          "key" : 963198,
          "doc_count" : 2
        },
        {
          "key" : 88896565,
          "doc_count" : 2
        },
        {
          "key" : 381480,
          "doc_count" : 1
        },
        {
          "key" : 918555,
          "doc_count" : 1
        },
        {
          "key" : 1002454,
          "doc_count" : 1
        },
        {
          "key" : 88895739,
          "doc_count" : 1
        }
      ]
    },
    "stats_monthly_sales" : {
      "count" : 11,
      "min" : 1.0,
      "max" : 8.0,
      "avg" : 2.909090909090909,
      "sum" : 32.0
    }
  }
}
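stats_bucket simply computes summary statistics over one value per bucket, here each bucket's _count. As a plain-Java cross-check (not an Elasticsearch API), feeding the eleven doc_count values from the response above into LongSummaryStatistics reproduces count 11, min 1, max 8 and sum 32; the sum matches hits.total in this case because every matching bill landed in some bucket (sum_other_doc_count is 0).

```java
import java.util.LongSummaryStatistics;
import java.util.stream.LongStream;

public class StatsBucketSketch {

    // Mirrors stats_bucket over "group_name>_count": one input value per bucket (its doc_count)
    public static LongSummaryStatistics statsOverBuckets(long... docCounts) {
        return LongStream.of(docCounts).summaryStatistics();
    }

    public static void main(String[] args) {
        // doc_count values copied from the group_name buckets in the response
        LongSummaryStatistics s = statsOverBuckets(8, 7, 4, 3, 2, 2, 2, 1, 1, 1, 1);
        System.out.println("count=" + s.getCount()); // count=11 (number of users)
        System.out.println("sum=" + s.getSum());     // sum=32 (total bills)
        System.out.println("min=" + s.getMin() + ", max=" + s.getMax() + ", avg=" + s.getAverage());
    }
}
```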

2. Java API approach

Run the search, then parse the buckets and collect them into a List:

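// Imports used below (6.x High Level REST Client package paths; in 7.x
// StatsBucketPipelineAggregationBuilder lives in org.elasticsearch.search.aggregations.pipeline)
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public List<StatisticsData> test() throws IOException {

    SearchSourceBuilder builder = new SearchSourceBuilder();
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

    // type = 'bill' AND createTime between 2022-01-19 and 2022-01-19 (inclusive)
    boolQueryBuilder.must(QueryBuilders.termQuery("type", "bill"));
    boolQueryBuilder.must(QueryBuilders.rangeQuery("createTime").gte("2022-01-19").lte("2022-01-19"));

    // Filter context: matching only, no scoring
    queryBuilder.filter(boolQueryBuilder);
    builder.query(queryBuilder);

    // GROUP BY createUser, buckets ordered by doc count descending
    // (same ordering as the bucket_sort in the DSL; it is also the terms default)
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders
            .terms("group_name")
            .field("createUser")
            .size(999999)
            .order(BucketOrder.count(false));
    builder.aggregation(termsAggregationBuilder);
    // Sibling pipeline aggregation; its "count" is the number of group_name buckets
    builder.aggregation(new StatsBucketPipelineAggregationBuilder("stats_monthly_sales", "group_name>_count"));

    // Only aggregations are needed: no hits, no _source
    builder.from(0);
    builder.size(0);
    builder.fetchSource(false);

    SearchRequest request = new SearchRequest(ElasticsearchConstant.EMSUSERBILLPROCESS_INDEX);
    request.source(builder);

    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    Terms terms = response.getAggregations().get("group_name");
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    // buckets.size() equals the group count reported by stats_monthly_sales
    List<StatisticsData> li = buckets.stream().map(u -> {
        StatisticsData s = new StatisticsData();
        s.setKey(u.getKeyAsString());
        s.setDoc_count(String.valueOf(u.getDocCount()));
        return s;
    }).collect(Collectors.toList());

    return li;
}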
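The method relies on a StatisticsData class that the article does not show. A minimal hypothetical version, assuming only the two String properties implied by setKey and setDoc_count (the underscore name is kept so those calls compile unchanged), could look like this:

```java
// Hypothetical POJO matching the setters called in test(); the original class is not shown.
public class StatisticsData {
    private String key;       // bucket key (user ID)
    private String doc_count; // number of bills in the bucket

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public String getDoc_count() { return doc_count; }
    public void setDoc_count(String doc_count) { this.doc_count = doc_count; }

    @Override
    public String toString() {
        return "StatisticsData{key=" + key + ", doc_count=" + doc_count + "}";
    }
}
```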
