目录

游标查询 scroll

构建复合查询条件

第一次查询赋予查询条件和条数:

取第一次的scrolId,继续执行后面的查询

清除scroll:

工具类完整代码如下:


最新的es8官方api没有提供基于scroll的深度分页查询实现方案,结合7版本的实现方式,终于在es8的版本上实现了scroll分页查询,

游标查询 scroll

Scroll 查询可以用来对 Elasticsearch 有效地执行大批量的文档查询,而又不用付出深度分页那种代价。

游标查询允许我们先做查询初始化,然后再批量地拉取结果。这有点儿像传统数据库中的 cursor 。

游标查询会取某个时间点的快照数据。查询初始化之后索引上的任何变化会被它忽略。它通过保存旧的数据文件来实现这个特性,结果就像保留初始化时的索引视图一样。

上代码:

以下内容用到了复合查询,内容可以参考上一篇:

springboot elasticSearch8开发(二),实现复杂检索_大师兄小灰灰的博客-CSDN博客上一篇文章发布已经过去了四个月,终于有时间补充一下了。基于上篇简单查询基础上,新增了复合查询的功能。https://blog.csdn.net/qq_24473507/article/details/126524062

构建复合查询条件

/**
     * 构建复合查询条件
     * @param dto
     * @param queries
     */
    private void getFieldValues(EsQueryDTO dto, List<Query> queries) {
        List<FieldValue> fieldValues = new ArrayList<>();
        //根据关键字列表构件复合查询的值
        dto.getWords().stream().forEach(word -> fieldValues.add(FieldValue.of(word)));
        //查询条件列表
        queries.add(Query.of(q->q.terms(t -> t.field(dto.getField()).terms(v -> v.value(fieldValues)))));
    }

第一次查询赋予查询条件和条数:

List<T> result = new ArrayList<>();
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
            sorts.add(sortOptions);
        }
        getFieldValues(dto, queries);
//使用scroll深度分页查询
        SearchResponse<HashMap> search = client.search(s -> s
                        .index(dto.getIndexName()).query(q->q.bool(b->b.must(queries))).size(5000).scroll(t->t.time("5s"))
                        .sort(sorts),
                HashMap.class);

取第一次的scrolId,继续执行后面的查询

StringBuffer scrollId = new StringBuffer(search.scrollId());
        //循环查询,直到查不到数据
        do {
            getResult(target, result, search);
            StringBuffer finalScrollId = scrollId;
            search = client.scroll(s->s.scrollId(finalScrollId.toString()).scroll(t->t.time("5s")),HashMap.class);
            scrollId = new StringBuffer(search.scrollId());
        }while (!search.hits().hits().isEmpty());

清除scroll:

查询结束后,需要使用client.clearScroll() 方法清除 scroll。

使用scroll api就无法实现跳页查询了,因为除了第一次查询外的其它查询都要依赖上一次查询返回的scrollId,这一点需要注意。

完整代码如下:

/**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return java.util.List<T>
     * @author liuch
     * @description 根据关键字查询,基于游标查询scroll
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFileds(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries,Class<T> target) throws Exception {
        List<T> result = new ArrayList<>();
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
            sorts.add(sortOptions);
        }
        getFieldValues(dto, queries);
        //使用scroll深度分页查询
        SearchResponse<HashMap> search = client.search(s -> s
                        .index(dto.getIndexName()).query(q->q.bool(b->b.must(queries))).size(5000).scroll(t->t.time("5s"))
                        .sort(sorts),
                HashMap.class);

        StringBuffer scrollId = new StringBuffer(search.scrollId());
        List<String> strings = new ArrayList<>();
        strings.add(search.scrollId());
        //循环查询,直到查不到数据
        do {
            getResult(target, result, search);
            StringBuffer finalScrollId = scrollId;
            search = client.scroll(s->s.scrollId(finalScrollId.toString()).scroll(t->t.time("5s")),HashMap.class);
            scrollId = new StringBuffer(search.scrollId());
            strings.add(search.scrollId());
        }while (!search.hits().hits().isEmpty());
        //清除 scroll
        client.clearScroll(c->c.scrollId(strings));
        //getResult(target, result, search)
        return result;
    }

工具类完整代码如下:

package com.hdkj.hdiot.configure.es.utils;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
import co.elastic.clients.elasticsearch.core.CountResponse;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.alibaba.fastjson.JSON;
import com.hdkj.hdiot.configure.common.PageData;
import com.hdkj.hdiot.configure.es.bean.EsQueryDTO;
import net.sf.jsqlparser.expression.TimeValue;
import org.apache.commons.lang3.StringUtils;

import java.util.*;

/**
 * @author liuch
 * @title: ElasticClientUtils
 * @description: TODO
 * @date 2022/4/2 9:50
 */
public class ElasticClientUtils<T> {

    /**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return java.util.List<T>
     * @author liuch
     * @description 根据关键字查询
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFiled(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        List<T> result = new ArrayList<>();
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
            sorts.add(sortOptions);
        }
        SearchResponse<HashMap> search = client.search(s -> s
                        .index(dto.getIndexName())
                        .query(q -> q.term(t -> t
                                .field(dto.getField())
                                .value(dto.getWord())
                        )).sort(sorts),
                HashMap.class);
        return getResult(target, result, search);
    }
    /**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return java.util.List<T>
     * @author liuch
     * @description 根据关键字查询,基于游标查询scroll
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFileds(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries,Class<T> target) throws Exception {
        List<T> result = new ArrayList<>();
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
            sorts.add(sortOptions);
        }
        getFieldValues(dto, queries);
        //使用scroll深度分页查询
        SearchResponse<HashMap> search = client.search(s -> s
                        .index(dto.getIndexName()).query(q->q.bool(b->b.must(queries))).size(5000).scroll(t->t.time("5s"))
                        .sort(sorts),
                HashMap.class);
        StringBuffer scrollId = new StringBuffer(search.scrollId());
        //循环查询,直到查不到数据
        do {
            getResult(target, result, search);
            StringBuffer finalScrollId = scrollId;
            search = client.scroll(s->s.scrollId(finalScrollId.toString()).scroll(t->t.time("5s")),HashMap.class);
            scrollId = new StringBuffer(search.scrollId());
        }while (!search.hits().hits().isEmpty());
        //getResult(target, result, search)
        return result;
    }

    /**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return java.util.List<T>
     * @author liuch
     * @description 根据关键字分页查询
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFiledWithPage(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        List<T> result = new ArrayList<>();
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
            sorts.add(sortOptions);
        }
        SearchResponse<HashMap> search = client.search(s -> s
                        .index(dto.getIndexName())
                        .query(q -> q.term(t -> t
                                .field(dto.getField())
                                .value(dto.getWord())
                        )).sort(sorts).from(dto.getFrom()).size(dto.getSize()),
                HashMap.class);
        return getResult(target, result, search);
    }



    private List<T> getResult(Class<T> target, List<T> result, SearchResponse<HashMap> search) {
        List<Hit<HashMap>> hits = search.hits().hits();
        Iterator<Hit<HashMap>> iterator = hits.iterator();
        while (iterator.hasNext()) {
            Hit<HashMap> decodeBeanHit = iterator.next();
            Map<String, Object> docMap = decodeBeanHit.source();
            docMap.put("id",decodeBeanHit.id());
            String json = JSON.toJSONString(docMap);
            T obj = JSON.parseObject(json, target);
            result.add(obj);
        }
        return result;
    }

    /**
     * @param
     * @param client
     * @param dto
     * @return long
     * @author liuch
     * @description 根据关键字查询总条数
     * @date 2022/4/2 17:15
     */
    public long queryCountByFiled(ElasticsearchClient client, EsQueryDTO dto) throws Exception {
        CountResponse count = client.count(c -> c.index(dto.getIndexName()).query(q -> q.term(t -> t
                .field(dto.getField())
                .value(dto.getWord())
        )));
        long total = count.count();
        return total;
    }


    /**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return com.hdkj.hdiot.configure.common.PageData<T>
     * @author liuch
     * @description 查询分页信息
     * @date 2022/4/2 17:16
     */
    public PageData<T> queryPageByFiled(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        long total = queryCountByFiled(client, dto);
        List<T> result = queryByFiledWithPage(client, dto, target);
        PageData<T> pageData = new PageData<>(result, total);
        return pageData;
    }

    /**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return com.hdkj.hdiot.configure.common.PageData<T>
     * @author liuch
     * @description 查询分页信息-复合查询
     * @date 2022/4/2 17:16
     */
    public PageData<T> queryPageByFileds(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries,Class<T> target) throws Exception {
        if(null == queries){
            queries = new ArrayList<>();
        }
        long total = queryCountByFileds(client,queries, dto);
        if(total>10000){
            total = 10000;
        }
        List<T> result = queryByFiledsWithPage(client, dto, queries,target);
        PageData<T> pageData = new PageData<>(result, total);
        return pageData;
    }
    /**
     * @param
     * @param client
     * @param dto
     * @return long
     * @author liuch
     * @description 根据关键字查询总条数-复合查询
     * @date 2022/4/2 17:15
     */
    public long queryCountByFileds(ElasticsearchClient client, List<Query> queries, EsQueryDTO dto) throws Exception {
        getFieldValues(dto, queries);
        CountResponse count = client.count(c -> c.index(dto.getIndexName()).query(q->q.bool(b->b.must(queries))));
        long total = count.count();
        return total;
    }


    /**
     * @param client
     * @param dto
     * @param target
     * @return java.util.List<T>
     * @author liuch
     * @description 根据关键字分页查询- 复合查询
     * @date 2022/4/2 17:15
     */
    public List<T> queryByFiledsWithPage(ElasticsearchClient client, EsQueryDTO dto, List<Query> queries,Class<T> target) throws Exception {
        List<T> result = new ArrayList<>();
        List<SortOptions> sorts = new ArrayList<>();
        if (StringUtils.isNotBlank(dto.getOrder())) {
            SortOptions sortOptions = SortOptions.of(s -> s.field(f -> f.field(dto.getOrder()).order(SortOrder.valueOf(dto.getOrderType()))));
            sorts.add(sortOptions);
        }
        SearchResponse<HashMap> search = client.search(s -> s
                        .index(dto.getIndexName())
                        .query(q->q.bool(b->b.must(queries)))
                        .sort(sorts).from(dto.getFrom()).size(dto.getSize()),
                HashMap.class);
        return getResult(target, result, search);
    }

    /**
     * 构件复合查询条件
     * @param dto
     * @param queries
     */
    private void getFieldValues(EsQueryDTO dto, List<Query> queries) {
        List<FieldValue> fieldValues = new ArrayList<>();
        //根据关键字列表构件复合查询的值
        dto.getWords().stream().forEach(word -> fieldValues.add(FieldValue.of(word)));
        //查询条件列表
        queries.add(Query.of(q->q.terms(t -> t.field(dto.getField()).terms(v -> v.value(fieldValues)))));
    }


    /**
     * @param
     * @param client
     * @param dto
     * @param target
     * @return java.lang.Object
     * @author liuch
     * @description 根据文档id查询
     * @date 2022/4/2 17:16
     */
    public Object queryByDocumentId(ElasticsearchClient client, EsQueryDTO dto, Class<T> target) throws Exception {
        GetResponse<HashMap> getResponse = client.get(s -> s
                        .index(dto.getIndexName()).id(dto.getWord()),
                HashMap.class);
        getResponse.source();
        Map<String, Object> docMap = getResponse.source();
        String json = JSON.toJSONString(docMap);
        T obj = JSON.parseObject(json, target);
        return obj;
    }

}

前面的文章实现了基于es8的简单查询和复合查询,希望能帮助到各位:

springboot elasticSearch8开发(二),实现复杂检索_大师兄小灰灰的博客-CSDN博客上一篇文章发布已经过去了四个月,终于有时间补充一下了。基于上篇简单查询基础上,新增了复合查询的功能。https://blog.csdn.net/qq_24473507/article/details/126524062

springboot elasticSearch8开发(一),实现简单查询_大师兄小灰灰的博客-CSDN博客springboot+ElasticSearch8开发,实现简单查询https://blog.csdn.net/qq_24473507/article/details/123924463

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_24473507/article/details/126525835
————————————————
版权声明:本文为CSDN博主「大师兄小灰灰」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_24473507/article/details/126525835

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐