【ElasticSearch8】springboot整合es8(三),基于scroll的深度分页
springboot elasticSearch8开发(三),基于scroll的深度分页
目录
最新的 ES8 官方 API 文档没有提供基于 scroll 的深度分页查询实现方案,结合 7 版本的实现方式,终于在 ES8 的版本上实现了 scroll 分页查询。
游标查询 scroll
Scroll 查询可以用来对 Elasticsearch 有效地执行大批量的文档查询,而又不用付出深度分页那种代价。
游标查询允许我们先做查询初始化,然后再批量地拉取结果。这有点儿像传统数据库中的 cursor 。
游标查询会取某个时间点的快照数据。查询初始化之后索引上的任何变化会被它忽略。它通过保存旧的数据文件来实现这个特性,结果就像保留初始化时的索引视图一样。
上代码:
以下内容用到了复合查询,内容可以参考上一篇:
构建复合查询条件
/**
 * Builds the compound-query condition: appends one {@code terms} condition to
 * {@code queries} that matches {@code dto.getField()} against every keyword
 * returned by {@code dto.getWords()}.
 *
 * @param dto     supplies the field name and the keyword list
 * @param queries condition list the new terms condition is appended to (modified in place)
 */
private void getFieldValues(EsQueryDTO dto, List<Query> queries) {
    // Convert each keyword into a FieldValue for the terms query.
    List<FieldValue> termValues = new ArrayList<>();
    dto.getWords().forEach(keyword -> termValues.add(FieldValue.of(keyword)));

    // Add the terms condition to the caller's condition list.
    Query termsCondition = Query.of(q -> q.terms(t -> t.field(dto.getField()).terms(v -> v.value(termValues))));
    queries.add(termsCondition);
}
第一次查询赋予查询条件和条数:
// Excerpt: the first request of a scroll query. `dto`, `queries` and `client` are
// declared outside this fragment.
// Accumulates the converted documents of every page.
List<T> result = new ArrayList<>();
// Optional sort: only added when the DTO names a field to order by.
List<SortOptions> sorts = new ArrayList<>();
if (StringUtils.isNotBlank(dto.getOrder())) {
SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
sorts.add(sortOptions);
}
// Appends the terms condition built from the DTO's keywords to `queries`.
getFieldValues(dto, queries);
// Deep pagination via scroll: the first search sets the conditions, a page size
// of 5000 and a scroll keep-alive of "5s".
SearchResponse<HashMap> search = client.search(s -> s
.index(dto.getIndexName()).query(q->q.bool(b->b.must(queries))).size(5000).scroll(t->t.time("5s"))
.sort(sorts),
HashMap.class);
取第一次查询返回的 scrollId,继续执行后面的查询:
// Excerpt: `search`, `result`, `target` and `client` come from the preceding fragment.
// Scroll id returned by the first search; each scroll request below needs the id
// of the previous response.
StringBuffer scrollId = new StringBuffer(search.scrollId());
// Keep fetching pages until a page comes back with no hits.
do {
// Convert the hits of the current page and append them to `result`.
getResult(target, result, search);
// Copy into a variable that is never reassigned so the lambda below can capture it.
StringBuffer finalScrollId = scrollId;
search = client.scroll(s->s.scrollId(finalScrollId.toString()).scroll(t->t.time("5s")),HashMap.class);
// Remember the id from the newest response for the next iteration.
scrollId = new StringBuffer(search.scrollId());
}while (!search.hits().hits().isEmpty());
清除scroll:
查询结束后,需要使用client.clearScroll() 方法清除 scroll。
使用scroll api就无法实现跳页查询了,因为除了第一次查询外的其它查询都要依赖上一次查询返回的scrollId,这一点需要注意。
完整代码如下:
/**
 * Runs a compound keyword query and collects every matching document with the
 * scroll API (cursor-style deep pagination).
 *
 * <p>The terms condition built from {@code dto} is appended to {@code queries}
 * by {@code getFieldValues}, so a non-null list passed by the caller is modified.
 * The scroll context is released in a {@code finally} block, so it is also
 * released when a scroll request or the result conversion fails.
 *
 * @param client  Elasticsearch client used for the search, scroll and clear-scroll requests
 * @param dto     query description: index name, field, keyword list and optional sort
 * @param queries extra conditions combined with {@code must}; {@code null} is treated as an empty list
 * @param target  class every hit is converted to
 * @return all hits converted to {@code target}, in the order the pages were returned
 * @throws Exception if a request to Elasticsearch fails
 * @author liuch
 * @date 2022/4/2 17:15
 */
public List<T> queryByFileds(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries, Class<T> target) throws Exception {
    List<T> result = new ArrayList<>();
    List<SortOptions> sorts = new ArrayList<>();
    if (StringUtils.isNotBlank(dto.getOrder())) {
        sorts.add(SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType())))));
    }
    // Same null tolerance as queryPageByFileds.
    final List<Query> conditions = queries == null ? new ArrayList<>() : queries;
    getFieldValues(dto, conditions);

    // First request: sets the conditions, the page size and opens the scroll context.
    SearchResponse<HashMap> search = client.search(s -> s
            .index(dto.getIndexName())
            .query(q -> q.bool(b -> b.must(conditions)))
            .size(5000)
            .scroll(t -> t.time("5s"))
            .sort(sorts),
        HashMap.class);

    // Only the most recently returned scroll id is kept; it is used for the next
    // page and for the final clean-up.
    String scrollId = search.scrollId();
    Exception failure = null;
    try {
        // Keep pulling pages until one comes back empty.
        while (!search.hits().hits().isEmpty()) {
            getResult(target, result, search);
            final String currentScrollId = scrollId;
            search = client.scroll(s -> s.scrollId(currentScrollId).scroll(t -> t.time("5s")), HashMap.class);
            scrollId = search.scrollId();
        }
    } catch (Exception e) {
        failure = e;
        throw e;
    } finally {
        if (scrollId != null) {
            final String lastScrollId = scrollId;
            try {
                client.clearScroll(c -> c.scrollId(lastScrollId));
            } catch (Exception cleanupFailure) {
                // A clean-up failure must not hide the error that aborted the scroll.
                if (failure == null) {
                    throw cleanupFailure;
                }
                failure.addSuppressed(cleanupFailure);
            }
        }
    }
    return result;
}
工具类完整代码如下:
package com.hdkj.hdiot.configure.es.utils;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
import co.elastic.clients.elasticsearch.core.CountResponse;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.alibaba.fastjson.JSON;
import com.hdkj.hdiot.configure.common.PageData;
import com.hdkj.hdiot.configure.es.bean.EsQueryDTO;
import net.sf.jsqlparser.expression.TimeValue;
import org.apache.commons.lang3.StringUtils;
import java.util.*;
/**
 * Query helpers for Elasticsearch 8 built on the official Java client. Hits are
 * read as raw maps and converted to {@code T} through a fastjson round trip.
 *
 * @param <T> type every document is converted to
 * @author liuch
 * @date 2022/4/2 9:50
 */
public class ElasticClientUtils<T> {

    /** Number of documents requested per scroll page. */
    private static final int SCROLL_PAGE_SIZE = 5000;

    /** Keep-alive passed with the initial scroll search and every scroll request. */
    private static final String SCROLL_KEEP_ALIVE = "5s";

    /**
     * Upper bound reported as the total by {@link #queryPageByFileds}.
     * NOTE(review): presumably mirrors Elasticsearch's default from/size window
     * of 10000 documents - confirm against the index settings.
     */
    private static final long MAX_PAGED_TOTAL = 10000;

    /**
     * Runs a single-field term query without explicit paging.
     *
     * @param client Elasticsearch client
     * @param dto    supplies index name, field, keyword and optional sort
     * @param target class every hit is converted to
     * @return the hits of the response converted to {@code target}
     * @throws Exception if the request fails
     * @author liuch
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFiled(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        List<SortOptions> sorts = buildSorts(dto);
        SearchResponse<HashMap> search = client.search(s -> s
                .index(dto.getIndexName())
                .query(q -> q.term(t -> t
                    .field(dto.getField())
                    .value(dto.getWord())))
                .sort(sorts),
            HashMap.class);
        return getResult(target, new ArrayList<>(), search);
    }

    /**
     * Runs a compound keyword query and collects every matching document with the
     * scroll API (cursor-style deep pagination).
     *
     * <p>The terms condition built from {@code dto} is appended to {@code queries},
     * so a non-null list passed by the caller is modified. The scroll context is
     * always released before the method returns or throws.
     *
     * @param client  Elasticsearch client
     * @param dto     supplies index name, field, keyword list and optional sort
     * @param queries extra conditions combined with {@code must}; {@code null} is treated as an empty list
     * @param target  class every hit is converted to
     * @return all hits converted to {@code target}, in the order the pages were returned
     * @throws Exception if a request to Elasticsearch fails
     * @author liuch
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFileds(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries, Class<T> target) throws Exception {
        List<T> result = new ArrayList<>();
        List<SortOptions> sorts = buildSorts(dto);
        // Same null tolerance as queryPageByFileds.
        final List<Query> conditions = queries == null ? new ArrayList<>() : queries;
        getFieldValues(dto, conditions);

        // First request: sets the conditions, the page size and opens the scroll context.
        SearchResponse<HashMap> search = client.search(s -> s
                .index(dto.getIndexName())
                .query(q -> q.bool(b -> b.must(conditions)))
                .size(SCROLL_PAGE_SIZE)
                .scroll(t -> t.time(SCROLL_KEEP_ALIVE))
                .sort(sorts),
            HashMap.class);

        // Only the most recently returned scroll id is kept; it drives the next
        // page request and the final clean-up.
        String scrollId = search.scrollId();
        Exception failure = null;
        try {
            // Keep pulling pages until one comes back empty.
            while (!search.hits().hits().isEmpty()) {
                getResult(target, result, search);
                final String currentScrollId = scrollId;
                search = client.scroll(s -> s
                        .scrollId(currentScrollId)
                        .scroll(t -> t.time(SCROLL_KEEP_ALIVE)),
                    HashMap.class);
                scrollId = search.scrollId();
            }
        } catch (Exception e) {
            failure = e;
            throw e;
        } finally {
            releaseScroll(client, scrollId, failure);
        }
        return result;
    }

    /**
     * Runs a single-field term query with from/size paging.
     *
     * @param client Elasticsearch client
     * @param dto    supplies index name, field, keyword, optional sort, offset and page size
     * @param target class every hit is converted to
     * @return the hits of the requested page converted to {@code target}
     * @throws Exception if the request fails
     * @author liuch
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFiledWithPage(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        List<SortOptions> sorts = buildSorts(dto);
        SearchResponse<HashMap> search = client.search(s -> s
                .index(dto.getIndexName())
                .query(q -> q.term(t -> t
                    .field(dto.getField())
                    .value(dto.getWord())))
                .sort(sorts)
                .from(dto.getFrom())
                .size(dto.getSize()),
            HashMap.class);
        return getResult(target, new ArrayList<>(), search);
    }

    /**
     * Converts every hit of {@code search} to {@code target} and appends it to
     * {@code result}. The document id is stored under the key {@code "id"}
     * before conversion, replacing any value the source had under that key.
     *
     * @param target class every hit is converted to
     * @param result list the converted documents are appended to
     * @param search response whose hits are converted
     * @return {@code result}, for call chaining
     */
    private List<T> getResult(Class<T> target, List<T> result, SearchResponse<HashMap> search) {
        for (Hit<HashMap> hit : search.hits().hits()) {
            // Unchecked: the raw HashMap comes from deserialising with HashMap.class;
            // JSON documents have string keys.
            @SuppressWarnings("unchecked")
            Map<String, Object> docMap = hit.source();
            if (docMap == null) {
                // A hit can arrive without a source; still expose its id.
                docMap = new HashMap<>();
            }
            docMap.put("id", hit.id());
            result.add(JSON.parseObject(JSON.toJSONString(docMap), target));
        }
        return result;
    }

    /**
     * Counts the documents matching the single-field term query.
     *
     * @param client Elasticsearch client
     * @param dto    supplies index name, field and keyword
     * @return number of matching documents
     * @throws Exception if the request fails
     * @author liuch
     * @date 2022/4/2 17:15
     */
    public long queryCountByFiled(ElasticsearchClient client, EsQueryDTO dto) throws Exception {
        CountResponse count = client.count(c -> c
            .index(dto.getIndexName())
            .query(q -> q.term(t -> t
                .field(dto.getField())
                .value(dto.getWord()))));
        return count.count();
    }

    /**
     * Returns one page of the single-field term query together with the total
     * number of matches.
     *
     * @param client Elasticsearch client
     * @param dto    supplies index name, field, keyword, optional sort, offset and page size
     * @param target class every hit is converted to
     * @return the page content and the total match count
     * @throws Exception if a request fails
     * @author liuch
     * @date 2022/4/2 17:16
     */
    public PageData<T> queryPageByFiled(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        long total = queryCountByFiled(client, dto);
        List<T> result = queryByFiledWithPage(client, dto, target);
        return new PageData<>(result, total);
    }

    /**
     * Returns one page of the compound query together with the total number of
     * matches, capped at {@link #MAX_PAGED_TOTAL}.
     *
     * <p>The count step appends the terms condition to {@code queries}; the page
     * step then reuses that same list, so both requests use identical conditions.
     *
     * @param client  Elasticsearch client
     * @param dto     supplies index name, field, keyword list, optional sort, offset and page size
     * @param queries extra conditions combined with {@code must}; {@code null} is treated as an empty list
     * @param target  class every hit is converted to
     * @return the page content and the (capped) total match count
     * @throws Exception if a request fails
     * @author liuch
     * @date 2022/4/2 17:16
     */
    public PageData<T> queryPageByFileds(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries, Class<T> target) throws Exception {
        if (null == queries) {
            queries = new ArrayList<>();
        }
        long total = Math.min(queryCountByFileds(client, queries, dto), MAX_PAGED_TOTAL);
        List<T> result = queryByFiledsWithPage(client, dto, queries, target);
        return new PageData<>(result, total);
    }

    /**
     * Counts the documents matching the compound query. The terms condition built
     * from {@code dto} is appended to {@code queries} before counting, so a
     * non-null list passed by the caller is modified.
     *
     * @param client  Elasticsearch client
     * @param queries extra conditions combined with {@code must}; {@code null} is treated as an empty list
     * @param dto     supplies index name, field and keyword list
     * @return number of matching documents
     * @throws Exception if the request fails
     * @author liuch
     * @date 2022/4/2 17:15
     */
    public long queryCountByFileds(ElasticsearchClient client, List<Query> queries, EsQueryDTO dto) throws Exception {
        final List<Query> conditions = queries == null ? new ArrayList<>() : queries;
        getFieldValues(dto, conditions);
        CountResponse count = client.count(c -> c
            .index(dto.getIndexName())
            .query(q -> q.bool(b -> b.must(conditions))));
        return count.count();
    }

    /**
     * Runs the compound query with from/size paging. Unlike
     * {@link #queryCountByFileds}, this method does not add the terms condition
     * itself: {@code queries} is sent exactly as given.
     *
     * @param client  Elasticsearch client
     * @param dto     supplies index name, optional sort, offset and page size
     * @param queries conditions combined with {@code must}
     * @param target  class every hit is converted to
     * @return the hits of the requested page converted to {@code target}
     * @throws Exception if the request fails
     * @author liuch
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFiledsWithPage(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries, Class<T> target) throws Exception {
        List<SortOptions> sorts = buildSorts(dto);
        SearchResponse<HashMap> search = client.search(s -> s
                .index(dto.getIndexName())
                .query(q -> q.bool(b -> b.must(queries)))
                .sort(sorts)
                .from(dto.getFrom())
                .size(dto.getSize()),
            HashMap.class);
        return getResult(target, new ArrayList<>(), search);
    }

    /**
     * Builds the compound-query condition: appends one {@code terms} condition to
     * {@code queries} that matches {@code dto.getField()} against every keyword
     * returned by {@code dto.getWords()}.
     *
     * @param dto     supplies the field name and the keyword list
     * @param queries condition list the new terms condition is appended to (modified in place)
     */
    private void getFieldValues(EsQueryDTO dto, List<Query> queries) {
        // Convert each keyword into a FieldValue for the terms query.
        List<FieldValue> fieldValues = new ArrayList<>();
        dto.getWords().forEach(word -> fieldValues.add(FieldValue.of(word)));
        // Add the terms condition to the condition list.
        queries.add(Query.of(q -> q.terms(t -> t.field(dto.getField()).terms(v -> v.value(fieldValues)))));
    }

    /**
     * Fetches a single document by id; the id is read from {@code dto.getWord()}.
     *
     * @param client Elasticsearch client
     * @param dto    supplies index name and the document id
     * @param target class the document is converted to
     * @return the document converted to {@code target}, or {@code null} when the response carries no source
     * @throws Exception if the request fails
     * @author liuch
     * @date 2022/4/2 17:16
     */
    public Object queryByDocumentId(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        GetResponse<HashMap> getResponse = client.get(s -> s
                .index(dto.getIndexName())
                .id(dto.getWord()),
            HashMap.class);
        // Unchecked: the raw HashMap comes from deserialising with HashMap.class.
        @SuppressWarnings("unchecked")
        Map<String, Object> docMap = getResponse.source();
        if (docMap == null) {
            return null;
        }
        return JSON.parseObject(JSON.toJSONString(docMap), target);
    }

    /** Builds the sort list: one field sort when the DTO names an order field, otherwise empty. */
    private List<SortOptions> buildSorts(EsQueryDTO dto) {
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            sorts.add(SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType())))));
        }
        return sorts;
    }

    /** Clears the scroll context; a clean-up failure is attached to {@code primary} instead of replacing it. */
    private void releaseScroll(ElasticsearchClient client, String scrollId, Exception primary) throws Exception {
        if (scrollId == null) {
            return;
        }
        try {
            client.clearScroll(c -> c.scrollId(scrollId));
        } catch (Exception cleanupFailure) {
            if (primary == null) {
                throw cleanupFailure;
            }
            primary.addSuppressed(cleanupFailure);
        }
    }
}
前面的文章实现了基于es8的简单查询和复合查询,希望能帮助到各位:
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_24473507/article/details/126525835
————————————————
版权声明:本文为CSDN博主「大师兄小灰灰」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_24473507/article/details/126525835
更多推荐
所有评论(0)