这篇文章是翻译过来的,原文在此,需要科学上网。

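当查询的页码很深或者查询的数据量很大时,就会出现深分页(deep paging)问题。es 出于自我保护,默认限制 from + size 不能超过 10000(由索引设置 index.max_result_window 控制)。另外,默认情况下返回的命中总数(hits.total)也只精确统计到 10000 条;在请求中加入 trackTotalHits(true) 可以得到精确的命中总数,但它并不会解除 from + size 的 10000 条上限,要读取 10000 条之后的数据需要使用下文的 scroll 或 search after。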

// trackTotalHits(true) asks Elasticsearch to count every matching document, so the
// total reported in the response is exact rather than stopping at the default 10,000 threshold.
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().trackTotalHits(true);

三种批量查询

from size

这种实现方式有点类似于 MySQL 中的 limit。实现简单,但分页越深性能越差,只适用于少量数据;优点是可以随机跳转页面。

package com.example.es.test;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class ESTest_from_size {

       public static final Logger logger = LoggerFactory.getLogger(ESTest_searchAfter.class);

        public static void main(String[] args) throws Exception{
            long startTime = System.currentTimeMillis();
            RestHighLevelClient esClient = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("localhost", 9200, "http")));
            SearchRequest searchRequest = new SearchRequest("audit2");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            // 偏移量(第几页)
            sourceBuilder.from(0);
            // 每页多少个元素。
            sourceBuilder.size(1000);
            // 按照指定属性排序。
            sourceBuilder.sort(SortBuilders.fieldSort("operationtime").order(SortOrder.DESC));
            searchRequest.source(sourceBuilder);
            SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] hits = searchResponse.getHits().getHits();
            List<Map<String, Object>> result = new ArrayList<>();
            if (hits != null && hits.length > 0) {
                for (SearchHit hit : hits) {
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                    result.add(sourceAsMap);
                }
            }
            logger.info("The number of data queried is:{}", result.size());
            esClient.close();
            logger.info("Running time: " + (System.currentTimeMillis() - startTime) + "ms");
        }
    }
}

scroll

高效的滚动查询,第一次查询会在内存中保存一个历史快照和游标(scroll_id)来记录当前查询的终止位置,下次查询会从游标记录的位置往后继续查询。这种方式性能好,但不是实时的,一般用于海量数据导出或者重建索引。但是 scroll_id 有过期时间,两次查询之间如果 scroll_id 过期了,第二次查询会抛异常“找不到 scroll_id”。假如场景是读一批数据、处理、再读、再处理,而处理过程恰好很花费时间且耗时不确定,那很可能会遇到 scroll_id 过期。

package com.example.es.test;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
 * Demonstrates the scroll API: reads every document of the {@code audit2} index in pages of
 * 1000, ordered by {@code operationtime} (newest first), then clears the scroll context.
 *
 * @date 2021/12/08 14:09
 */
public class ESTest_Scroll {

    public static final Logger logger = LoggerFactory.getLogger(ESTest_Scroll.class);

    /**
     * Runs the scroll query against a local Elasticsearch node and logs the number of
     * documents read and the elapsed time.
     *
     * @param args unused
     * @throws Exception if any request to Elasticsearch or closing the client fails
     */
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        // How long the scroll context is kept alive between two consecutive requests.
        TimeValue keepAlive = TimeValue.timeValueMinutes(1L);

        // try-with-resources guarantees the client is closed even if a request throws.
        try (RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // 1. Build the initial search request with scrolling enabled.
            SearchRequest searchRequest = new SearchRequest("audit2");
            searchRequest.scroll(keepAlive);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.size(1000);
            searchSourceBuilder.sort(SortBuilders.fieldSort("operationtime").order(SortOrder.DESC));
            searchRequest.source(searchSourceBuilder);

            // 2. The initial search opens the scroll context and returns the first page.
            SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
            String scrollId = searchResponse.getScrollId();
            List<Map<String, Object>> result = new ArrayList<>();
            try {
                SearchHit[] hits = searchResponse.getHits().getHits();
                // 3. Keep scrolling until a page comes back empty.
                while (hits != null && hits.length > 0) {
                    for (SearchHit hit : hits) {
                        result.add(hit.getSourceAsMap());
                    }
                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    // Renew the keep-alive for the next round trip.
                    scrollRequest.scroll(keepAlive);
                    searchResponse = esClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                    // The scroll id may change between requests; always continue with the
                    // id from the most recent response.
                    scrollId = searchResponse.getScrollId();
                    hits = searchResponse.getHits().getHits();
                }
            } finally {
                // 4. Release the server-side scroll context, even if scrolling failed part-way.
                if (scrollId != null) {
                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                    clearScrollRequest.addScrollId(scrollId);
                    ClearScrollResponse clearScrollResponse =
                            esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                    logger.info("delete scrollId: {}", clearScrollResponse.isSucceeded());
                }
            }
            logger.info("Total number of queries:{}", result.size());
        }
        logger.info("Running time: {}ms", System.currentTimeMillis() - startTime);
    }

}

search after

顾名思义,从指定的某个数据后面开始读。这种方式不能随机跳转分页,只能一页一页地读取数据,而且必须用一个唯一且不重复的属性对待查数据进行排序。这种方式的优点是批量查询但不依赖于 scroll_id,所以后续处理可以不考虑耗费时间的问题。

package com.example.es.test;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Demonstrates {@code search_after} pagination: reads every document of the {@code audit2}
 * index in pages of 1000, ordered by {@code operationtime} (newest first), using the sort
 * values of the last hit of each page as the starting point of the next page.
 *
 * @date 2022/01/11 14:04
 */
public class ESTest_searchAfter {

    public static final Logger logger = LoggerFactory.getLogger(ESTest_searchAfter.class);

    /**
     * Runs the search_after loop against a local Elasticsearch node and logs the number of
     * documents read and the elapsed time.
     *
     * @param args unused
     * @throws Exception if any search request or closing the client fails
     */
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        // try-with-resources guarantees the client is closed even if a request throws.
        try (RestHighLevelClient esClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // 1. Build the search request.
            SearchRequest searchRequest = new SearchRequest("audit2");
            // trackTotalHits(true) only makes the reported total hit count exact; the
            // search_after paging below does not depend on it.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true);
            // Number of documents per page.
            sourceBuilder.size(1000);
            // NOTE(review): search_after needs a sort whose values are unique per document.
            // If several documents can share the same operationtime, documents on a page
            // boundary may be skipped; add a unique tiebreaker sort field in that case.
            sourceBuilder.sort(SortBuilders.fieldSort("operationtime").order(SortOrder.DESC));
            searchRequest.source(sourceBuilder);

            // 2. Fetch the first page.
            SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] hits = searchResponse.getHits().getHits();
            List<Map<String, Object>> result = new ArrayList<>();

            // 3. Collect the current page, then request the page that follows its last hit.
            while (hits != null && hits.length > 0) {
                for (SearchHit hit : hits) {
                    result.add(hit.getSourceAsMap());
                }
                // Sort values of the last hit mark where the next page starts.
                Object[] lastSortValues = hits[hits.length - 1].getSortValues();
                sourceBuilder.searchAfter(lastSortValues);
                searchRequest.source(sourceBuilder);
                searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
                // Advance to the freshly fetched page so that it, not the first page, is collected.
                hits = searchResponse.getHits().getHits();
            }
            logger.info("The number of data queried is:{}", result.size());
        }
        logger.info("Running time: {}ms", System.currentTimeMillis() - startTime);
    }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐