elasticsearch 的 updateByQuery 使用script脚本完成部分字段的更新

elasticsearch 文档批量更新

最近项目中用到了对es文档的批量更新操作,根据id单个单个进行文档更新时 比较影响性能,故而使用es的script脚本对query查询出来的文档进行更新操作。

{
	“script”: {
		“source”: “ctx._source[‘要修改的字段名’]=‘要修改为的参数’”
	},
	“query”: {
		“term”: {
			“条件字段名”: “条件字段参数”
		}
	}
}

前置知识

1、会使用es
2、需要了解script
3、了解painless语法

案例1

批量将文档中一个字段(url)赋值给另外一个字段(newUrl)

在kibana上使用如下命令,能将query查询到的文档中 url 字段上的值 赋值给url字段。

POST  索引名/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        {
          "exists": {
            "field": "url" 
          }
        },
      ],
      "must_not": [
        {
          "exists": {
            "field": "newUrl"
          }
        }
      ]
    }
  },
  "script":{
    "inline" : "ctx._source.newUrl= ctx._source.url;",
    "lang" : "painless"
  }
}

案例2

批量将某个字段更新到文档中某些字段中

在kibana上使用如下命令,能将query查询到所有文档中 newUrl 字段上的值设置为 www.baidu.com(即params 中的 url 的值)。

POST  索引名/_update_by_query
{
 "query": {
    "match_all": {}
  },
  "script":{
    "source" : "ctx._source.newUrl= params.url",
   "params": {
      "url": "www.baidu.com"
    }
  }
}

案例3

批量将数据插入到文档中一个多值字段中,不能覆盖已有的数据

在kibana上使用如下命令,能将query查询到所有存在 linked_persons 字段的文档中,如果存在 linked_persons 字段 并且 linked_persons 字段中不包含需要添加的 数据,则向linked_persons字段中插入该字段。

GET sourcedoc_prod/_update_by_query
{
  "query": {
    "exists": {
      "field": "linked_persons"
    }
  },
  "script": {
    "source": " if (ctx._source.containsKey('linked_persons')) {
    						if(!ctx._source.linked_persons.contains(params.linked_persons)){
    							ctx._source.linked_persons.add(params.linked_persons);
    						}
    				}else {
    					ctx._source.linked_persons= [params.linked_persons];
    				}",
    "params": {
      "linked_persons": "111"
    }
  }
}

java Api 调用样例

 	@Test
    public void testUpdateByQuery() throws IOException {
		Set<String> docIds = new HashSet<>();
		docIds.add("xxxxxxxxxxx");
		
        HashMap<String, Object> params = new HashMap<>(2);
        params.put("linked_persons","张三");

        Script script = new Script(ScriptType.INLINE, "painless",
                "if(ctx._source.containsKey('linked_persons')){ if (!ctx._source.linked_persons.contains(params.linked_persons)){ ctx._source.linked_persons.add(params.linked_persons); }} else { ctx._source.linked_persons = [params.linked_persons];}",
                params);
    
        // 构造查询条件,查询需要进行修改的文档
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
        queryBuilder.must(QueryBuilders.termsQuery("id",docIds));
       
        updateByQuery(queryBuilder , script , Index_a, Index_b);
	}
	
   /**
     *  通过脚本 批量修改  文档es
     * @param queryBuilder 查询条件
     * @param script  脚本
     * @param indices  需要修改的 索引
     * @throws IOException
     */
    public void updateByQuery(QueryBuilder queryBuilder, Script script, String... indices) throws IOException {
        UpdateByQueryRequest request = new UpdateByQueryRequest(indices);
        request.setConflicts("proceed");
        request.setQuery(queryBuilder);
        request.setScript(script);

        restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
    }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐