java使用ElasticSearch的scroll查询,高效的解决es查询数量的限制。

一、为什么要使用ES的scroll

(1)首先我们要明白es的查询机制:ES的搜索是分2个阶段进行的,即Query阶段和Fetch阶段

  • Query阶段比较轻量级,通过查询倒排索引,获取满足查询结果的文档ID列表。

  • Fetch阶段比较重,需要将每个分片的查询结果取回,在协调结点进行全局排序。 通过From+size这种方式分批获取数据的时候,随着from加大,需要全局排序并丢弃的结果数量随之上升,性能越来越差。

(2)es在进行普通的查询时,默认只给查询出来十条数据。

​ 通过设置size的值可以使查询结果从10增大到1000条数据,当超出1000条数据的时候就会只显示出来1000条数据。

​ 为了解决上面的问题可以采用一种效率比较低的方法,在创建索引的时候添加如下配置

  • 
    "settings":{
    		"index":{
    			"max_result_window": 在这里填入你需要的大小				}
    			}
    

(3)如果进行高效的查询呢?那就需要使scroll滚动查询了。

Scroll查询,先做轻量级的Query阶段以后,免去了繁重的全局排序过程。 它只是将查询结果集,也就是doc_id列表保留在一个上下文里, 之后每次分批取回的时候,只需根据设置的size,在每个分片内部按照一定顺序(默认doc_id续), 取回size数量大小的数据即可。

二、如何使用scroll

(1)首先引入elasticsearch的坐标

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
		</dependency>

(2)编写如下代码


    /**
     * 使用es的scroll方法来滚动查询es的数据,可以有效的解决大数据容量读取的限制
     * @param index  es的索引名称
     * @param host   es的主机ip号
     * @param port   es的端口号
     * @param beginDate   构造查询条件需要的条件之一 (可以根据自己需求定义es的查询条件)
     * @param endDate     构造查询条件需要的条件之一 (可以根据自己需求定义es的查询条件)
     */
    public void scrollDemo(String index,String host,int port,String beginDate,String  endDate) throws ParseException {
        RestHighLevelClient restHighLevelClient=new RestHighLevelClient(RestClient.builder(new HttpHost(host,port,"http")));
        //构造查询条件
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder builder = new SearchSourceBuilder();
        //设置查询超时时间
        Scroll scroll = new Scroll(TimeValue.timeValueMinutes(5L));
        builder.query(QueryBuilders.rangeQuery("datetime").gte(beginDate).lte(endDate));
        //设置最多一次能够取出1000笔数据,从第1001笔数据开始,将开启滚动查询  
        //PS:滚动查询也属于这一次查询,只不过因为一次查不完,分多次查
        builder.size(1000);
        searchRequest.source(builder);
        //将滚动放入
        searchRequest.scroll(scroll);
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            System.out.println("查询索引库失败");
        }
        SearchHits hits= searchResponse.getHits();
        SearchHit[] hit= hits.getHits();
        /**
        *在这个位置已经读到了前一千条数据,可以在这先对这一千数据进行处理。下面滚动查询剩下的数据
        */
        //记录要滚动的ID
        String scrollId = searchResponse.getScrollId();
        //滚动查询部分,将从第1001笔数据开始取
        SearchHit[] hitsScroll = hits.getHits();
        while (hitsScroll != null && hitsScroll.length > 0 ) {
            //构造滚动查询条件
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
            searchScrollRequest.scroll(scroll);
            try {
                //响应必须是上面的响应对象,需要对上一层进行覆盖
                searchResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                System.out.println("滚动查询失败");
            }
            scrollId = searchResponse.getScrollId();
            hits = searchResponse.getHits();
            hitsScroll = hits.getHits();
            /**
            *在这个位置可以对滚动查询到的从1001条数据开始的数据进行处理。
            */
        }
        //清除滚动,否则影响下次查询
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        ClearScrollResponse clearScrollResponse = null;
        try {
            clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest,RequestOptions.DEFAULT);
        } catch (IOException e) {
            System.out.println("滚动查询删除失败");
        }
        //清除滚动是否成功
        boolean succeeded = clearScrollResponse.isSucceeded();
    }

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐