使用场景

对 ES 中的某些字段进行分组,对某一列做聚合计算(求平均值、最大值等),同时展示未参与分组的列的值,并按求得的平均值排序。

实现步骤

1. 注册 ES 客户端

使用账号和密码连接 ES 集群

/**
 * Spring configuration that exposes a {@link RestHighLevelClient} bean connected to a single
 * Elasticsearch node using HTTP basic authentication.
 *
 * <p>Connection settings are injected from the {@code elasticsearch.host},
 * {@code elasticsearch.port}, {@code elasticsearch.loginName} and {@code elasticsearch.password}
 * properties.
 *
 * <p>NOTE(review): Lombok's {@code @Data} generates a {@code toString()} that includes every
 * field, including {@code password}; consider excluding it so credentials cannot end up in logs.
 */
@Configuration
@Data
public class ElasticSearchConfig {
    @Value("${elasticsearch.host}")
    private String host;
    @Value("${elasticsearch.port}")
    private int port;
    @Value("${elasticsearch.loginName}")
    private String loginName;
    @Value("${elasticsearch.password}")
    private String password;
    // The most recently created client; it is also returned as the bean below.
    private RestHighLevelClient client;

    /**
     * Creates the high-level REST client bean.
     *
     * <p>Credentials are applied to any auth scope, and auth caching is disabled on the
     * underlying async HTTP client. The connect timeout is 60000 ms and the socket timeout is
     * 150000 ms.
     *
     * @return the newly built client, which is also stored in {@code client}
     */
    @Bean
    public RestHighLevelClient client() {
        final CredentialsProvider basicAuth = new BasicCredentialsProvider();
        basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(loginName, password));

        RestClientBuilder lowLevelBuilder = RestClient.builder(new HttpHost(host, port));
        lowLevelBuilder.setHttpClientConfigCallback(asyncClientBuilder -> {
            asyncClientBuilder.disableAuthCaching();
            return asyncClientBuilder.setDefaultCredentialsProvider(basicAuth);
        });
        lowLevelBuilder.setRequestConfigCallback(requestConfig -> requestConfig
                .setConnectTimeout(60000)
                .setSocketTimeout(150000));

        client = new RestHighLevelClient(lowLevelBuilder);
        return client;
    }
}

2. 使用 RestHighLevelClient 构造请求并执行查询

    /**
     * Queries documents whose {@code fileArrivalTime} is at or after the start of today. It
     * groups them by {@code province.keyword} (up to 31 buckets) and then by
     * {@code key.keyword}, and returns the key bucket with the highest average
     * {@code vendorDelay} for each province.
     *
     * <p>Only the aggregations are read from the response, so no search hits are requested.
     * Each returned row is a {@code Map<String, Object>} with the keys {@code "province"},
     * {@code "keyWord"} and {@code "vendorDelay"}. The delay value is the bucket average divided
     * by 60000 (presumably milliseconds to minutes; confirm against the index mapping), formatted
     * in the default locale with at most two fraction digits.
     *
     * @return one row per (province, key) bucket, in response bucket order
     * @throws IOException if the search request fails
     * @throws ParseException no longer thrown; kept in the signature so existing callers that
     *     catch it still compile
     */
    public static List getVendorDelayAvgTimeTop() throws IOException, ParseException {
        // Start of the current day in the JVM default time zone, as epoch millis. This is the
        // same cutoff the former "yyyy-MM-dd" format/parse round-trip produced.
        long startOfToday = java.time.LocalDate.now()
                .atStartOfDay(java.time.ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();

        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
                .filter(QueryBuilders.rangeQuery("fileArrivalTime").gte(startOfToday));

        // Inner grouping: per key, the average delay plus one sample document carrying the
        // non-grouped columns. All sub-aggregations are attached before the builder is used.
        AvgAggregationBuilder vendorDelayAvg = AggregationBuilders.avg("vendorDelay").field("vendorDelay");
        AggregationBuilder sampleDoc = AggregationBuilders.topHits("result")
                .fetchSource(new String[]{"vendor", "dataType"}, null)
                .size(1);
        TermsAggregationBuilder byKey = AggregationBuilders.terms("keyword")
                .field("key.keyword")
                // Highest average delay first; size(1) keeps only the top key per province.
                .order(BucketOrder.aggregation("vendorDelay", false))
                .size(1)
                .subAggregation(vendorDelayAvg)
                .subAggregation(sampleDoc);
        TermsAggregationBuilder byProvince = AggregationBuilders.terms("province")
                .field("province.keyword")
                .size(31)
                .subAggregation(byKey);

        // size(0): hits are never read, only aggregations, so don't fetch or sort documents.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
                .query(boolQuery)
                .aggregation(byProvince)
                .size(0);
        log.debug("sourceBuilder" + sourceBuilder);

        SearchRequest searchRequest = new SearchRequest(Index_MSG).source(sourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        NumberFormat twoDecimals = NumberFormat.getNumberInstance();
        twoDecimals.setMaximumFractionDigits(2);

        List<Map<String, Object>> rows = Lists.newArrayList();
        Terms provinces = searchResponse.getAggregations().get("province");
        for (Terms.Bucket province : provinces.getBuckets()) {
            Terms keys = province.getAggregations().get("keyword");
            for (Terms.Bucket key : keys.getBuckets()) {
                ParsedAvg avg = key.getAggregations().get("vendorDelay");
                // Read the numeric value directly. getValueAsString() returns value_as_string when
                // the field has a format, and that string may not parse as a Double.
                double delayMinutes = avg.getValue() / 60000;
                log.info(province.getKeyAsString() + "  " + key.getKey() + "  " + delayMinutes);

                // NOTE(review): the top_hits "result" (vendor, dataType) is requested but not
                // copied into the row; add it here if callers need the non-grouped columns.
                Map<String, Object> row = new LinkedHashMap<>();
                row.put("province", province.getKeyAsString());
                row.put("keyWord", key.getKey());
                row.put("vendorDelay", twoDecimals.format(delayMinutes));
                rows.add(row);
            }
        }
        return rows;
    }

3. DSL语句

GET /collect_msg*/_search
{
  "size": 1000,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "fileArrivalTime": {
              "from": 1626019200000,
              "to": null,
              "include_lower": true,
              "include_upper": true,
              "boost": 1
            }
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  },
  "sort": [
    {
      "vendorDelay": {
        "order": "desc"
      }
    }
  ],
  "aggregations": {
    "province": {
      "terms": {
        "field": "province.keyword",
        "size": 31,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      },
      "aggregations": {
        "keyword": {
          "terms": {
            "field": "key.keyword",
            "size": 5,
            "min_doc_count": 1,
            "shard_min_doc_count": 0,
            "show_term_doc_count_error": false,
            "order": [
              {
                "vendorDelay": "desc"
              },
              {
                "_key": "asc"
              }
            ]
          },
          "aggregations": {
            "result": {
              "top_hits": {
                "from": 0,
                "size": 1,
                "version": false,
                "seq_no_primary_term": false,
                "explain": false,
                "_source": {
                  "includes": [
                    "vendor",
                    "dataType"
                  ],
                  "excludes": []
                }
              }
            },
            "vendorDelay": {
              "avg": {
                "field": "vendorDelay"
              }
            }
          }
        }
      }
    }
  }
}

4. 查询示例

分组并对某列求平均值,并根据需求展示其他列信息
(此处为查询结果截图)

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐