查es大于10000条数据-滚动查询(scroll)

背景

总所周知,es一般查询只支持最多查询出前1w条数据,很难受。想要一次性查询出你想要的数据,一些大数据的场景下,我们需要用到ElasicSearch的两种查询方式:深度分页或者滚动查询,我们今天使用的是滚动查询方式,因为需要一批次加载全部使用的数据。

介绍

深度分页

使用from和size来查询,操作比较简单,如下:

{
    "query": {
        "match_all": {}
    },
    "from": 9990,
    "size": 10
}

{
    "query": {
        "match_all": {}
    },
    "from": 9999,
    "size": 10
}

我们在获取第9999条到10009条数据的时候,每次需要将前9990、9999条都查出来,然后再向下寻找后10条。如果es还有分片存在,加载的数量就是9990*分片数量,这样查询到以后,还要排序处理,得到10条数据。。。如此一来,搜索得太深,就会造成性能问题,会耗费内存和占用cpu。

其实我们应该避免深度分页操作(限制分页页数),比如最多只能提供100页的展示,从第101页开始就没了,毕竟用户也不会搜的那么深,我们平时搜索淘宝或者百度,一般也就看个10来页就顶多了。

譬如淘宝搜索限制分页最多100页,如下:
在这里插入图片描述

滚动查询

通过上面可以指定,from-size不适合做离线大数据的场景,因此我们使用es提供的另一种查询大量数据的方式——滚动查询,也叫游标查询:
json处理如下:

#第一次查询:
GET /sms/_search?scroll=5m
{
  "size": 20,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "userId": "9d995c0b90fe4128896a1a84eca213bf"
          }
        }
      ]
    }
  }
}
返回结果:
{
  "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==",
  "took": 6,
      ......
}
把上一次得到的_scroll_id拿到按以下查询即可得到下一轮的数据:
GET /_search/scroll/
{
  "scroll":"1m",
  "scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw=="
}
这样直到把数据查完为止。

使用json查询只是做一个简单的理解,我们真正用到的还是使用语言去操作它,RestHignLevelClient就是一个很实用的es客户端,接下来我们使用java对其进行操作:导入的pom如下,需要与es版本对应:

		<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.0.0</version>
        </dependency>

直接上一个对api调用的工具类吧:❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️
其中,获得SearchResponse使用了滚动查询:具体是由以下几个模块组成的:

1、构建searchRequest

Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));

        //构建searchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            //构造器加入需要查找的字段
            sourceBuilder.fetchSource(includes, null);
        }
        //加入query语句
        sourceBuilder.query(query);
        //每次滚动的长度
        sourceBuilder.size(SIZE);
        //加入排序字段
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        //加入scroll和构造器
        request.scroll(scroll);
        request.source(sourceBuilder);

获取返回结果:

SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        //拿到第一个ScrollId(游标)
        String scrollId = searchResponse.getScrollId();
        //拿到hits结果
        SearchHit[] hits = searchResponse.getHits().getHits();
        //保存返回结果List
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);

循环滚动查询—>保存结果:

//滚动查询将SearchHit封装到result中
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    //Function<SearchHit, T>, 输入SearchHit,经过操作后,返回T结果
                    result.add(fun.apply(hit));
                }
                //说明滚动完了,返回结果即可
                if (hits.length < SIZE) {
                    break;
                }
                //继续滚动,根据上一个游标,得到这次开始查询位置
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                //得到结果
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                //定位游标
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);

util全部代码如下:

package com.yq.demo.Util;

import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ESUtil {
    private static final Logger log = LoggerFactory.getLogger(ESUtil.class);

    private static final long SCROLL_TIMEOUT = 180000;

    private static int SIZE = 1000;

    private static int MAX_BUFFER = 209715200;

    /**
     * 构建SearchResponse
     *
     * @param client     restHighLevelClient
     * @param indices    索引
     * @param query      queryBuilder
     * @param includes   包含的字段
     * @param orderField 排序字段
     * @param order      排序类型
     * @param fun        返回函数
     * @param <T>        返回类型
     * @return List, 可以使用fun转换为T结果
     * @throws Exception
     */
    public static <T> List<T> searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query, String[] includes, String orderField, SortOrder order, Function<SearchHit, T> fun) throws Exception {
        //滚动查询的Scroll
        Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));

        //构建searchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            //构造器加入需要查找的字段
            sourceBuilder.fetchSource(includes, null);
        }
        //加入query语句
        sourceBuilder.query(query);
        //每次滚动的长度
        sourceBuilder.size(SIZE);
        //加入排序字段
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        //加入scroll和构造器
        request.scroll(scroll);
        request.source(sourceBuilder);
        //存储scroll的list
        List<String> scrollIdList = new ArrayList<>();
        //返回结果
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        //拿到第一个ScrollId(游标)
        String scrollId = searchResponse.getScrollId();
        //拿到hits结果
        SearchHit[] hits = searchResponse.getHits().getHits();
        //保存返回结果List
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);

        try {
            //滚动查询将SearchHit封装到result中
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    //Function<SearchHit, T>, 输入SearchHit,经过操作后,返回T结果
                    result.add(fun.apply(hit));
                }
                //说明滚动完了,返回结果即可
                if (hits.length < SIZE) {
                    break;
                }
                //继续滚动,根据上一个游标,得到这次开始查询位置
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                //得到结果
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                //定位游标
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);
            }
        } finally {
            //清理scroll,释放资源
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.setScrollIds(scrollIdList);
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        }
        return result;
    }

    /**
     * 聚合查询的SearchResponse
     * @param client
     * @param indices        索引
     * @param query QueryBuilder
     * @param aggregations  AggregationBuilder
     * @return SearchResponse
     * @throws Exception
     */

    public static SearchResponse searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query, AggregationBuilder... aggregations) throws Exception {
        //构建request请求
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(query);
        //加入Agg
        if (aggregations != null && aggregations.length > 0) {
            for (AggregationBuilder aggregation : aggregations) {
                sourceBuilder.aggregation(aggregation);
            }
        }
        sourceBuilder.size(0);
        //忽略不可用索引,只用于开放索引
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }
}

希望对你有所帮助,Thank you for whatching!!!😆😆😆😆😆😆

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐