一、什么是数据流

官方定义:
Data streams 数据流
数据流是可以跨多个索引存储仅限于追加存储的时间序列数据,同时为请求提供单个命名资源。

在 Elasticsearch 7.9之前,通常会使用带写索引权限的索引别名来管理时间序列数据。数据流取代了这一功能,需要更少的维护,并自动与数据层集成。
所以可以把数据流看作是带有写索引权限的索引别名的升级特性。

官网说明:Data Stream

从定义中我们可以看出,数据流的一些特点:
1、可以跨多个索引存储
2、仅限追加存储,不支持删除、修改操作
3、时间序列数据
4、为请求提供单个命名资源,可以理解成天然具有公共的别名

使用场景:
数据流非常适合于日志、事件、指标和其他连续生成的数据。

可以将索引和搜索请求直接提交到数据流,流会自动将请求路由到存储流数据的备份索引。
也可以使用索引生命周期管理(ILM)来对数据流中的备份索引的自动化管理。例如,您可以使用 ILM 自动将较旧的支持索引移动到较便宜的硬件,并删除不需要的索引。ILM 可以帮助您在数据增长时降低成本和开销。

二、核心概念

1、后备索引 Backing indices

数据流由一个或多个隐藏的自动生成的后备索引组成。

在这里插入图片描述
1、数据流需要匹配的索引模板。模板包含用于配置流的后台索引的映射和设置。

2、索引到数据流的每个文档必须包含一个@timestamp 字段,映射为 date 或 date _ nanos 字段类型。如果索引模板没有为@timestamp 字段指定映射,Elasticsearch 将@timestamp 映射为带有默认选项的日期字段。

3、同一索引模板可用于多个数据流。不能删除数据流正在使用的索引模板。

2、读请求 Read request

当您向数据流提交一个读请求时,数据流将请求路由到它的所有后备索引。
在这里插入图片描述

3、写索引 write index

最近创建的备份索引是数据流的写索引。流仅向此索引添加新文档。
在这里插入图片描述

不能将新文档添加到其他后备索引中,即使是直接向索引发送请求也不行。
也不能对写入索引执行可能阻碍索引的操作,比如删除、克隆、冻结

4、滚动 Rollover

滚动操作将创建一个新的后备索引,该索引将成为流的新写索引。

一般会使用 ILM 在写索引达到指定的年龄或大小时自动滚动数据流。如果需要,还可以手动滚动数据流。

5、自增序号 generation

每触发一次滚动操作就会生成新的后备索引,generation后备索引的自增序号。
每个数据流跟踪id的生成规则:
一个六位数的零填充整数,作为流的滚动累计计数,从000001开始。

创建备份索引时,索引使用以下约定命名:

.ds-<data-stream>-<yyyy.MM.dd>-<generation>

说明:
data-stream是数据流名称
yyyy.MM.dd是后备索引创建时间
generation自增序号

6、仅限追加 Append Only

数据流是为现有数据很少更新的用例而设计的。不能将现有文档的更新或删除请求直接发送到数据流。

如果需要,可以通过直接向文档的后备索引提交请求来更新或删除文档。

如果经常更新或删除现有的时间序列数据,请使用具有写索引权限的索引别名,而不是数据流。

三、创建数据流

官网说明:创建数据流
在这里插入图片描述

1.创建索引生命周期策略

PUT _ilm/policy/timeseries_policy
{
  "policy": {
    "phases": {
      "hot": {                                
        "actions": {
          "rollover": {
            "max_docs":3
          }
        }
      },
      "delete": {
       "min_age": "30s",                     
        "actions": {
          "delete": {}                        
        }
      }
    }
  }
}

2.创建索引模板

说明:通过data_stream属性说明这是一个数据流的索引模板。

PUT _index_template/timeseries_template
{
  "index_patterns": ["timeseries"],                   
  "data_stream": { },
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.lifecycle.name": "timeseries_policy"     
    }
  }
}

3.创建数据流

方式一:创建并添加数据
创建数据流timeseries,同时向数据流中写入一条数据。数据中必须包含@timestamp字段信息

POST timeseries/_doc
{
  "message": "logged the request",
  "@timestamp": "1633677855467"
}

方式二:仅创建数据流

PUT _data_stream/timeseries

注意⚠️:
1、数据流名称必须和索引模板中的index_patterns匹配。
2、写入的数据中必须包含@timestamp字段信息

4.向数据流写入数据

批量写入:

PUT timeseries/_bulk
{ "create":{ } }
{"message": "logged the request1","@timestamp": "1633677862467"}
{ "create":{ } }
{"message": "logged the request2","@timestamp": "1633677872468"}
{ "create":{ } }
{"message": "logged the request3","@timestamp": "1633682619628"}

单条写入:

POST timeseries/_doc
{
  "message": "logged the request",
  "@timestamp": "1633677872468"
}

5.将索引别名转化成数据流

POST _data_stream/_migrate/my-time-series-data

6.获取数据流信息

GET _data_stream/my-data-stream

四、删除数据流

DELETE _data_stream/my-data-stream

注意⚠️:
不同于删除别名,该命令会同时删除数据流及其包含的后备索引数据,一定要非常谨慎的操作。

五、查看数据流的生命周期

GET .ds-timeseries-*/_ilm/explain

六、采用update_by_query更新数据

POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

七、采用delete_by_query删除数据

POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

八、直接向后备索引发起删除或更新请求

也可以通过向包含文档的后备索引直接发送请求来更新或删除数据流中的文档。
1、先查询索引记录的信息
主要是为了获取后备索引的名称_index,文档id号_id,序列号_seq_no,主分片号_seq_no。

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

响应:

 "hits": [
      {
        "_index": ".ds-my-data-stream-2099.03.08-000003",      
        "_type": "_doc",
        "_id": "bfspvnIBr7VVZlfp2lqX",              
        "_seq_no": 0,                               
        "_primary_term": 1,                         
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2099-03-08T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]

2、更新

PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

3、删除

DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

总结

数据流并不是什么高深莫测的东西,只是ES中对具有写权限的索引别名机制的一种升级,使用过程中可以类比索引别名来理解。
1、数据流适用于仅限追加存储的时间序列数据,比如日志、事件、指标等。
2、不支持对数据流直接发起更新、删除请求,但是可以直接向数据流中包含的后备索引发起更新、删除请求。
3、数据流中必须包含@timestamp字段信息
4、数据流中会包含一系列后备索引,读请求会发送到所有后备索引,写请求会发送到最新的后备索引。
5、数据流一般都会结合索引生命周期管理ILM一起使用,实现对索引数据生命周期的自动化管理。
6、数据流的创建一般是先创建索引生命周期管理策略,再创建索引模板,然后创建索引。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐