相关环境:

  1. elasticsearch:7.7.1,三个节点的集群
  2. java: 1.8

项目场景:

商城搜索,两个操作。
操作一:用户首次查询,将数据从es查询出来,再缓存到redis,之后的查询,直接读redis。
操作二:后台管理数据,删除es部分数据,重新添加这部分数据,再删除缓存。

问题描述:

上面的流程看起来是没有问题,但是删除es部分数据的时候还是出现了问题。

假设一下流程:

  1. 删除es部分数据
  2. 重新添加这部分数据
  3. 删除缓存
  4. 用户首次查询,将数据从es查询出来
  5. 再缓存到redis

流程很正常,不会有问题,但是如果es还来不及删除呢?就被查询出来并缓存到redis里面了,这个时候就会有问题了。

这里使用的是es的同步批量删除,核心批量删除代码如下

// 根据id批量删除
BulkRequest bulkRequest = new BulkRequest();
for (String id : ids) {
	bulkRequest.add(new DeleteRequest(index, id));
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println("耗时: " + bulkResponse.getTook());
if (bulkResponse.hasFailures()) {
	System.out.println("批量删除失败: " + bulkResponse.buildFailureMessage());
}

按照官网的解释,这个删除bulk为同步的批量删除,异步为bulkAsync,这里说的是批量删除,其实在这里可以做增删查改符合的批量操作。


原因分析:

猜测:由于是在es集群上面进行操作的,集群上面设置了索引的分片=5,副本=3,在执行同步删除操作后,es返回了成功,但是并不是所有的副本都删除了,此时,有个查询从其他副本上面查询出了"已被删除"的数据。

代码抽离出来,demo如下:

@Test
public void originalTest() throws IOException {
	// 1. 查询出floorId等于29的所有数据
	SearchRequest searchRequest = new SearchRequest(index);
	searchRequest.source(SearchSourceBuilder.searchSource().size(1000).query(QueryBuilders.termQuery("floorId", 29)));
	SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("首次查询: " + searchResponse);
	// 2. 记录这批数据的id
	SearchHit[] hits = searchResponse.getHits().getHits();
	List<String> ids = new ArrayList<>(hits.length);
	for (SearchHit hit : hits) {
		Map<String, Object> map = hit.getSourceAsMap();
		ids.add(map.get("id").toString());
	}
	System.out.println("需要删除的id: " + ids);
	// 3根据id批量删除
	BulkRequest bulkRequest = new BulkRequest();
	bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
	for (String id : ids) {
		bulkRequest.add(new DeleteRequest(index, id));
	}
	BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
	System.out.println("耗时: " + bulkResponse.getTook());
	if (bulkResponse.hasFailures()) {
		System.out.println("批量删除失败: " + bulkResponse.buildFailureMessage());
	}
	// 4. 再查询出floorId等于29的数据
	searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("删除后再次查询: " + searchResponse);
}
将es集群改成单节点,分片=1,副本=1,执行上面的单元测试,结果还是能查询出"被删除"的数据。

结论:查出删除的数据不是由于备份和分片机制导致的。


再看官网:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html

大概有这么一段代码

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 

Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.

看不懂,我理解的意思是,删除之后,等待分片与备份刷新完成后再返回。

解决方案:

批量删除操作,加上这个刷新策略 完整代码如下:区别是加了第18行
@Test
public void originalTest() throws IOException {
	// 1. 查询出floorId等于29的所有数据
	SearchRequest searchRequest = new SearchRequest(index);
	searchRequest.source(SearchSourceBuilder.searchSource().size(1000).query(QueryBuilders.termQuery("floorId", 29)));
	SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("首次查询: " + searchResponse);
	// 2. 记录这批数据的id
	SearchHit[] hits = searchResponse.getHits().getHits();
	List<String> ids = new ArrayList<>(hits.length);
	for (SearchHit hit : hits) {
		Map<String, Object> map = hit.getSourceAsMap();
		ids.add(map.get("id").toString());
	}
	System.out.println("需要删除的id: " + ids);
	// 3根据id批量删除
	BulkRequest bulkRequest = new BulkRequest();
	bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
	for (String id : ids) {
		bulkRequest.add(new DeleteRequest(index, id));
	}
	BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
	System.out.println("耗时: " + bulkResponse.getTook());
	if (bulkResponse.hasFailures()) {
		System.out.println("批量删除失败: " + bulkResponse.buildFailureMessage());
	}
	// 4. 再查询出floorId等于29的数据
	searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("删除后再次查询: " + searchResponse);
}

再次验证,这次查询不到"已经删除"的数据了。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐