Advanced Queries with the Elasticsearch Java Client
Connect to Elasticsearch from Java with the 8.4.1 client and implement common queries and aggregations
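1 Creating the ES client in Java
1.1 Adding the Elasticsearch client API
This article uses the ES client 8.4.1 with Spring Boot 2.1.7.RELEASE. Note that the dependencies below pin elasticsearch-java at 8.4.3 and the low-level REST client at 8.4.1.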
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.4.3</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>8.4.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
1.2 Creating the connection
// Create the low-level client
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200)).build();
// Create the transport with a Jackson mapper
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
// Create the API client
ElasticsearchClient esClient = new ElasticsearchClient(transport);
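1.3 Authentication
Basic authentication is configured by providing an HttpClientConfigCallback while building the RestClient through its builder. The interface has a single method that receives an org.apache.http.impl.nio.client.HttpAsyncClientBuilder instance as its argument and has the same return type, so you can modify the HTTP client builder and then return it. The following example sets a default credentials provider that requires basic authentication.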
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("user", "test-user-password"));
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200))
.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider);
}
});
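Preemptive authentication can be disabled through the HttpAsyncClientBuilder. Every request is then sent without an authorization header to see whether it is accepted; after an HTTP 401 response, the client resends exactly the same request with the basic authentication header. To get this behavior, disable auth caching as follows: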
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("user", "test-user-password"));
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200))
.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider);
}
});
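For orientation, the header that basic authentication ultimately puts on the wire can be sketched in plain Java. This is only an illustration of the header format; the class and method names are hypothetical, and in the code above the Apache HTTP client builds the header itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthHeaderSketch {
    // Builds the value of the HTTP Authorization header for basic authentication:
    // "Basic " followed by base64("user:password").
    static String basicAuthHeader(String user, String password) {
        String raw = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println("Authorization: " + basicAuthHeader("user", "test-user-password"));
    }
}
```

With preemptive authentication this header accompanies every request; with it disabled, the header is only added after the server answers 401.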
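2 Basic queries
2.1 match query
match is an analyzed (tokenized) query by default.
For fields of type text, matching on part of the value works; matching on the full value returns unexpected extra hits, which can be avoided by querying the field's .keyword sub-field.
For other field types, matching on part of the value returns nothing; matching on the full value works.
Example: name is of type text and status is of type long.
// Analyzed partial-value match on a text field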
{
"_source": [
"name",
"status"
],
"query": {
"match": {
"name": "li"
}
}
}
// Result
{
...
"_source": {
"name": "li li",
"status": 500
}
},
{
...
"_source": {
"name": "li si",
"status": 500
}
}
// Full-value match on a text field
{
"_source": [
"name",
"status"
],
"query": {
"match": {
"name": "li si"
}
}
}
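// Unexpected result (hits other than the intended value); changing the condition to "name.keyword": "li si" disables tokenization and returns only the expected hit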
{
...
"_source": {
"name": "li si",
"status": 500
}
},
{
...
"_source": {
"name": "li li",
"status": 500
}
}
Java API implementation
SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
// Ignore indices that do not exist
.ignoreUnavailable(true).ignoreThrottled(true)
.query(q -> q.match(m -> m.field("name").query("li"))),Map.class);
List<Hit<Map>> hits = searchResponse.hits().hits();
for (Hit<Map> hit : hits){
System.out.println(hit.source().toString());
}
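Why a full-value match on a text field also returns "li li" can be illustrated with a small in-memory model. This is a rough sketch, assuming an analyzer that only lowercases and splits on whitespace and the default OR operator between query tokens; the class and method names are hypothetical and no Elasticsearch API is involved.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

class MatchSketch {
    // Simplified analyzer: lowercase, then split on whitespace.
    static Set<String> tokens(String text) {
        return new HashSet<>(Arrays.asList(text.toLowerCase(Locale.ROOT).trim().split("\\s+")));
    }

    // match with the OR operator: a hit if any query token appears among the field tokens.
    static boolean matches(String fieldValue, String query) {
        Set<String> fieldTokens = tokens(fieldValue);
        for (String token : tokens(query)) {
            if (fieldTokens.contains(token)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("li li", "li si"));     // shares the token "li"
        System.out.println(matches("li si", "li si"));
        System.out.println(matches("zhang san", "li si"));
    }
}
```

The query "li si" is split into the tokens "li" and "si", so any document containing either token is a hit. Querying name.keyword compares the whole, untokenized value instead.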
2.2 multi_match: match query over multiple fields (same behavior as match)
Java API implementation
SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(q -> q.multiMatch(m -> m.fields("name","status").query("li"))),
Map.class);
2.3 term exact query
For a text field, query its .keyword sub-field; other field types need no suffix.
{
"_source": [
"name",
"status"
],
"query": {
"term": {
"name.keyword": "li si"
}
}
}
Java API implementation
SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(q -> q.term(t -> t.field("name.keyword").value("li si"))),
Map.class);
2.4 terms multi-value exact query (equivalent to SQL IN)
// Returns the two documents whose name is 'zhang san' or 'li si'
{
"query": {
"terms": {
"name.keyword": [
"zhang san",
"li si"
]
}
}
}
Java API implementation
// Build the list of FieldValue objects
FieldValue fieldValue1 = FieldValue.of("zhang san");
FieldValue fieldValue2 = FieldValue.of("li si");
List<FieldValue> fieldValueList = Arrays.asList(fieldValue1,fieldValue2);
// Build the TermsQuery
TermsQuery termsQuery = TermsQuery.of(t -> t.field("name.keyword").terms(t2 -> t2.value(fieldValueList)));
// Run the query
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(queryBuilder -> queryBuilder.terms(termsQuery))
, Map.class);
// Read the hits
List<Hit<Map>> hits = search.hits().hits();
for (Hit<Map> hit : hits){
Map source = hit.source();
System.out.println(source.toString());
}
2.4 range query
{
"query": {
"range": {
"status": {
"gte":400,
"lte":500
}
}
}
}
Java API implementation
SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(q -> q.range(r -> r.field("status").gte(JsonData.of(400)).lte(JsonData.of(500)))),
Map.class);
2.5 sort and from/size pagination
// Sort by time in descending order and return the first ten hits, starting at offset 0
{
"sort":{
"time":{
"order":"desc"
}
},
"from":0,
"size":10
}
Java API implementation
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index(indexList)
.ignoreUnavailable(true).ignoreThrottled(true)
// Sort by time in descending order
.sort(sortBuilder -> sortBuilder.field(f -> f.order(SortOrder.Desc).field("time")))
.from(0)
.size(10), Map.class);
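Since from is a zero-based offset rather than a page number, a small helper is often used to translate one into the other. The following is a minimal sketch with hypothetical names, assuming 1-based page numbers on the caller's side.

```java
class PageSketch {
    // Converts a 1-based page number and a page size into the zero-based "from" offset.
    static int from(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        return (page - 1) * size;
    }

    public static void main(String[] args) {
        System.out.println(from(1, 10)); // offset of the first page
        System.out.println(from(3, 10)); // offset of the third page
    }
}
```

The result would be passed to .from(...) together with .size(size) in the search call above.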
2.6 wildcard query (analogous to SQL LIKE)
{
"query":{
"wildcard":{
"name.keyword":"li*"
}
}
}
Java API implementation
// Build the WildcardQuery
// caseInsensitive(true) makes the match case-insensitive; when omitted, matching is case-sensitive
WildcardQuery wildcardQuery = WildcardQuery.of(t2 -> t2.caseInsensitive(true).field("name.keyword").value("li*"));
// Run the query
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(queryBuilder -> queryBuilder.wildcard(wildcardQuery)), Map.class);
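The pattern syntax itself is small: * stands for any sequence of characters (including none) and ? for exactly one character. As an illustration only, the sketch below translates such a pattern into a java.util.regex.Pattern and applies it to whole values, which is how it behaves on a keyword field; the class and method names are hypothetical and this is not how Elasticsearch evaluates the query internally.

```java
import java.util.regex.Pattern;

class WildcardSketch {
    // Translates a wildcard pattern into a regular expression: '*' -> ".*", '?' -> ".".
    static Pattern toRegex(String wildcard, boolean caseInsensitive) {
        StringBuilder regex = new StringBuilder();
        for (char c : wildcard.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), caseInsensitive ? Pattern.CASE_INSENSITIVE : 0);
    }

    // The pattern must cover the entire value, as with a keyword field.
    static boolean matches(String value, String wildcard, boolean caseInsensitive) {
        return toRegex(wildcard, caseInsensitive).matcher(value).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("li si", "li*", false));
        System.out.println(matches("Li Si", "li*", false));
        System.out.println(matches("Li Si", "li*", true));
    }
}
```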
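2.7 fuzzy similarity query
Many people equate fuzzy with LIKE-style pattern matching, but it does something different and more powerful: it matches terms that are similar to the search keyword. For example:
doc1: "i will marry you because I love you"
doc2: "i will live with harry"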
{
"query": {
"fuzzy": {
"name": {
"value": "harry",
"fuzziness": 1
}
}
}
}
Java API implementation
// Build the FuzzyQuery
FuzzyQuery fuzzyQuery = FuzzyQuery.of(fuzzyBuilder -> fuzzyBuilder.fuzziness("1").field("name").value("harry"));
// Run the query
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(queryBuilder -> queryBuilder.fuzzy(fuzzyQuery)), Map.class);
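With value = "harry" and fuzziness = 1, both documents are returned;
with fuzziness = 0, only doc2 is returned.
The fuzziness parameter is the maximum edit distance allowed between the keyword and a term; 0 means an exact match.
For more background on fuzzy, see the article "What is fuzzy".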
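The "allowed error" is an edit distance: the number of single-character changes needed to turn one term into the other. The sketch below computes the plain Levenshtein distance (insertions, deletions, substitutions) to show why "marry" is within distance 1 of "harry". It is a simplified stand-in with hypothetical names; by default Elasticsearch additionally counts a swap of two adjacent characters as one edit, which this sketch does not.

```java
class FuzzySketch {
    // Classic two-row dynamic-programming Levenshtein distance.
    static int editDistance(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // distance from the empty prefix of a
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i;
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    // A term is a fuzzy hit when it is within the allowed distance of the keyword.
    static boolean fuzzyMatches(String term, String keyword, int fuzziness) {
        return editDistance(term, keyword) <= fuzziness;
    }

    public static void main(String[] args) {
        System.out.println(editDistance("harry", "marry"));
        System.out.println(fuzzyMatches("marry", "harry", 1));
        System.out.println(fuzzyMatches("marry", "harry", 0));
    }
}
```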
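3 Complex queries
3.1 bool compound query
bool combines multiple query conditions. It is used together with must, must_not, should, and filter to build complex queries.
3.1 must
All conditions must match, equivalent to AND in SQL; the clauses contribute to the relevance score.
Example: find all documents where processTime >= 10 and appCode = 'testApp'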
{
"query": {
"bool": {
"must": [
{
"range": {
"processTime": {
"gte": 10
}
}
},
{
"term": {
"appCode.keyword": "testApp"
}
}
]
}
}
}
Java API implementation
List<Query> mustQueryList = new ArrayList<>();
// Build the term query
Query query1 = Query.of(queryBuilder -> queryBuilder.term(termBuilder ->
termBuilder.field("appCode.keyword").value("testApp")));
// Build the range query
Query query2 = Query.of(queryBuilder -> queryBuilder.range(rangeBuilder ->
rangeBuilder.field("processTime").gte(JsonData.of(10))));
mustQueryList.add(query1);
mustQueryList.add(query2);
// Run the query
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(queryBuilder -> queryBuilder.bool(
boolBuilder -> boolBuilder.must(mustQueryList)
)), Map.class);
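Note: by default only the ten hits with the highest relevance score are returned; combine the query with from and size to retrieve more.
3.2 must_not
The opposite of must: returns documents that match none of the conditions. No example is given here; see must.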
3.3 should
Equivalent to OR in SQL; the clauses contribute to the relevance score.
Example: find all documents where processTime >= 10 or appCode = 'testApp'
{
"query": {
"bool": {
"should": [
{
"range": {
"processTime": {
"gte": 10
}
}
},
{
"term": {
"appCode.keyword": "testApp"
}
}
]
}
}
}
Java API implementation
List<Query> shouldQueryList = new ArrayList<>();
// Build the term query
Query query1 = Query.of(queryBuilder -> queryBuilder.term(termBuilder ->
termBuilder.field("appCode.keyword").value("testApp")));
// Build the range query
Query query2 = Query.of(queryBuilder -> queryBuilder.range(rangeBuilder ->
rangeBuilder.field("processTime").gte(JsonData.of(10))));
shouldQueryList.add(query1);
shouldQueryList.add(query2);
// Run the query and return the 100 hits with the highest relevance score
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("api-gateway-trace-log-2022.10.14")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(queryBuilder -> queryBuilder.bool(
boolBuilder -> boolBuilder.should(shouldQueryList)
))
.from(0).size(100), Map.class);
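3.4 filter
filter combines other query clauses as pure filtering conditions that do not affect the relevance score. An example follows.
Query: documents where processTime >= 10 and gatewayCode = 'GATEWAY-DEMO' and (status starts with 4 or status starts with 5)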
{
"query": {
"bool": {
"filter": [
{
"range": {
"processTime": {
"gte": 10
}
}
},
{
"term": {
"gatewayCode.keyword": "GATEWAY-DEMO"
}
},
{
"bool": {
"should": [
{
"wildcard": {
"status": "5*"
}
},
{
"wildcard": {
"status": "4*"
}
}
]
}
}
]
}
}
}
Java API implementation
// 1. Build the filter conditions
List<Query> filterQuery = new ArrayList<>();
// 1.1 Build the range query
Query rangeQ = Query.of(queryBuilder -> queryBuilder.range(rangeBuilder -> rangeBuilder.field("processTime").gte(JsonData.of(10))));
filterQuery.add(rangeQ);
// 1.2 Build the term query
Query termQ = Query.of(queryBuilder -> queryBuilder.term(term -> term.field("gatewayCode.keyword").value("GATEWAY-DEMO")));
filterQuery.add(termQ);
// 1.3 Build the list of wildcard conditions
List<Query> shouldQuerys = new ArrayList<>();
// Build the wildcard queries
WildcardQuery wildcardQuery = WildcardQuery.of(t -> t.caseInsensitive(true).field("status").value("4*"));
Query wildcardQ = Query.of(t -> t.wildcard(wildcardQuery));
Query wildcardQ2 = Query.of(t -> t.wildcard(WildcardQuery.of(t2 -> t2.caseInsensitive(true).field("status").value("5*"))));
shouldQuerys.add(wildcardQ);
shouldQuerys.add(wildcardQ2);
// Build the nested BoolQuery (should)
BoolQuery boolQuery = BoolQuery.of(builder -> builder.should(shouldQuerys));
Query boolQ = Query.of(q -> q.bool(boolQuery));
// 1.4 Add the nested bool query to the filter list
filterQuery.add(boolQ);
// 2. Run the query; the conditions go into filter, matching the DSL above
SearchResponse<Map> search = client.search(searchBuilder -> searchBuilder.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(queryBuilder -> queryBuilder.bool(
boolBuilder -> boolBuilder.filter(filterQuery)
))
.from(0).size(100), Map.class);
Once you are comfortable with these clauses, even complex conditional queries are straightforward to write.
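The boolean logic of the filter example in 3.4 can be modeled in memory with java.util.function.Predicate. This is only a sketch of the matching semantics (scoring is ignored); the Doc class and all names are hypothetical, and status is treated as a string here.

```java
import java.util.function.Predicate;

class BoolSketch {
    // Minimal stand-in for an indexed document.
    static final class Doc {
        final long processTime;
        final String gatewayCode;
        final String status;

        Doc(long processTime, String gatewayCode, String status) {
            this.processTime = processTime;
            this.gatewayCode = gatewayCode;
            this.status = status;
        }
    }

    static boolean matches(Doc doc) {
        Predicate<Doc> range = d -> d.processTime >= 10;
        Predicate<Doc> term = d -> d.gatewayCode.equals("GATEWAY-DEMO");
        Predicate<Doc> status4 = d -> d.status.startsWith("4");
        Predicate<Doc> status5 = d -> d.status.startsWith("5");
        // filter: every clause must hold; the nested should needs at least one of its clauses.
        return range.and(term).and(status4.or(status5)).test(doc);
    }

    public static void main(String[] args) {
        System.out.println(matches(new Doc(12, "GATEWAY-DEMO", "500")));
        System.out.println(matches(new Doc(12, "GATEWAY-DEMO", "200")));
    }
}
```

must behaves like filter for matching purposes, and must_not corresponds to Predicate.negate(). A bool query that contains only should clauses requires at least one of them to match.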
4 Aggregation queries
4.1 Metric aggregations
Metric aggregations apply mathematical operations to document fields for statistical analysis, analogous to min(), max(), and sum() in MySQL.
Example: compute the average of processTime
{
"size": 0,
"aggs": {
"avgProcessTime": {
"avg": {
"field": "processTime"
}
}
}
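}
"size": 0: return only the aggregation results, without the matching documents;
"avgProcessTime": a user-defined name for this aggregation's result; it is the key used later to fetch the result with get(key).
"avg": the aggregation type, here the average; replace it with min, max, sum, and so on as needed.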
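What these metric aggregations compute can be reproduced on a handful of values with DoubleSummaryStatistics from the JDK. This is an in-memory sketch with hypothetical names and made-up sample values, intended only to show the relationship between avg, min, max, and sum.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

class MetricSketch {
    // Collects count, sum, min, max and average over the given processTime values.
    static DoubleSummaryStatistics stats(double... processTimes) {
        return DoubleStream.of(processTimes).summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics s = stats(10, 20, 60); // sample values, not real data
        System.out.println("avg=" + s.getAverage());
        System.out.println("min=" + s.getMin());
        System.out.println("max=" + s.getMax());
        System.out.println("sum=" + s.getSum());
    }
}
```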
4.2 date_histogram date aggregation
date_histogram aggregates by date, for example by year, month, week, or day.
Example: aggregate time by hour
{
"size": 0,
"aggs": {
"hourData": {
"date_histogram": {
"field": "time",
"format": "yyyy-MM-dd",
"calendar_interval": "1h"
}
}
}
}
Java API implementation
SearchResponse<Map> hourSearch = client.search(s -> s.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
// Group by the date field time; calendarInterval sets the bucket width (day/month/hour), format sets the format of the bucket key
.aggregations("hourData", agg -> agg.dateHistogram(his -> his
.field("time").calendarInterval(CalendarInterval.Hour).format("yyyy-MM-dd HH:mm:ss")))
.size(0),
Map.class);
// Read the aggregation result; a date_histogram returns DateHistogramBucket entries, not string terms
Aggregate hourData = hourSearch.aggregations().get("hourData");
List<DateHistogramBucket> array = hourData.dateHistogram().buckets().array();
The CalendarInterval enum lists the available bucket widths:
public enum CalendarInterval implements JsonEnum {
Second("second", new String[]{"1s"}),
Minute("minute", new String[]{"1m"}),
Hour("hour", new String[]{"1h"}),
Day("day", new String[]{"1d"}),
Week("week", new String[]{"1w"}),
Month("month", new String[]{"1M"}),
Quarter("quarter", new String[]{"1q"}),
Year("year", new String[]{"1Y"});
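Conceptually, an hourly date histogram truncates each timestamp to the start of its hour and counts the documents per truncated value. The sketch below does this in memory with java.time; the names are hypothetical, the sample timestamps are made up, and time zones are ignored.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HourBucketSketch {
    // Same pattern as the format(...) argument: seconds are "ss", not "dd".
    static final DateTimeFormatter KEY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Counts timestamps per hour; keys are the formatted start of each hour, in ascending order.
    static Map<String, Long> countByHour(List<LocalDateTime> times) {
        Map<String, Long> buckets = new TreeMap<>();
        for (LocalDateTime t : times) {
            String key = t.truncatedTo(ChronoUnit.HOURS).format(KEY_FORMAT);
            buckets.merge(key, 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<LocalDateTime> times = Arrays.asList(
                LocalDateTime.of(2022, 10, 14, 8, 5, 0),
                LocalDateTime.of(2022, 10, 14, 8, 59, 59),
                LocalDateTime.of(2022, 10, 14, 9, 0, 0));
        System.out.println(countByHour(times));
    }
}
```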
4.3 terms aggregation
Example: count the documents per status and return the top five buckets
{
"size": 0,
"aggs": {
"statusCount": {
"terms": {
"field": "status",
"size": 5
}
}
}
}
Java API implementation
SearchResponse<Map> searchResponse = client.search(s -> s.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
// Group by status and count the documents per status; buckets are ordered by count descending by default, and size(5) keeps the top five
.aggregations("statusCount", agg -> agg.terms(t -> t
.field("status").size(5))),
Map.class);
// Parse the result
Aggregate statusCount = searchResponse.aggregations().get("statusCount");
List<LongTermsBucket> buckets = ((LongTermsAggregate) statusCount._get()).buckets().array();
for(LongTermsBucket bucket : buckets){
System.out.println(bucket.toString());
}
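The result of such a terms aggregation corresponds to a "group by, count, order by count descending, limit" in memory. The following sketch shows that with Java streams; the names and the sample status values are hypothetical, and ties are broken by key ascending in this sketch.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TermsSketch {
    // Returns (status, count) pairs ordered by count descending, limited to `size` buckets.
    static List<Map.Entry<Long, Long>> topStatuses(List<Long> statuses, int size) {
        Map<Long, Long> counts = statuses.stream()
                .collect(Collectors.groupingBy(s -> s, Collectors.counting()));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<Long, Long>comparingByKey()))
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> statuses = Arrays.asList(500L, 500L, 200L, 404L, 500L, 200L);
        for (Map.Entry<Long, Long> bucket : topStatuses(statuses, 2)) {
            System.out.println("key=" + bucket.getKey() + " doc_count=" + bucket.getValue());
        }
    }
}
```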
4.4 Nested aggregations
Example: select the documents with type = 'Response', bucket them by time, then compute the average processTime for each time bucket
{
"query": {
"term": {
"type.keyword": "Response"
}
},
"size": 0,
"aggs": {
"dateAgg": {
"date_histogram": {
"field": "time",
"format": "yyyy-MM-dd",
"calendar_interval": "1h"
},
"aggs": {
"avg_salary": {
"avg": {
"field": "processTime"
}
}
}
}
}
}
Java API implementation
SearchResponse<Map> hourSearch = client.search(s -> s.index("test-log")
.ignoreUnavailable(true).ignoreThrottled(true)
.query(q -> q.term(t -> t.field("type.keyword").value("Response")))
// Group by the date field time; calendarInterval sets the bucket width (day/month/hour), format sets the format of the bucket key
.aggregations("hourData", agg -> agg.dateHistogram(his -> his
.field("time").calendarInterval(CalendarInterval.Hour).format("yyyy-MM-dd HH:mm:ss"))
// Compute the average per bucket
.aggregations("avgData", aggr -> aggr.avg(avg -> avg.field("processTime"))))
.size(0),
Map.class);
Aggregate hourData = hourSearch.aggregations().get("hourData");
List<DateHistogramBucket> bucketList = ((DateHistogramAggregate) hourData._get()).buckets().array();
for(DateHistogramBucket dateBucket : bucketList){
Aggregate avgData = dateBucket.aggregations().get("avgData");
System.out.println(avgData.avg().value());
}
4.5 Aggregating on several fields combined
Example: aggregate on the three fields requestMethod, requestPath, and targetApp together and get the document count for each combination
DSL query
{
"size": 0,
"aggs": {
"resultData": {
"terms": {
"script": {
"source": "doc['requestMethod.keyword'].value +'-split-'+ doc['requestPath.keyword'].value +'-split-'+ doc['targetApp.keyword'].value"
}
}
}
}
}
Java API implementation
// Build the Script object
Script script = Script.of(s -> s.inline(i -> i.source("doc['requestMethod.keyword'].value +'-split-'+ doc['requestPath.keyword'].value +'-split-'+ doc['targetApp.keyword'].value")));
// Run the query
SearchResponse<Map> search = client.search(s -> s.index("api-gateway-trace-log-2022.10.26")
.ignoreUnavailable(true).ignoreThrottled(true)
.aggregations("resultData", agg -> agg.terms(
t -> t.script(script))
)
, Map.class);
// Parse and print the result
Aggregate resultData = search.aggregations().get("resultData");
if(null != resultData){
List<StringTermsBucket> array = resultData.sterms().buckets().array();
for (StringTermsBucket bucket : array){
System.out.println(bucket.toString());
}
}
// Result
StringTermsBucket: {"doc_count":11,"key":"GET-split-/get-split-ccc_test"}
StringTermsBucket: {"doc_count":3,"key":"GET-split-/security/helloworld-split-gaoyang"}
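Each bucket key is the three field values joined with the '-split-' separator, so the caller usually has to take it apart again. A minimal sketch of that step, with hypothetical class and method names:

```java
import java.util.regex.Pattern;

class CompositeKeySketch {
    static final String SEPARATOR = "-split-";

    // Splits a bucket key back into [requestMethod, requestPath, targetApp].
    static String[] parse(String key) {
        return key.split(Pattern.quote(SEPARATOR), -1);
    }

    public static void main(String[] args) {
        String[] parts = parse("GET-split-/get-split-ccc_test");
        System.out.println("requestMethod=" + parts[0]);
        System.out.println("requestPath=" + parts[1]);
        System.out.println("targetApp=" + parts[2]);
    }
}
```

This only works as long as none of the field values can itself contain the separator; otherwise the key becomes ambiguous.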
4.6 Putting it all together
Example: aggregate first, then sort the resulting buckets, then keep the top five
{
"size": 0,
"aggs": {
"requestPathData": {
"terms": {
"field": "requestPath.keyword"
},
"aggs": {
"avgProcess": {
"avg": {
"field": "processTime"
}
},
"process_bucket_sort": {
"bucket_sort": {
"sort": [
{
"avgProcess": {
"order": "desc"
}
} // Sort the buckets by average response time
],
"size": 5 // Keep the top 5 buckets
}
}
}
}
}
}
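The effect of the avg sub-aggregation followed by bucket_sort can be sketched in memory: compute the average per bucket, order the buckets by that average in descending order, and truncate. The names and sample values below are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class BucketSortSketch {
    // Returns the request paths with the highest average processTime, at most `size` of them.
    static List<String> topByAverage(Map<String, List<Double>> processTimesByPath, int size) {
        Map<String, Double> averages = new HashMap<>();
        for (Map.Entry<String, List<Double>> e : processTimesByPath.entrySet()) {
            double avg = e.getValue().stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
            averages.put(e.getKey(), avg);
        }
        return averages.entrySet().stream()
                .sorted(Map.Entry.<String, Double>comparingByValue(Comparator.reverseOrder()))
                .limit(size)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<Double>> data = new HashMap<>();
        data.put("/a", Arrays.asList(10.0, 20.0));
        data.put("/b", Arrays.asList(100.0));
        data.put("/c", Arrays.asList(1.0));
        System.out.println(topByAverage(data, 2));
    }
}
```

One difference from this sketch: in the DSL above, the terms aggregation first keeps its own top buckets by document count (10 by default), and bucket_sort only reorders and truncates those.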
References
https://blog.csdn.net/qq_37774171/article/details/122916354
https://blog.csdn.net/qq_42402854/article/details/126692323