9、Elasticsearch使用reindex做数据迁移
reindex ,是将数据从一个 index 移动到另一个 index 的过程。我们知道,当一个索引能够写入文档时,就意味着该索引的 shard 分片,mapping 等结构不能再被修改了,而有时候却需要调整索引的结构。
一、为什么要使用reindex?
所谓 reindex ,是将数据从一个 index 移动到另一个 index 的过程。我们知道,当一个索引能够写入文档时,就意味着该索引的 shard 分片,mapping 等结构不能再被修改了,而有时候却需要调整索引的结构。
比如,当前索引的 shard 分片数可能满足不了实际的查询需求,需要扩大一些。
又比如,当前索引文档的某个字段的数据类型,需要修改成另一种数据类型。
面对这些需求,Elasticsearch 提供的 reindex 接口可以帮助我们解决。
二、如何正确使用reindex?
为了更好的描述,将当前的索引称为源索引,另一个索引称为目标索引。使用 reindex 之前,需要满足以下几个条件:
- 源索引和目标索引不能相同,且源索引的_source字段必须是开启的;
- 调用reindex之前,保证目标索引已经创建好,并按需求指定了mapping;
接着,说明不同场景下该如何正确的使用 reindex
1、基本使用方式
POST _reindex
{
"source": {
"index": "product_index"
},
"dest": {
"index": "new_product_index"
}
}
适用场景:源索引的文档数量极少,reindex 的时候不用去考虑效率、不用做其它特别处理的情况。
2、使用分片进行reindex
POST _reindex?slices=auto&refresh
{
"source": {
"index": "product_index"
},
"dest": {
"index": "new_product_index"
}
}
适用场景:源索引的文档数量较多,为了提高 reindex 效率,采用设置 slices 参数进行并行加速处理,当值设置为auto时,ES 会合理的选择切片数量进行处理,建议使用auto。
3、指定部分字段进行reindex
POST _reindex?refresh
{
"source": {
"index": "product_index",
"_source":["productId","productName","updateTime","amuontTotal"]
},
"dest": {
"index": "new_product_index"
}
}
适用场景:如果只需要把源索引的部分字段进行 reindex 到目标索引,在请求体的 source 中设置 _source 参数指定这些字段即可。
4、指定部分文档进行reindex
POST _reindex?refresh
{
"max_docs": 50000,
"source": {
"index": "product_index",
"query":{
"bool":{
"must":[
{"wildcard":{"productName":"康师傅*"}}
]
}
}
},
"dest": {
"index": "new_product_index"
}
}
适用场景:使用 query DSL 语句查询到文档集,进行 reindex 的时候设置 max_docs 最大文档数量不超过5W个。当然,请求体不设置 max_docs 参数也是可以的,将查询到的所有文档集进行 reindex 。
5、指定速率进行 reindex
POST _reindex?requests_per_second=600
{
"source": {
"index": "product_index",
"size": 600
},
"dest": {
"index": "new_product_index"
}
}
适用场景:如果 reindex 操作过快,可能会给 ES 集群造成写入压力,严重的话会导致集群的崩溃。为此,通过请求参数可以设置 requests_per_second 参数限制处理的速率,而 size 用于批量读写操作的文档数,此参数是可选的,缓冲区最大为 200 MB,默认 100 M,而。
6、使用 script 进行 reindex
POST _reindex?refresh
{
"source": {
"index": "product_index"
},
"dest": {
"index": "new_product_index"
},
"script": {
"source": "ctx._source.lastupdatetime = ctx._source.remove(\"updataTime\")"
}
}
适用场景:ES script 是一个强大的存在,可以轻松帮我们实现很多对文档修改的需求,比如,把文档中的 updataTime 字段名称改为 lastupdatetime ;又比如,在文档中新增一个字段并赋默认值等。
7、多个源索引进行 reindex 到一个目标索引
POST _reindex?refresh
{
"source": {
"index": ["product_index","product_index_1","product_index_2"]
},
"dest": {
"index": "new_product_index"
}
}
适用场景:多个源索引向同一个目标索引进行 reindex,但需要注意多个源索引的文档id有可能是一样的,reindex 到目标索引时无法保证是哪个源索引的文档id,最终覆盖只保留一个文档id。
8、从远程 ES 集群进行 reindex
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"username": "user",
"password": "xxxx"
},
"size": 100,
"index": "product_index",
"query": {
"bool": {
"filter": {
{"wildcard":{"productName":"康师傅*"}}
}
}
}
},
"dest": {
"index": "new_product_index"
}
}
适用场景:从一个远程的 Elasticsearch 的服务器上进行 reindex,需要在请求体的 remote 参数填写连接信息,而主机信息要保证在 elasticsearch.yml 配置文件中的 reindex.remote.whitelist 进行了定义,如下:
reindex.remote.whitelist: "otherhost:9200,localhost:*"
当开启 reindex 时,我们可以使用 task API 查看进度,使用以下命令:
GET _tasks?detailed=true&actions=*reindex
当然,也可以中途取消 reindex,使用以下命令:
# 从查看进度信息里找到具体的task_id
POST _tasks/task_id:xxx/_cancel
更多 reindex 的使用场景和细节,可以参考官方提供的 Reindex API 文档。
三、实践
第一步:创建一个目标索引 payment_cdr_index_v01,语句如下
PUT payment_cdr_index_v01
{
"settings": {
"index": {
"number_of_shards": "9",
"number_of_replicas": "0",
"refresh_interval": "3s",
"translog": {
"durability": "async",
"flush_threshold_size": "1024mb",
"sync_interval": "120s"
},
"merge": {
"scheduler": {
"max_thread_count": "2"
}
},
"mapping": {
"total_fields": {
"limit": 1000
}
}
}
},
"mappings": {
"properties": {
"pkey": { "type": "keyword" },
"cdrObj": { "type": "text" },
"cdrFlag": { "type": "long" },
"nlObj": { "type": "text" },
"nlFlag": { "type": "long" },
"paymentObj": { "type": "text" },
"paymentFlag": { "type": "long" }
}
}
}
目标索引 payment_cdr_index_v01 与源索引 payment_cdr_index_v13 的不同之处有,settings.index.mapping.total_fields.limit 的值由 50000 改为 1000;新增 mappings.properties.paymentFlag 和 mappings.properties.paymentObj。
第二步:调用 Reindex API,语句如下
POST _reindex?slices=auto&refresh&requests_per_second=5000
{
"source": {
"index": "payment_cdr_index_v13",
"size": 5000
},
"dest": {
"index": "payment_cdr_index_v01"
}
}
源索引的文档数量有 2kw 左右,肯定要考虑优化的问题。这里,采用自动切片方式,并限制 reindex 的请求速度为 5000 个文档,批量操作的大小 size 也为 5000 个文档。
第三步:执行后,通过 task API 查看进度信息,如下
{
"nodes" : {
"YTXefW1WSIulNKfpOKmJkA" : {
"name" : "node-1",
"transport_address" : "xxxxxxx:9300",
"host" : "xxxxxxx",
"ip" : "xxxxxxx:9300",
"roles" : [
"master",
"remote_cluster_client"
],
"tasks" : {
"YTXefW1WSIulNKfpOKmJkA:719098237" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098237,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 6,
"total" : 2331662,
"updated" : 0,
"created" : 175000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 6534
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313123,
"running_time_in_nanos" : 318657619079,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098239" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098239,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 7,
"total" : 2326833,
"updated" : 0,
"created" : 170000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 0
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313124,
"running_time_in_nanos" : 318656374818,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098233" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098233,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 4,
"total" : 2331868,
"updated" : 0,
"created" : 175000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 8188
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313121,
"running_time_in_nanos" : 318659676960,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098235" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098235,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 5,
"total" : 2329417,
"updated" : 0,
"created" : 170000,
"deleted" : 0,
"batches" : 34,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 296999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 355
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313122,
"running_time_in_nanos" : 318658521065,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098229" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098229,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 2,
"total" : 2331076,
"updated" : 0,
"created" : 175000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 6597
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313119,
"running_time_in_nanos" : 318661727240,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098231" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098231,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 3,
"total" : 2330020,
"updated" : 0,
"created" : 170000,
"deleted" : 0,
"batches" : 34,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 296999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 1051
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313120,
"running_time_in_nanos" : 318660554712,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098226" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098226,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 1,
"total" : 2331428,
"updated" : 0,
"created" : 175000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 7567
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313118,
"running_time_in_nanos" : 318662731134,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098222" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098222,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 0,
"total" : 2332121,
"updated" : 0,
"created" : 175000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 7393
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313104,
"running_time_in_nanos" : 318676310730,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098216" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098216,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"total" : 0,
"updated" : 0,
"created" : 0,
"deleted" : 0,
"batches" : 0,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : 0.0,
"throttled_until_millis" : 0,
"slices" : [
null,
null,
null,
null,
null,
null,
null,
null,
null
]
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313042,
"running_time_in_nanos" : 318738902509,
"cancellable" : true,
"headers" : { }
},
"YTXefW1WSIulNKfpOKmJkA:719098241" : {
"node" : "YTXefW1WSIulNKfpOKmJkA",
"id" : 719098241,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"slice_id" : 8,
"total" : 2329307,
"updated" : 0,
"created" : 175000,
"deleted" : 0,
"batches" : 35,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 305999,
"requests_per_second" : 555.55554,
"throttled_until_millis" : 7336
},
"description" : "reindex from [payment_cdr_index_v13] to [payment_cdr_index_v01][_doc]",
"start_time_in_millis" : 1658655313125,
"running_time_in_nanos" : 318655552613,
"cancellable" : true,
"parent_task_id" : "YTXefW1WSIulNKfpOKmJkA:719098216",
"headers" : { }
}
}
}
}
}
从进度信息我们可以了解以下几点:
- 当 reindex 的 slices 参数设置为自动分片方式时,ES 会按索引的实际分片数进行分配文档,这里一共有9个分片,每个分片平均处理 230W 左右的文档,由于目标索引创建时就是空的,这9个分片的操作状态就都是 created 了。
- 请求体的 size 参数设置的是批量处理 5000 个,batches * 5000 就等于 created 数量,而 version_conflicts 则记录的是 reindex 过程中文档版本的冲突数量,reindex 默认冲突时会退出,因此为了避免因版本冲突而退出,可以在请求体设置conflicts=proceed。
四、谈一谈 Reindex 优化问题
reindex 是一个很耗时的操作,当 ES 索引的文档数量很大时,不得不去面对和思考效率的问题了,有以下几个方面可以参考:
创建目标索引:
- settings.index.refresh_interval:刷新间隔时间,最好设置为-1,即不刷新,等 reindex 结束后可以重新设置该参数;
- settings.index.number_of_replicas:副本数量,设置为0,副本在索引创建之后也是可以动态调整的,reindex 没必要设置;
调用 Reindex API:
- 请求参数使用自动切片方式,即 slices=auto;
- 请求参数适当限制处理速度,避免ES集群出现崩溃,即 requests_per_second;
- 请求体使用参数 conflicts 并修改为 proceed,因为 conflicts 默认的是 abort,即默认遇到版本冲突的时候会退出 reindex;
- 请求体使用参数 size 并根据当前文档的情况评估一个合理的批量处理的数值;
总之,对于大数据量的索引,reindex 是一个耗时而漫长的操作,一定要注意这样优化点。
更多推荐
所有评论(0)