1. 问题复现

今天在跟前端进行联调的过程中发现了一个问题,向elasticsearch中新增一条文档数据后,立即请求查询文档列表,发现刚刚新增的文档查不出来,需要等一会后再次请求查询才能查询出来,随后测试了删除文档列表和更新文档列表中的文档这两个接口,出现了同样的问题,即elasticsearch数据更新有延迟。

2. 原因分析

在写操作中,一般会先在内存中缓冲一段数据,再将这些数据写入硬盘,每次写入硬盘的这批数据称为一个分段,如同任何写操作一样。一般情况下,通过操作系统write接口写到磁盘的数据先到达系统缓存(内存),write函数返回成功时,数据未必被刷到磁盘。通过手工调用flush,或者操作系统通过一定策略将系统缓存刷到磁盘。这种策略大幅提升了写入效率。从write函数返回成功开始,无论数据有没有被刷到磁盘,该数据已经对读取可见。ES正是利用这种特性实现了近实时搜索。每秒产生一个新分段,新段先写入文件系统缓存,但稍后再执行flush刷盘操作,写操作很快会执行完,一旦写成功,就可以像其他文件一样被打开和读取了。由于系统先缓冲一段数据才写,且新段不会立即刷入磁盘,这两个过程中如果出现某些意外情况(如主机断电),则会存在丢失数据的风险。通用的做法是记录事务日志,每次对ES进行操作时均记录事务日志,当ES启动的时候,重放translog中所有在最后一次提交后发生的变更操作。比如HBase等都有自己的事务日志。

在ES中,每秒清空一次写缓冲,将这些数据写入文件,这个过程称为refresh,每次refresh会创建一个新的Lucene 段。但是分段数量太多会带来较大的麻烦,每个段都会消耗文件句柄、内存。每个搜索请求都需要轮流检查每个段,查询完再对结果进行合并;所以段越多,搜索也就越慢。因此需要通过一定的策略将这些较小的段合并为大的段,常用的方案是选择大小相似的分段进行合并。在合并过程中,标记为删除的数据不会写入新分段,当合并过程结束,旧的分段数据被删除,标记删除的数据才从磁盘删除。

默认情况下索引的refresh_interval为1秒,这意味着数据写1秒后就可以被搜索到,每次索引的refresh会产生一个新的Lucene段,这会导致频繁的segment merge行为,如果不需要这么高的搜索实时性,应该降低索引refresh周期

indexing buffer在为doc建立索引时使用,当缓冲满时会刷入磁盘,生成一个新的segment,这是除refresh_interval刷新索引外,另一个生成新segment的机会。每个shard有自己的indexingbuffer。

3. 解决思路

问题是elasticsearch中更新数据后有延迟

① 解决方案1 :

在新增一条文档数据,返回响应数据之前,让线程睡眠1秒钟,然后再返回响应数据,发现页面在新增文档跳转到文档列表页面后可以看到新增的文档数据了。

public DocAddRespVo add(@RequestBody @Valid DocAddReqVo docAddReqVo) {
    // 保存文档到mongodb中
    Doc doc = docService.addDoc(new Doc(docAddReqVo), docAddReqVo.getAttachmentIds(), docAddReqVo.getImageIds());
    // 保存文档到elasticsearch中
    elasticSearchService.saveDoc(doc);
    // 让线程睡眠1秒钟
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return new DocAddRespVo(doc.getId());
}

② 解决方法2:indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

Elasticsearch 刚索引的文档并不是立即对搜索可见,它们会先在内存 buffer(缓存区)中,待 buffer 数据满后或主动刷新操作才会写入到文件缓存区中,便可以搜索。

手动让es中的数据每次更新后都从indexing buffer中刷新到磁盘中,,从而让更新的数据立即可见,但是从前面的分析可见,会影响性能。

@Override
public void saveDoc(Doc doc) {
    doc.setCreateTime(getEsDate(doc.getCreateTime()));
    doc.setUpdateTime(getEsDate(doc.getUpdateTime()));
    IndexRequest indexRequest = new IndexRequest(ELASTICSEARCH_KNOWLEDGE_INDEX);
    indexRequest.id(doc.getId());
    String jsonString = JSON.toJSONString(doc);
    indexRequest.source(jsonString, XContentType.JSON);
    // 手动将缓存区中的数据刷新到磁盘中,但是会影响性能
    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    try {
        restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("failed to add document to elasticsearch,the doc is:{},the exception is {}", doc, e);
        throw new CommonException(BizCodeEnum.ELASTICSEARCH_DOC_SAVE_ERROR);
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐