1. 版本

示例运行环境:
go版本: 1.14
es版本: 6.7.0
工具及版本: github.com/olivere/elastic v6.2.35

此文章适合有go基础的人阅读, 因为代码示例中都是部分代码片段而不是完整可执行代码, 需要自行完善

2. 连接es

options := []elastic.ClientOptionFunc{
	elastic.SetURL("http://xxxxxxx:9200"),
	elastic.SetSniff(true),      //是否开启集群嗅探
	elastic.SetHealthcheckInterval(10 * time.Second), //设置两次运行状况检查之间的间隔, 默认60s
	elastic.SetGzip(false),  //启用或禁用gzip压缩
	elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),  //ERROR日志输出配置
	elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),  //INFO级别日志输出配置
}
options = append(options, elastic.SetBasicAuth(
	"xxxx",            //账号
	"xxxxxxxxxxxxxx",  //密码
))
con, err := elastic.NewClient(options...)

下述代码中 conf.ES() 表示con

3. 索引操作

3.1 创建索引

type mi = map[string]interface{}
mapping := mi{
	"settings": mi{
		"number_of_shards":   3,
		"number_of_replicas": 2,
	},
	"mappings": mi{
		"_doc": mi{  //type名
			"properties": mi{
				"id": mi{  //整形字段, 允许精确匹配
					"type": "integer",
				},
				"name": mi{
					"type":            "text",  //字符串类型且进行分词, 允许模糊匹配
					"analyzer":        "ik_smart", //设置分词工具
					"search_analyzer": "ik_smart",
					"fields": mi{    //当需要对模糊匹配的字符串也允许进行精确匹配时假如此配置
						"keyword": mi{
							"type":         "keyword",
							"ignore_above": 256,
						},
					},
				},
				"date_field": mi{  //时间类型, 允许精确匹配
					"type": "date",
				},
				"keyword_field": mi{ //字符串类型, 允许精确匹配
					"type": "keyword",
				},
				"nested_field": mi{ //嵌套类型
					"type": "nested",
					"properties": mi{
						"id": mi{
							"type": "integer",
						},
						"start_time": mi{ //长整型, 允许精确匹配
							"type": "long",
						},
						"end_time": mi{
							"type": "long",
						},
					},
				},
			},
		},
	},
}
indexName := "xxxxxxxxxxxxxxxxxxxxx"  //要创建的索引名
_, err = conf.ES().CreateIndex(indexName).BodyJson(mapping).Do(context.Background())

3.2 判断索引是否存在

//exists true 表示索引已存在
exists, err := conf.ES().IndexExists(indexName).Do(context.Background())

3.3 更新索引

仅支持添加字段, 已有字段无法修改

type mi = map[string]interface{}
mapping := mi{ 
	"properties": mi{
		"id": mi{  //整形字段, 允许精确匹配
			"type": "integer",
		},
	},
}
_, err = conf.ES().PutMapping().
	Index(indexName).
	Type("_doc").
	BodyJson(mapping).
	Do(context.Background())

3.4 删除索引

_, err = conf.ES().DeleteIndex(indexName).Do(context.Background())

3.5 数据迁移

将一个索引的数据迁移到另一个索引中, 一般用于索引结构发生改变时使用新索引存储数据

type mi = map[string]interface{}
_, err = conf.ES().Reindex().Body(mi{
	"source": mi{
		"index": oldIndexName,
	},
	"dest": mi{
		"index": newIndexName,
	},
}).Do(context.Background())

3.6 设置别名

设置别名后就可以使用别名查询数据

_, err = conf.ES().Alias().Action(
	elastic.NewAliasAddAction(oldIndexName).Index(newIndexName),
).Do(context.Background())

4. 数据操作

4.1 新增或覆盖数据(单条)

此操作相同id的数据会被覆盖

_, err = conf.ES().Index().
	Index(indexName).
	Type("_doc").
	// id为字符串, 创建一条此id的数据或覆盖已有此id的记录
	// data为结构体或map, 当然结构需要跟索引的mapping类型保持一致
	Id(id).BodyJson(data).
	Do(context.Background())

4.2 根据id新增或更新数据(单条)

仅更新传入的字段, 而不是像 4.1 进行整条记录覆盖

_, err = conf.ES().Update().
	Index(t.index()).
	Type("_doc").
	Id(id).
	// data为结构体或map, 需注意的是如果使用结构体零值也会去更新原记录
	Upsert(data).
	// true 无则插入, 有则更新, 设置为false时记录不存在将报错
	DocAsUpsert(true). 
	Do(context.Background())

4.3 根据id新增或更新数据(批量)

bulkRequest := conf.ES().Bulk()
// data map[int]interface{}, key为id, value为要更新的数据
for id, v := range data {
	doc := elastic.NewBulkUpdateRequest().
		Index(t.index()).
		Type("_doc").
		Id(strconv.Itoa(id)).
		Doc(v).
		// true 无则插入, 有则更新, 设置为false时记录不存在将报错
		DocAsUpsert(true)
	bulkRequest.Add(doc)
}
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
	return
}
// 获取操作失败的记录
bad := bulkResponse.Failed()
if len(bad) > 0 {
	s, _ := jsoniter.MarshalToString(bad)
	err = errors.New("部分记录更新失败 " + s)
}

4.4 根据条件更新数据

_, err = conf.ES().UpdateByQuery().
	Index(indexName).
	Type("_doc").
	//查询条件, 详细配置查询条件请查看章节 5
	Query(query).
	//要执行的更新操作, 详细配置请查看章节 6及7.1
	Script(script).
	Do(context.Background())

4.5 查询

_, err = conf.ES().Search().
	Index(indexName).
	//偏移量
	From(0).
	//返回数据的条数
	Size(10).
	//指定返回数据的字段(此处指定返回id和name), 全部返回则无需设置
	FetchSourceContext(elastic.NewFetchSourceContext(true).Include("id", "name")).
	//查询条件, 详细配置查询条件请查看章节 5
	Query(query).
	//按照id升序排序, 无需排序则可跳过此设置, 多个Sort会按先后顺序依次生效
	Sort("id", true).
	//自定义排序规则, 详细写法请查看章节 6及7.2
	SortBy(sorter).
	Do(context.Background())

5. 查询条件query设置

5.1 一个示例

{
    "bool": {
        "filter": [
            {
                "nested": {
                    "path": "nested_field",
                    "query": {
                        "range": {
                            "nested_field.start_time": {
                                "from": 1581475200,
                                "include_lower": true,
                                "include_upper": true,
                                "to": null
                            }
                        }
                    }
                }
            },
            {
                "nested": {
                    "path": "nested_field",
                    "query": {
                        "range": {
                            "nested_field.end_time": {
                                "from": null,
                                "include_lower": true,
                                "include_upper": true,
                                "to": 1581481440
                            }
                        }
                    }
                }
            }
        ],
        "must": {
            "terms": {
                "id": [
                    4181,
                    4175
                ]
            }
        }
    }
}

实现上述查询条件的go代码如下

query := elastic.NewBoolQuery()
query.Must(elastic.NewTermsQuery("id", []int{4181, 4175}))
query.Filter(elastic.NewNestedQuery(
	"nested_field",
	// nested_field.start_time >= 1581475200
	elastic.NewRangeQuery("nested_field.start_time").Gte(1581475200),
))
query.Filter(elastic.NewNestedQuery(
	"nested_field",
	// nested_field.start_time <= 1581481440
	elastic.NewRangeQuery("nested_field.end_time").Lte(1581481440),
))

5.2 match 模糊匹配

// name字段模糊匹配
elastic.NewMatchQuery("name", val)

5.3 terms 精确匹配

// name字段精确匹配
elastic.NewTermsQuery("name.keyword", val...)

5.4 range 范围匹配

// id >= 10, id <= 100
elastic.NewRangeQuery("id").Gte(10).Lte(100)

5.5 nested 嵌套结构查询

elastic.NewNestedQuery(
	"nested_field",
	query,  //此处query中的字段 都需要加上nested_field前缀, 比如 nested_field.id
)

5.6 更多

es query中的每一种查询结构都会有一个对应的go结构体, 此处不再过多赘述

6. script painless语法

官方文档

6.1 变量

int index = 1;

6.2 if else

if (index == -1) {
	// 逻辑
} else {
	// 
}

6.3 for循环

for (om_cms_id in params.om_cms_ids) {
	continue
}

6.4 类型转换

// 数字转字符串
String.valueOf(11)

7. script 实际应用

7.1 更新

数据库中goods_id与omcms的关联关系存储如下

om_cms_idgoods_id
11, 2
21, 2
31
44

es中goods_id与omcms关联关系存储如下

goods_idom_cms_id
1[1, 2, 3]
2[1, 2, 4]

当数据库中关联关系发生变换时要对应更新es中数据

  1. 获取omcms发生变化的记录
  2. 删除这些omcms在es中与goods的关联关系
  3. 添加omcms与goods的新关联关系
var goodsOmCmsIds = make(map[int][]int)
var goodsIds []interface{}
var omCmsIds []interface{}
for _, v := range list {
	omCmsIds = append(omCmsIds, v.Id)
	reg := regexp.MustCompile("^\\d+(,\\d+)*$")
	v.Ids = strings.ReplaceAll(v.Ids, "NaN", "0")
	if !reg.MatchString(v.Ids) {
		fmt.Println("不支持的格式:", v.Ids)
		continue
	}
	err = jsoniter.UnmarshalFromString("["+v.Ids+"]", &v.GoodsIds)
	if err != nil {
		return
	}
	for _, goodsId := range v.GoodsIds {
		if goodsId == 0 {
			continue
		}
		if _, ok := goodsOmCmsIds[goodsId]; !ok {
			goodsIds = append(goodsIds, goodsId)
		}
		goodsOmCmsIds[goodsId] = append(goodsOmCmsIds[goodsId], v.Id)
	}
}
// 查询出omcsm变更导致可能发生变更的所有商品
query := NewBoolQuery().Should(
	NewBoolQuery().Must(elastic.NewTermsQuery("id", goodsIds...)),
	NewBoolQuery().Must(elastic.NewTermsQuery("om_cms_id", omCmsIds...)),
)
script := elastic.NewScript(`
	for (om_cms_id in params.om_cms_ids) {
		if (ctx._source.om_cms_id == null) {
			continue
		}
		int index = ctx._source.om_cms_id.indexOf(om_cms_id);
		if (index == -1) {
			continue
		}
		ctx._source.om_cms_id.remove(index);
	}
	if (params.goods_om_cms_ids == null || params.goods_om_cms_ids[ctx._id] == null) {
		return
	}
	for (om_cms_id in params.goods_om_cms_ids[ctx._id]) {
		if (ctx._source.om_cms_id == null) {
			ctx._source.om_cms_id = [om_cms_id]
		} else {
			ctx._source.om_cms_id.add(om_cms_id)
		}
	}
`).Params(map[string]interface{}{
	"goods_om_cms_ids": goodsOmCmsIds,
	"goods_ids":        goodsIds,
	"om_cms_ids":       omCmsIds,
})
// 使用4.4更行方法

7.2 自定义排序

根据传入的id顺序排序

m := make(map[string]interface{})
// 循环要查询的id, 并转成map, key是id值, value是在原数组种的索引
for index, goodsId := range ids {
	m[strconv.Itoa(goodsId)] = index
}
sort := elastic.NewScriptSort(
	// params 存储自定义数据
	// doc 表示当前记录
	elastic.NewScript(`
		params.id[String.valueOf(doc['id'].value)]
	`).Param("id", m),
	"number",
).Order(vv[0] == 1)
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐