简介

   Elasticsearch滚动查询也叫游标查询

   适合那种需要一次性或分批拉出大量数据做离线处理、迁移等。可以提升点效率。

实践中我使用到滚动的场景
  • 需求需要从几个不同的es数据源拉取、截取数据,合到一个新的业务数据源中。
  • 每天夜里有定时任务需要拉取某天的索引数据,根据某个字段去重后拿去做离线业务处理。

注意:scroll不适合支持那种实时的和用户交互的前端分页工作,实时分页查询可以使用from-size方式。但同时from-size也不适用上述离线大数据量处理业务场景。

from-size分页的缺点
GET /{index_name}/_search
{
"from":0,
"size":10
}

   es客户端实时分页一般使用from-size。如果有100条数据,按size=10共分10页,那么当用户查询第n页的时候,实际上es是把前n页的数据全部找出来,再去除前n-1页最后得到需要的数据返回,查最后一页就相当于全扫描。其中利弊大家自行思考。所以离线大批量数据的处理业务或迁移不适合使用from-size方式查询。

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.from((page.getPageNum() - 1) * page.getPageSize());
searchSourceBuilder.size(page.getPageSize());
json处理步骤

   我们可以给初始化查询传递参数scroll=5m ,es会返回一个_scroll_id,这是一个base64编码的长字符串,用于下次查询时传入。5m表示_scroll_id缓存5分钟,之后自动过期,可以根据需要配置。size可以指定每次滚动拉取多少数据。不过如果你做了分片,查询结果可能超过指定的 size 大小。

案例如下

例如:第一次查询

GET /sms/_search?scroll=5m
{
  "size": 20,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "userId": "9d995c0b90fe4128896a1a84eca213bf"
          }
        }
      ]
    }
  }
}

返回:

{
  "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==",
  "took": 6,
      ......
}

之后我们把上一次得到的_scroll_id拿到按以下查询即可得到下一轮的数据。

GET /_search/scroll/
{
  "scroll":"1m",
  "scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw=="
}

除了等待scroll_id过期时间之外,我们也可以手动删除scroll_id:


// 手动删除scroll_id的方法
DELETE /_search/scroll { "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==" }
java 处理步骤
代码逻辑
  • 我们可以使用一个循环去查询,第一次查询的时候按需要的查询条件处理,加上参数scroll即可,
  • 之后的查询均使用GET /_search/scroll/ 传递_scroll_id查询,如果返回数据为空则终止循环。

es version: 6.3.2

	<dependency>
			<groupId>io.searchbox</groupId>
			<artifactId>jest</artifactId>
		</dependency>
简化版java代码如下:
String rollId = null;
//滚动查询
while (true) {
    // 第一次查询
    if (rollId == null) {
            // 按业务需求查询
            JestResult jestResult = jestSmsClient.execute(getSmsShortSearchBuilder(createDate));
            if (jestResult.isSucceeded()) {
                // 获取scroll_id以备下次查询
                rollId =  jestResult.getJsonObject().get("_scroll_id").getAsString();
                //获取查询结果业务处理
                result = .......
            }
    } else {
                      // 后续查询使用rollId
            JestResult jestResult = jestSmsClient.execute(new SearchScroll.Builder(rollId, "5m").build());
            if (jestResult.isSucceeded()) {
                // 获取scroll_id以备下次查询
                rollId =  jestResult.getJsonObject().get("_scroll_id").getAsString();
               //获取查询结果业务处理
                            result =  .......
            }
    }
    // 没有数据返回,你也可以认为:如果返回的数据长度小于size时可以跳出循环。没毛病可能会少调用一次es。不过可能会由于多个分片导致并没有少调用一次
    if(CollectionUtils.isEmpty(result)) {
        break;
    }
}

稍微写详细点凑下字数:

public final static String ES_INDEX = "es索引名";
public final static String ES_TYPE = "_doc";
String rollId = null;
//滚动查询
while (true) {
    List<Xxxx> result = Lists.newArrayList();
    //首次访问
    if (rollId == null) {
        try {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            searchSourceBuilder.query(boolQueryBuilder);
            searchSourceBuilder.size(10000);
            searchSourceBuilder.fetchSource(new String[]{"sendUrl"}, new String[]{});
            Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(
                    Constants.ES_INDEX).addType(Constants.ES_TYPE)
                    .setParameter("preference", "_primary_first")
                    .setParameter(Parameters.SCROLL, "5m")
                    .build();
            JestResult jestResult = jestSmsClient.execute(search);
            if (jestResult.isSucceeded()) {
                rollId =  jestResult.getJsonObject().get("_scroll_id").getAsString();
                result = jestResult.getSourceAsObjectList(Xxxx.class, false);
            }
        } catch (IOException e) {
            log.error("异常信息:", e);
            return null;
        }
    } else {
        try {
            JestResult jestResult = jestSmsClient.execute(new SearchScroll.Builder(rollId, "5m").build());
            if (jestResult.isSucceeded()) {
                rollId =  jestResult.getJsonObject().get("_scroll_id").getAsString();
                result = jestResult.getSourceAsObjectList(Xxxx.class, false);
            }
        } catch (IOException e) {
            log.error("异常信息:", e);
            return null;
        }
    }
    if(CollectionUtils.isEmpty(result)) {
        break;
    }
    //对返回数据做处理result ...省略
}

ok,结束了。莫莫绵溜了溜了。。


水平有限,如果你觉得上述有任何疑问、不足、错误的地方,欢迎在评论区指正。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐