参考:Elasticsearch Search Scroll API(滚动查询) - 简书

Elasticsearch 中,传统的分页查询使用from+size的模式,from就是页码,从 0 开始。默认情况下,当(from+1)*size大于 10000 时,也就是已查询的总数据量大于 10000 时,会出现异常。

如下,用循环模拟一个连续分页查询:

public void search() {
        // 记录页码
        int page = 0;
        // 记录已经查询到总数据量
        long total = 0;

        while (true) {
            NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
                    // 设置分页
                    .withPageable(PageRequest.of(page, 1000))
                    .withSort(new FieldSortBuilder("commentCount").order(SortOrder.DESC))
                    .build();

            SearchHits<Book> searchHits = elasticsearchRestTemplate.search(nativeSearchQuery, Book.class);
            if (!searchHits.hasSearchHits()) {
                break;
            }
            for (SearchHit<Book> searchHit : searchHits.getSearchHits()) {
                Book book = searchHit.getContent();
            }
            page++;
            System.out.println(page);
            System.out.println(total += searchHits.getSearchHits().size());
        }
}

最终当 page 等于 10 时会抛出如下异常:

Caused by: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.]]

从异常信息中,我们可以发现官方给我们提供了两种方案来解决这个问题:

1、max_result_window

  • 将 Elasticsearch 配置参数index.max_result_window修改为大于 100000 的值,对应的 RESTful API 如下:
PUT book/_settings
{
    "index": {
        "max_result_window": 1000000
    }
}

虽然可以通过修改index.max_result_window来解决查询时数据量的限制,但是这并不是不推荐的做法,当数据量达到百万、千万级别时,使用from+size模式查询时性能会越来越差,每次查询的耗时也会越来越久,严重影响体验,同时对 CPU 和内存的消耗也很大的。

2、scroll api

如果需要查询大量的数据,可以考虑使用 Search Scroll API,这是一种更加高效的方式。

如果直接使用 Java Client,可以参考官方的 API 文档:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-search-scroll.html

我们这里还是和 SpringBoot 整合去使用,其实核心的用法都是很类似的。如下同样模拟一个连分页查询:

public void scrollSearch() {
        NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
                .withSort(new FieldSortBuilder("commentCount").order(SortOrder.DESC))
                .build();
        // 设置每页数据量
        nativeSearchQuery.setMaxResults(1000);
    
        long scrollTimeInMillis = 60 * 1000;
        // 第一次查询
        SearchScrollHits<Book> searchScrollHits = elasticsearchRestTemplate.searchScrollStart(scrollTimeInMillis, nativeSearchQuery, Book.class, IndexCoordinates.of("book"));
        String scrollId = searchScrollHits.getScrollId();

        while (searchScrollHits.hasSearchHits()) {
            System.out.println(total += searchScrollHits.getSearchHits().size());

            for (SearchHit<Book> searchHit : searchScrollHits.getSearchHits()) {
                Book book = searchHit.getContent();
            }
            // 后续查询
            searchScrollHits = elasticsearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, Book.class, IndexCoordinates.of("book"));
            scrollId = searchScrollHits.getScrollId();
        }

        List<String> scrollIds = new ArrayList<>();
        scrollIds.add(scrollId);
        // 清除 scroll
        elasticsearchRestTemplate.searchScrollClear(scrollIds);
}

以下几点需要注意:

  • setMaxResults(1000)用来设置查询时每页的数据量,我这里使用 Elasticsearch7.9 有这个方法,如果其它旧版本没有这个方法,可以使用PageRequest.of(0, 1000)来设置,注意页码要为 0。
  • 第一次查询使用searchScrollStart(),后续查询使用searchScrollContinue(),查询结果中都携带了一个scrollId
  • 除了第一次查询外,后续的查询都需要携带scrollId,可以理解为游标,用它来控制分页。和from+size模式中页码是一个作用。
  • scrollTimeInMillis,表示查询结果中scrollId的有效时间,单位毫秒,可根据实际情况设置。
  • 查询结束后,需要使用searchScrollClear()清除 scroll。
  • from+size分页查询模式中,我们可以指定任意合理的页码,实现跳页查询;但使用scroll api就无法实现跳页查询了,因为除了第一次查询外的其它查询都要依赖上一次查询返回的scrollId,这一点需要注意。


原文中可能会空查一次,少许修改代码,如下:

void searchScroll(){
        NativeSearchQuery query = new NativeSearchQuery(QueryBuilders.matchAllQuery());
        query.setMaxResults(1);//设置每页数据量
        query.addSort(Sort.by(Sort.Direction.DESC,"age"));

        long scrollTimeInMillis=5_000;
        long currentTotal=0;
        int pageNo=1;
        List<String> scrollIdList = new ArrayList<>();

        //scroll一共有三个方法:searchScrollStart(第一次查询)、searchScrollContinue(第二次到最后一次)、searchScrollClear(查询完成后执行)

        //第一次查询使用:searchScrollStart
        SearchScrollHits<People> searchScrollHits = this.elasticsearchRestTemplate.searchScrollStart(scrollTimeInMillis, query, People.class, IndexCoordinates.of("people_index"));
        String scrollId = searchScrollHits.getScrollId();
        scrollIdList.add(scrollId);
        System.out.println("scrollId:"+scrollId);

        long totalHits = searchScrollHits.getTotalHits();
        currentTotal=searchScrollHits.getSearchHits().size();
        System.out.println("totalHits:"+totalHits);

        List<People> list = searchScrollHits.get().map(SearchHit::getContent).collect(Collectors.toList());
        System.out.println("============pageNo:==========="+pageNo);
        for (People people : list) {
            System.out.println(people);
        }

        while (currentTotal<totalHits){
            SearchScrollHits<People> searchScrollHitsContinue = elasticsearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, People.class, IndexCoordinates.of("people_index"));
            scrollId=searchScrollHitsContinue.getScrollId();
            scrollIdList.add(scrollId);
            pageNo++;
            if(searchScrollHitsContinue.hasSearchHits()){
                currentTotal+=searchScrollHitsContinue.getSearchHits().size();
                List<People> peopleList = searchScrollHitsContinue.get().map(SearchHit::getContent).collect(Collectors.toList());
                System.out.println("============pageNo:==========="+pageNo);
                for (People people : peopleList) {
                    System.out.println(people);
                }
            }else{
                System.out.println("============pageNo not hasSearchHits===========");
                break;
            }
        }
        System.out.println(scrollIdList);
        elasticsearchRestTemplate.searchScrollClear(scrollIdList);
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐