在这里插入图片描述


本文借助第三方库 olivere/elastic 完成 Go 对 ES 的更新操作。

Go 对 Elasticsearch 的增删改查参见完整博文 Go Elasticsearch 增删改查(CRUD)快速入门

1.根据 ID 修改

可以根据文档 ID 更新对应的文档。

比如将文档 ID 为 1 的用户名改为 “jack”。对应的 RESTful API 为:

POST es_index_userinfo/_update/1?refresh=true
{
   "doc": {
    "username": "jack"
  }
}

注意:修改不存在的文档将报 “document_missing_exception” 404 错误。

翻译成 Go 为:

// Update 修改文档
// param: index 索引; id 文档ID; m 待更新的字段键值结构
func Update(ctx context.Context, index, id string, doc interface{}) error {
	_, err := GetESClient().
		Update().
		Index(index).
		Id(id).
		Doc(doc).
		Refresh("true").
		Do(ctx)
	return err
}

// 示例
err := Update(context.Background(), index, "1", map[string]interface{}{"username": "jack"})

注意:修改不存在的文档将报elastic: Error 404 (Not Found)错误。

2.根据 ID 修改(不存在则插入)

如果文档不存在,作为新文档插入,则可以使用 upsert。

比如修改文档 ID 为 9 的部分信息,如果文档不存在则插入。对应的 RESTful API 为:

POST es_index_userinfo/_update/9?refresh=true
{
	"doc":{
		"id":       9,
		"username": "jerry",
		"age":      11
	},
	"doc_as_upsert": true
}

翻译成 Go 为:

// Upsert 修改文档(不存在则插入)
// params index 索引; id 文档ID; m 待更新的字段键值结构
func Upsert(ctx context.Context, index, id string, doc interface{}) error {
	_, err := GetESClient().
		Update().
		Index(index).
		Id(id).
		Doc(doc).
		Upsert(doc).
		Refresh("true").
		Do(ctx)
	return err
}

// 示例
m := map[string]interface{}{
	"id":       9,
	"username": "jerry",
	"age":      11,
}
err := Upsert(context.Background(), "es_index_userinfo", "9", m)

3.根据条件更新

我们也可以根据条件来更新符合条件的文档,即 Update by Query。

比如我将更新用户名为 alice,年龄小于等于 18 岁的用户昵称和祖籍。

对应的 RESTful api 为:

POST /es_index_userinfo/_update_by_query?refresh=true
{
  "query":{
     "bool":{
       "filter":[
         {"term":{"username":"alice"}},
         {"range" : {"age" : {"lte" : 18}}}
        ]
     }
  },
  "script": {
    "source": "ctx._source['nickname'] = 'cat';ctx._source['ancestral'] ='安徽'"
  }
}

注意:Refresh 只能指定 true 或 false(缺省值),不能指定 wait_for。

翻译成 Go 为:

// UpdateByQuery 根据条件修改文档
// param: index 索引; query 条件; script 脚本指定待更新的字段与值
func UpdateByQuery(ctx context.Context, index string, query elastic.Query, script *elastic.Script) (int64, error) {
	rsp, err := GetESClient().
		UpdateByQuery(index).
		Query(query).
		Script(script).
		Refresh("true").
		Do(ctx)
	if err != nil {
		return 0, err
	}
	return rsp.Updated, nil
}

// 示例
query := elastic.NewBoolQuery()
query.Filter(elastic.NewTermQuery("username", "alice"))
query.Filter(elastic.NewRangeQuery("age").Lte(18))
script := elastic.NewScriptInline("ctx._source.nickname=params.nickname;ctx._source.ancestral=params.ancestral").Params(
	map[string]interface{}{
		"nickname":  "cat",
		"ancestral": "安徽",
	})
ret, err := UpdateByQuery2ES(context.Background(), index, query, script)

4.批量更新

借助 Bulk API 可以完成对文档的批量更新。

比如对多个指定文档 ID 的用户名进行更新操作。

对应的 RESTful api 为:

// UpdateBulk 全部成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"9"}}
{"doc":{"username":"tom"}}
{"update":{"_id":"10"}}
{"doc":{"username": "alice"}}

// UpdateBulk 部分成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"10"}}
{"doc":{"username":"angela"}}
{"update":{"_id":"11"}}
{"doc":{"username": "bill"}}

// Upsert 理论上不会部分成功
POST /es_index_userinfo/_bulk
{"update":{"_id":"10"}}
{"doc":{"id":10, "username":"tony"}, "doc_as_upsert" : true}
{"update":{"_id":"11"}}
{"doc":{"id":11, "username": "pony"}, "doc_as_upsert" : true}

翻译成 Go 时,可借助 BulkService + BulkUpdateRequest 可实现对文档的批量修改。

// UpdateBulk 批量修改文档
func UpdateBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
	bulkService := GetESClient().Bulk().Index(index).Refresh("true")
	for i := range ids {
		doc := elastic.NewBulkUpdateRequest().
			Id(ids[i]).
			Doc(docs[i])
		bulkService.Add(doc)
	}
	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
	if len(res.Failed()) > 0 {
		return errors.New(res.Failed()[0].Error.Reason)
	}
	return nil
}

// UpsertBulk 批量修改文档(不存在则插入)
func UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
	bulkService := GetESClient().Bulk().Index(index).Refresh("true")
	for i := range ids {
		doc := elastic.NewBulkUpdateRequest().
			Id(ids[i]).
			Doc(docs[i]).
			Upsert(docs[i])
		bulkService.Add(doc)
	}
	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
	if len(res.Failed()) > 0 {
		return errors.New(res.Failed()[0].Error.Reason)
	}
	return nil
}

下面是调用示例:

func main() {
	ctx := context.Background()
	// UpdateBulk 全部成功
	// id 9 和 10 均存在
	ids := []string{"9", "10"}
	docs := []interface{}{
		map[string]interface{}{
			"username": "tom",
		},
		map[string]interface{}{
			"username": "alice",
		},
	}
	err := UpdateBulk(ctx, "es_index_userinfo", ids, docs)
	fmt.Printf("UpdateBulk all success err is %v\n", err)

	// UpdateBulk 部分成功
	// id 10 存在,id 11 文档不存在
	ids = []string{"10", "11"}
	docs = []interface{}{
		map[string]interface{}{
			"username": "angela",
		},
		map[string]interface{}{
			"username": "bill",
		},
	}
	err = UpdateBulk(ctx, "es_index_userinfo", ids, docs)
	fmt.Printf("UpdateBulk partial success err is %v\n", err)
	
	// UpsertBulk 理论上不会部分成功
	ids = []string{"10", "11"}
	docs = []interface{}{
		map[string]interface{}{
			"id":       10,
			"username": "tony",
		},
		map[string]interface{}{
			"id":       11,
			"username": "pony",
		},
	}
	err = UpsertBulk(ctx, "es_index_userinfo", ids, docs)
	fmt.Printf("UpsertBulk all success err is %v\n", err)
}

分别输出:

UpdateBulk all success err is <nil>
UpdateBulk partial success err is [_doc][11]: document missing
UpsertBulk all success err is <nil>

5.脚本更新

有时简单的字段赋值无法满足复杂的业务场景。复杂一点的更新逻辑,可能需要借助于脚本来完成。

如何书写更新脚本,官网给了入门说明和示例,建议大家先看一下 How to write scripts

5.1 数组删除元素

ES 中每一个字段都是数组,我们无需对字段做额外的设置,便可以将其当作数组来使用。

当然,实际使用时,其类型必须是一个数组,而不是一个值。什么意思呢?就是我们可以给字段添加多个值,那么此时它便成了数组。只有字段变成一个数组时,我们才可以把它当作数组来用,比如取元素下标。如果不是数组,那么会失败。比如下面的删除数组中的某个元素,如果删除时,其不是数组,只是一个字段值,那么脚本会执行失败。

现在我想删除指定用户的 phone 字段中的某个电话号码,我们可以借助脚本来完成。注意,我们要先查到这个 phone 字段中包含这个号码,不然脚本在执行时会失败。

POST /es_index_userinfo/_update_by_query
{
  "query":{
    "bool":{
      "filter": [
        {"term":{"username":"angela"}},
        {"term":{"phone":18819064334}}
      ]
    }
  },
  "script": {
    "source": "ctx._source.phone.remove(ctx._source.phone.indexOf(params.phone))",
    "params": {
      "phone": 18819064334
    }
  }
}

对应 Go 条件与脚本:

// 查询条件。
boolQuery := elastic.NewBoolQuery().
	Filter(elastic.NewTermQuery("username", "angela")).
	Filter(elastic.NewTermQuery("phone", 18819064334))

// 删除脚本。
scriptStr := "ctx._source.phone.remove(ctx._source.phone.indexOf(params.phone))"
script := elastic.NewScript(scriptStr).Params(
	map[string]interface{}{
		"phone": 18819064334
	})

5.2 数组删除多个元素

同样地,我们可以使用脚本删除数组中的多个元素,并更新时间戳。

POST /es_index_userinfo/_update_by_query
{
  "query":{
    "bool":{
      "filter": [
        {"term":{"username":"angela"}}
      ]
    }
  },
  "script": {
    "source": "for (int i = 0; i < params.phones.length; i++) {if (ctx._source.phone.contains(params.phones[i])) {ctx._source.phone.remove(ctx._source.phone.indexOf(params.phones[i]))}} ctx._source.upd_uts=params.now",
    "params": {
      "phones": [18819064334,18819064335],
      "now":1649839099
    }
  }
}

因为我们在脚本中进行了元素是否存在于数组中的判断,所以不用在查询条件中指定要包含的号码。

对应 Go 条件与脚本:

boolQuery := elastic.NewBoolQuery().
	Filter(elastic.NewTermQuery("username", "angela"))
	scriptStr := `for (int i = 0; i < params.phones.length; i++) {
		if (ctx._source.phone.contains(params.phones[i])) { 	
			ctx._source.phone.remove(ctx._source.phone.indexOf(params.phones[i]))
		}
	}
	ctx._source.upd_uts=params.now`
	script := elastic.NewScript(scriptStr).Params(
		map[string]interface{}{
			"phones": [18819064334,18819454334]
		})

5.3 数组追加元素

我们可以借助脚本向数组字段追加元素。

比如我需要操作的某个字段 tags:[“tag1”,”tag2”,”tag3’],现在发现了一个新标签 ”tag4”,需要加入到 tags 中。

POST <index>/_update/<id>
{
   "script" : {
       "source": "ctx._source.tags.add(params.new_tag)",
       "params" : {
          "new_tag" : "tag4"
       }
   }
}

对应 Go 条件与脚本:

// 条件。
query := elastic.NewTermQuery("_id", id)

// 脚本。
script := elastic.NewScript("ctx._source.tags.add(params.new_tag)").Params(
	map[string]interface{}{
		"new_tag": "tag4",
	})

5.4 数组追加多个元素

我们可以借助脚本向数组字段追加元素。

比如我需要操作的某个字段 tags:[“tag1”,”tag2”,”tag3’],现在发现了多个新标签,比如 ”tag4” 和 “tag5”,需要加入到 tags 中。

POST <index>/_update/<id>
{
   "script" : {
       "source": "ctx._source.tags.addAll(params.new_tags)",
       "params" : {
          "new_tags" : ["tag4","tag5"]
       }
   }
}

对应 Go 条件与脚本:

// 条件。
query := elastic.NewTermQuery("_id", id)

// 脚本。
script := elastic.NewScript("ctx._source.tags.addAll(params.new_tags)").Params(
	map[string]interface{}{
		"new_tags": []string{"tag4", "tag5"},
	})

5.5 数组修改元素

我们可以借助脚本修改数组中的元素。

比如我需要修改字段 tags:[“tag1”,”tag2”,”tag3’],将 tag3 变成 label3。

POST <index>/_update_by_query
{
  "query":{
    "term":{"tags":"tag3"}
  },
   "script" : {
       "source": "ctx._source.tags[ctx._source.tags.indexOf(params.old_tag)]=params.new_tag",
       "params" : {
          "old_tag" : "tag3",
          "new_tag" : "label3"
       }
   }
}

对应 Go 条件与脚本:

// 条件。
query := elastic.NewTermQuery("tags", "tag3")

// 脚本。
script := elastic.NewScript("ctx._source.tags[ctx._source.tags.indexOf(params.old_tag)]=params.new_tag").Params(
	map[string]interface{}{
		"old_tag" : "tag3",
		"new_tag": "label3",
	})

5.6 数组修改多个元素

我们可以借助脚本修改数组中的元素。

比如我需要修改数组 tags:[“tag1”,”tag2”,”tag3’],将 tag1 和 tag3 变成 label。此时,我们需要遍历原数组,将符合条件的元素逐个替换。

POST <index>/_update_by_query
{
  "query":{
    "terms":{"tags":["tag1","tag3"]}
  },
   "script" : {
       "source": "for (int i=0; i<ctx._source.tags.length; i++) {if (params.old_tags.contains(ctx._source.tags[i])) {ctx._source.tags[i] = params.new_tag}}",
       "params" : {
          "old_tags" : ["tag1","tag3"],
          "new_tag" : "label"
       }
   }
}

对应 Go 条件与脚本:

// 条件。
query := elastic.NewTermsQuery("tags", "tag1", "tag3"))

// 脚本。
script := elastic.NewScript(`
	for (int i=0; i<ctx._source.tags.length; i++) {
		if (params.old_tags.contains(ctx._source.tags[i])) {
			ctx._source.tags[i] = params.new_tag
		}
	}
`).Params(
	map[string]interface{}{
		"old_tags": ["tag1", "tag3"],
		"new_tag": "label",
	})

参考文献

elastic - pkg.dev
elastic - type UpdateService
elastic - type UpdateByQueryService
elastic - type BulkService
elastic - type BulkUpdateRequest
Elasticsearch Guide [7.15] » REST APIs » Document APIs » Update API
Elasticsearch Guide [7.13] » REST APIs » Document APIs » Update By Query API
Elasticsearch Guide [7.15] » REST APIs » Document APIs » Bulk API
Elasticsearch Guide [8.1] » Scripting » How to write scripts

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐