1 Java创建es客户端

1.1 引入ElasticSearch客户端api

本次使用ES客户端8.4.1版本,springboot 2.1.7.RELEASE 版本

<dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.4.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>jakarta.json</groupId>
                    <artifactId>jakarta.json-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
		<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>8.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.4</version>
        </dependency>
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>

1.2 创建连接

// 创建low-level客户端连接
RestClient restClient = RestClient.builder(
          new HttpHost("localhost", 9200)).build();
// 基于Jackson创建传输对象
ElasticsearchTransport transport = new RestClientTransport(
          restClient, new JacksonJsonpMapper());
// 创建API客户端
ElasticsearchClient esClient = new ElasticsearchClient(transport);

1.3 身份验证

配置基本身份验证可以通过提供通过其构建器构建的时来完成。该接口有一个方法,该方法接收 org.apache.http.impl.nio.client.HttpAsyncClientBuilder 的实例作为参数,并具有相同的返回类型。可以修改 http 客户端生成器,然后返回该生成器。在以下示例中,我们设置了需要基本身份验证的默认凭据提供程序。

HttpClientConfigCallbackRestClient

final CredentialsProvider credentialsProvider =
    new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
    new UsernamePasswordCredentials("user", "test-user-password"));

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder
                .setDefaultCredentialsProvider(credentialsProvider);
        }
    });

可以禁用抢占式身份验证,这意味着每个请求都将在没有授权标头的情况下发送,以查看它是否被接受,并且在收到HTTP 401响应后,它将重新发送具有基本身份验证标头的完全相同的请求。如果您希望这样做,则可以通过以下方式禁用它来执行此操作:

HttpAsyncClientBuilder

final CredentialsProvider credentialsProvider =
    new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
    new UsernamePasswordCredentials("user", "test-user-password"));

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            httpClientBuilder.disableAuthCaching(); 
            return httpClientBuilder
                .setDefaultCredentialsProvider(credentialsProvider);
        }
    });

2 基础查询

2.1 match查询

match默认是分词匹配查询,
针对类型为text文本类型数据,部分值匹配,ok;全值匹配会出现异常,解决加字段.keyword
针对其他类型数据,部分值匹配结果为空;全值匹配ok。

举例:name为text类型,status为long类型

//对text文本类型分词部分匹配查询
{
    "_source": [
        "name",
        "status"
    ],
    "query": {
        "match": {
            "name": "li"
        }
    }
}
//结果
{											
	...
    "_source": {
        "name": "li li",
        "status": 500
    }
},
{
	...
    "_source": {
        "name": "li si",
        "status": 500
    }
}
//对text文本类型 全值匹配查询
{
    "_source": [
        "name",
        "status"
    ],
    "query": {
        "match": {
            "name": "li si"
        }
    }
}
//结果异常(指不符合预期的值),改为"name.keyword": "li si" 取消分词,结果正常
{
	...
    "_source": {
        "name": "li si",
        "status": 500
    }
},
{
	...
    "_source": {
        "name": "li li",
        "status": 500
    }
}

javaAPI实现

SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
				//忽略不存在索引项
                .ignoreUnavailable(true).ignoreThrottled(true)
                .query(q -> q.match(m -> m.field("name").query("li"))),Map.class);

List<Hit<Map>> hits = searchResponse.hits().hits();
for (Hit<Map> hit : hits){
    System.out.println(hit.source().toString());;
}

2.2 multi_match多字段匹配查询,同match

JavaAPI实现

SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
                            .ignoreUnavailable(true).ignoreThrottled(true)
                            .query(q -> q.multiMatch(m -> m.fields("name","status").query("li"))), 
                    Map.class);

2.3 term精确查询

如果是text类型,则需要加.keyword ,其他则不用加
{
    "_source": [
        "name",
        "status"
    ],
    "query": {
        "term": {
            "name.keyword": "li si"
        }
    }
}

JavaAPI实现

SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
                            .ignoreUnavailable(true).ignoreThrottled(true)
                            .query(q -> q.term(t -> t.field("name.keyword").value("li"))),
                    Map.class);

2.4 terms多值精确查询(相当于in)

//查出结果为name='zhang san','li si'两条数据
{
    "query": {
        "terms": {
            "name.keyword": [
                "zhang san",
                "li si"
            ]
        }
    }
}

javaAPI实现

//创建FieldValue集合
FieldValue fieldValue1 = FieldValue.of("zhang san");
FieldValue fieldValue2 = FieldValue.of("li si");
List<FieldValue> fieldValueList = Arrays.asList(fieldValue1,fieldValue2);
//构建TermsQuery查询语句
TermsQuery termsQuery = TermsQuery.of(t -> t.field("name.keyword").terms(t2 -> t2.value(fieldValueList)));
//执行查询
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
        .ignoreUnavailable(true).ignoreThrottled(true)
        .query(queryBuilder -> queryBuilder.terms(termsQuery))
        , Map.class);
//获取查询结果集
List<Hit<Map>> hits = search.hits().hits();
for (Hit<Map> hit : hits){
    Map source = hit.source();
    System.out.println(source.toString());
}

2.4 range范围查询

{
    "query": {
        "range": {
            "status": {
                "gte":400,
                "lte":500
            }
        }
    }
}

JavaAPI实现

SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
                .ignoreUnavailable(true).ignoreThrottled(true)
                .query(q -> q.range(r -> r.field("status").gte(JsonData.of(400)).lte(JsonData.of(500)))),
        Map.class);

2.5 sort排序、from size 分页

//按time倒序排序,从0开始取前十条
{
    "sort":{
        "time":{
            "order":"desc"
        }
    },
    "from":0,
    "size":10
}

javaAPI实现

SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index(indexList)
                    .ignoreUnavailable(true).ignoreThrottled(true)
                    //对time倒序排序
                    .sort(sortBuilder -> sortBuilder.field(f -> f.order(SortOrder.Desc).field("time")))
                    .from(0)
                    .size(10), Map.class);

2.6 wildcard通配符查询(类比SQL中like)

{
    "query":{
        "wildcard":{
            "name.keyword":"li*"
        }
    }
}

javaAPI实现

//构建WildcardQuery查询语句
//caseInsensitive()是否区分大小写,true不区分,不加默认区分
WildcardQuery wildcardQuery = WildcardQuery.of(t2 -> t2.caseInsensitive(true).field("name").value("li*"));
//执行查询
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
        .ignoreUnavailable(true).ignoreThrottled(true)
        .query(queryBuilder -> queryBuilder.wildcard(wildcardQuery)), Map.class);

2.7 fuzzy相似度查询

关于模糊查询,很多人会认为是fuzzy,其实fuzzy的功能更加强大,它可以针对关键字进行相似匹配,比如:

doc1: “i will marry you because I love you”
doc2: “i will live with harry”

{
  "query": {
    "fuzzy": {
      "name": {
        "value": "harry"
        "fuziness": 1
      }
    }
  }
}

javaAPI实现

//构建FuzzyQuery查询语句
FuzzyQuery fuzzyQuery = FuzzyQuery.of(fuzzyBuilder -> fuzzyBuilder.fuzziness("1").field("name").value("harry"));
//执行查询
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
        .ignoreUnavailable(true).ignoreThrottled(true)
        .query(queryBuilder -> queryBuilder.fuzzy(fuzzyQuery)), Map.class);

输入value=“harry”,如果’fuziness’ = 1,则两条结果都会输出;
如果’fuziness’ = 0,则只有doc2会输出;

这里的fuziness参数就是指允许误差,为0时就是完全匹配

关于fuzzy的介绍可以阅读 什么是fuzzy

3 复杂查询

3.1 bool组合查询

bool用于组合多个查询条件,配合mustmust_notshouldfilter一起使用,完成复杂查询操作。

3.1 must

满足全部的查询条件,相当于SQL中的and,它会计算相关度评分

举例:查询数据中’processTime’ >=10 且 ‘appCode’ = ‘testApp’ 的所有数据

{
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "processTime": {
                            "gte": 10
                        }
                    }
                },
                {
                    "term": {
                        "appCode.keyword": "testApp"
                    }
                }
            ]
        }
    }
}

javaAPI实现

List<Query> mustQueryList = new ArrayList<>();
//构建term查询对象
Query query1 = Query.of(queryBuilder -> queryBuilder.term(termBuilder -> 
        termBuilder.field("appCode.keyword").value("testApp")));
//构建range查询对象
Query query2 = Query.of(queryBuilder -> queryBuilder.range(rangeBuilder ->
        rangeBuilder.field("processTime").gte(JsonData.of(10))));
mustQueryList.add(query1);
mustQueryList.add(query2);
//执行查询
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
        .ignoreUnavailable(true).ignoreThrottled(true)
        .query(queryBuilder -> queryBuilder.bool(
                boolBuilder -> boolBuilder.must(mustQueryList)
        )), Map.class);

注意:这里只返回相关度评分最高的前十条,可以和from size关键字配合使用

3.2 must_not

mus相反,查询都不满足条件的数据。这里就不再举例,参考must

3.3 should

相当于SQL中的or关键字,会计算相关度评分。
举例:查询数据中’processTime’ >=10 或者 ‘appCode’ = ‘testApp’ 的所有数据

{
    "query": {
        "bool": {
            "should": [
                {
                    "range": {
                        "processTime": {
                            "gte": 10
                        }
                    }
                },
                {
                    "term": {
                        "appCode.keyword": "testApp"
                    }
                }
            ]
        }
    }
}

javaAPI实现

List<Query> shouldQueryList = new ArrayList<>();
//构建term查询对象
Query query1 = Query.of(queryBuilder -> queryBuilder.term(termBuilder ->
        termBuilder.field("appCode.keyword").value("testApp")));
//构建range查询对象
Query query2 = Query.of(queryBuilder -> queryBuilder.range(rangeBuilder ->
        rangeBuilder.field("processTime").gte(JsonData.of(10))));
shouldQueryList.add(query1);
shouldQueryList.add(query2);
//执行查询,并返回相关度评分最高的前100条
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("api-gateway-trace-log-2022.10.14")
        .ignoreUnavailable(true).ignoreThrottled(true)
        .query(queryBuilder -> queryBuilder.bool(
                boolBuilder -> boolBuilder.should(shouldQueryList)
        ))
        .from(0).size(100), Map.class);

3.4 filter过滤器

filter关键字可以将其他过滤条件关键字组合起来使用,下面举个栗子说明。

查询:‘processTime’ >=10 且 ‘gatewayCode’ = ‘GATEWAY-DEMO’ 且 (‘status’ 以 '4*'开头 或者 ‘status’ 以 ‘5*’ 开头)的数据

{
    "query": {
        "bool": {
            "filter": [
                {
                    "range": {
                        "processTime": {
                            "gte": 10
                        }
                    }
                },
                {
                    "term": {
                        "gatewayCode.keyword": "GATEWAY-DEMO"
                    }
                },
                {
                    "bool": {
                        "should": [
                            {
                                "wildcard": {
                                    "status": "5*"
                                }
                            },
                            {
                                "wildcard": {
                                    "status": "4*"
                                }
                            }
                        ]
                    }
                }
            ]
        }
    }
}

javaAPI实现

//1.创建过滤条件
List<Query> filterQuery = new ArrayList<>();
//1.1创建rangeQuery查询语句
Query rangeQ = Query.of(queryBuilder -> queryBuilder.range(rangeBuilder -> rangeBuilder.field("processTime").gte(JsonData.of(10))));
filterQuery.add(rangeQ);
//1.2创建termQuery查询语句
Query termQ = Query.of(queryBuilder -> queryBuilder.term(term -> term.field("gatewayCode.keyword").value("GATEWAY-DEMO")));
filterQuery.add(termQ);

//1.3创建模糊过滤条件集合
List<Query> shouldQuerys = new ArrayList<>();
//构建模糊查询
WildcardQuery wildcardQuery = WildcardQuery.of(t -> t.caseInsensitive(true).field("status").value("4*"));
Query wildcardQ = Query.of(t -> t.wildcard(wildcardQuery));

Query wildcardQ2 = Query.of(t -> t.wildcard(WildcardQuery.of(t2 -> t2.caseInsensitive(true).field("status").value("5*"))));
shouldQuerys.add(wildcardQ);
shouldQuerys.add(wildcardQ2);

//构建BoolQuery查询语句
BoolQuery boolQuery = BoolQuery.of(builder -> builder.should(shouldQuerys));
Query boolQ = Query.of(q -> q.bool(boolQuery));

//1.4将bool查询放入过滤查询集合中
filterQuery.add(boolQ);

//2.执行查询
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
        .ignoreUnavailable(true).ignoreThrottled(true)
        .query(queryBuilder -> queryBuilder.bool(
                boolBuilder -> boolBuilder.should(filterQuery)
        ))
        .from(0).size(100), Map.class);

只要掌握这几种关键字查询,再复杂的条件查询写起来也会很简单

4 聚合查询

4.1 Metric Aggregation聚合统计

Metric Aggregation:一些数学运算,可以对文档字段进行统计分析,类比 Mysql中的 min(), max(), sum() 操作。

举例:对 ‘processTime’ 求平均值

{
    "size": 0,
    "aggs": {
        "avgProcessTime": {
            "avg": {
                "field": "processTime"
            }
        }
    }
}

"size": 0 : 只返回聚合结果集,元数据不返回;
"avgProcessTime" : 自定义当前聚合查询结果集,定义个key,后面获取时直接get(key)即可。
"avg" : 聚合方式,表示求平均,其他如minmaxsum等,直接替换即可。

4.2 date_histogram日期聚合

date_histogram关键字实现日期聚合,比如按年、月、周、日等维度。

举例:对 ‘time’ 按小时进行聚合

{
    "size": 0,
    "aggs": {
        "hourData": {
            "date_histogram": {
                "field": "time",
                "format": "yyyy-MM-dd",
                "calendar_interval": "1h"
            }
        }
    }
}

javaAPI实现

SearchResponse<Map> hourSearch = client.search(s -> s.index("test-log")
               .ignoreUnavailable(true).ignoreThrottled(true)
               // 按日期time进行分组,calendarInterval表示按时间段(按天/按月/按时)分组,format对分组时间格式化
               .aggregations("hourData", agg -> agg.dateHistogram(his -> his
                       .field("time").calendarInterval(CalendarInterval.Hour).format("yyyy-MM-dd HH:mm:dd")))
               .size(0),
       Map.class);
//获取聚合数据
Aggregate hourData = hourSearch.aggregations().get("hourData");
List<StringTermsBucket> array = hourData.sterms().buckets().array();

可以看到,CalendarInterval 这个枚举类中有各种聚合维度

public enum CalendarInterval implements JsonEnum {
    Second("second", new String[]{"1s"}),
    Minute("minute", new String[]{"1m"}),
    Hour("hour", new String[]{"1h"}),
    Day("day", new String[]{"1d"}),
    Week("week", new String[]{"1w"}),
    Month("month", new String[]{"1M"}),
    Quarter("quarter", new String[]{"1q"}),
    Year("year", new String[]{"1Y"});

4.3 terms常规聚合

举例:查询不同状态的总数量,并取前五条数据

{
    "size": 0,
    "aggs": {
        "requestPathData": {
            "terms": {
                "field": "status",
                "size": 5
            }
        }
    }
}

javaAPI实现

SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
                .ignoreUnavailable(true).ignoreThrottled(true)
                // 对status分组取每个状态的数量,结果默认是数量倒序,size(5)取前两条数据
                .aggregations("statusCount", agg -> agg.terms(t -> t
                        .field("status").size(5))),
        Map.class);
//解析结果集
Aggregate statusCount = searchResponse.aggregations().get("statusCount");
List<LongTermsBucket> buckets = ((LongTermsAggregate) statusCount._get()).buckets().array();
for(LongTermsBucket bucket : buckets){
    System.out.println(bucket.toString());
}

4.4 嵌套聚合查询

举例:先获取 ‘type’ = ‘Response’ 的数据,再按 ‘time’ 进行时间聚合,再对 ‘processTime’ 求每个聚合时间段的平均值

{
    "query": {
        "term": {
            "type.keyword": "Response"
        }
    },
    "size": 0,
    "aggs": {
        "dateAgg": {
            "date_histogram": {
                "field": "time",
                "format": "yyyy-MM-dd",
                "calendar_interval": "1h"
            },
            "aggs": {
                "avg_salary": {
                    "avg": {
                        "field": "processTime"
                    }
                }
            }
        }
    }
}

javaAPI实现

SearchResponse<Map> hourSearch = client.search(s -> s.index("test-log")
                .ignoreUnavailable(true).ignoreThrottled(true)
                .query(q -> q.term(t -> t.field("type.keyword").value("Response")))
                // 按日期time进行分组,calendarInterval表示按时间段(按天/按月/按时)分组,format对分组时间格式化
                .aggregations("hourData", agg -> agg.dateHistogram(his -> his
                        .field("time").calendarInterval(CalendarInterval.Hour).format("yyyy-MM-dd HH:mm:dd"))
                        // 求平均值
                        .aggregations("avgData", aggr -> aggr.avg(avg -> avg.field("processTime"))))
                .size(0),
        Map.class);

Aggregate hourData = hourSearch.aggregations().get("hourData");
List<DateHistogramBucket> bucketList = ((DateHistogramAggregate) hourData._get()).buckets().array();
for(DateHistogramBucket dateBucket : bucketList){
    Aggregate avgData = dateBucket.aggregations().get("avgData");
    System.out.println(avgData.avg().value());
}

4.5 多字段联合聚合查询

举例:对字段’requestMethod’、‘requestPath’、'targetApp’三个字段进行聚合查询,获取总条数
DSL查询

{
    "size": 0,
    "aggs": {
        "resultData": {
            "terms": {
                "script": {
                    "inline": "doc['requestMethod.keyword'].value +'-split-'+ doc['requestPath.keyword'].value +'-split-'+ doc['targetApp.keyword'].value"
                }
            }
        }
    }
}

javaAPI实现

//构建Script对象
Script script = Script.of(s -> s.inline(i -> i.source("doc['requestMethod.keyword'].value +'-split-'+ doc['requestPath.keyword'].value +'-split-'+ doc['targetApp.keyword'].value")));
//执行查询
SearchResponse<Map> search = client.search(s -> s.index("api-gateway-trace-log-2022.10.26")
       .ignoreUnavailable(true).ignoreThrottled(true)
       .aggregations("resultData", agg -> agg.terms(
               t -> t.script(script))
       )
, Map.class);
//解析数据,打印
Aggregate resultData = search.aggregations().get("resultData");
if(null != resultData){
   List<StringTermsBucket> array = resultData.sterms().buckets().array();
   for (StringTermsBucket bucket : array){
       System.out.println(bucket.toString());
   }
}

//结果
StringTermsBucket: {"doc_count":11,"key":"GET-split-/get-split-ccc_test"}
StringTermsBucket: {"doc_count":3,"key":"GET-split-/security/helloworld-split-gaoyang"}

4.6 终极聚合查询

举例:先聚合查询,再对聚合结果排序,再取前五数据

{
    "size": 0,
    "aggs": {
        "requestPathData": {
            "terms": {
                "field": "requestPath.keyword"
            },
            "aggs": {
                "avgProcess": {
                    "avg": {
                        "field": "processTime"
                    }
                },
                "process_bucket_sort": {
                    "bucket_sort": {
                        "sort": [
                            {
                                "avgProcess": {
                                    "order": "desc"
                                }
                            } //根据平均响应时间对桶排序
                        ],
                        "size": 5 //取前10个桶
                    }
                }
            }
        }
    }
}

参考
https://blog.csdn.net/qq_37774171/article/details/122916354
https://blog.csdn.net/qq_42402854/article/details/126692323

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐