在我之前的文章中,我已经介绍了 Elasticsearch 的索引生命周期管理:

索引生命周期管理对于 Time Series Data (TSD) 非常有用。那么到底什么是 Time Series Data 呢?

Data stream 在索引周期管理中的应用

Data stream 在索引周期管理中的应用_哔哩哔哩_bilibili

什么是 Time Series Data?

TSD (Time Series Data) 始终与时间戳关联,该时间戳标识创建事件时该数据的时间点事件。 例如,它可以是传感器数据(温度测量)或安全设备日志,这些数据有什么共同点? 随着时间的流逝,它的重要性趋于松散,与过去事件相关的旧文档不如与新事件相关的文档重要。 你可能不再对上个月的传感器相关数据感兴趣,尤其是非常精确的数据。

因此,在 ES 中,在弹性搜索中处理此数据的最佳选择是使用基于时间的索引。

Time Series Data 具有以下的特点:

  • 它可以是来自一些服务器的日志或者是一些设施的指标,社交媒体流,基于时间的事件
  • 由时间戳 + 数据组成
  • 通常搜索最近事件
  • 旧文件变得不太重要
  • 基于时间的索引是最佳选择

         - 每天,每周,每月,每年...创建一个新索引

在现实的数据管理中,我们可以把数据区分为:static data 及 time series data。它们的区别如下:

在实际生活中,酒店房间的数据就是典型的 Static data。我们不经常添加房间,或者修改有关房间的数据。而针对机器的指标或网站的访问数据,这些都是很典型的 Time series data。它们会随着时间的前进,收集更多的数据。这种数据的更新几乎不发生,而过时的数据对我们的分析不是很有用。通常为了节省成本的需求,删除超过一定期限的数据。通常 Time series data 的数据增长很快并且几乎从来不更新:

鉴于 Time series data 的这些特性,我们可以使用一些方法来管理我们索引,比如时间久了的数据不是经常被用于查询,我们可以把它们放到比较便宜且性能比较差一点的节点上,如果时间特别长的数据,我们甚至可以采用一些方法来定期删除这些数据以节省成本。对于刚写入的数据,我们希望 Elasticsearch 的节点以最高的效率来索引数据,同时给最好的性能供搜素,比如更多的 CPU 内存。当这些数据的使用性不是那么高的话,我们希望通过一定的方法把数据移到另外一些性能少差一点的节点上。这里就涉及到一个操作 rollover。在我之前的文章 “Elasticsearch: rollover API” 有介绍。它依赖于一定的条件来进行 rollover:

  • 索引的大下超过一个指定的数值
  • 索引里的文档超过一点的时间期限
  • 索引的文档数量超过预设的数量

当上面的任何一个条件达到满足时,rollover 操作就会发生:

 如上所示,当 my_index-000001 满足其中的一个条件时,rollover 就会发生,并且接下来写入的文档就会自动写入到 my_index-000002 中。而 my_index-000001 索引可能会 rollover 到另外一个新能稍差的节点中以让出位置给索引最新的文档(通常需要更高性能的节点)。Rollover 操作创建新的索引。那么我们该如何读取这些以 my_index- 开头的文档呢?比如我们需要搜索所有的索引。这个要使用到 rollover alias:

 有关这个方面的内容,请详细阅读文章 “Elasticsearch:Index 生命周期管理入门”。我们通过同样的一个 rollover alias 来进行写入及读取的操作。

处理 Time Series Data 的挑战

当我们处理 TSD 数据时,我们也面临很多的挑战:

我们从上面的图中可以看出来。从数据的有效性来说,数据会随着时间的流失而失去它的重要性,并且对旧的数据的查询率会变低。我们需要使用新的存储模式来对数据进行保存,比如,删除时间很久的数据,或者对稍微久一点的数据保存于一些价格较为便宜的存储设备上以节省成本,并同时设置该索引为只读。我们甚至对一些历史更久的索引进行 close/frozen 操作从而更加进一步节省运行资源。

另外从控制索引的大小来说,我们很难预测索引的大小。当我们开始收集数据时:

我们只有很少量的数据,随着数据量的增加:

最终,我们可能看到更多的数据被收集上来:

这对于我们计划集群的大小和分片的个数带来挑战。我们需要按照我们需求针对每天,或者每周来分别创建索引来满足自己业务的需求。我们甚至删除一些不需要的时间久一些的索引数据。这个就是我们通常所说的 ILM (索引生命周期管理)。

避免过大的单个索引,而是把一个大的索引分为多个小的索引来进行存储

不要让一个索引的大小过大,但是也不要让一个索引的大小过小。

Data stream

Data stream (数据流)是 Elastic Stack 7.9 的一个新的功能。Data stream 使你可以跨多个索引存储只追加数据的时间序列数据,同时为请求提供唯一的一个命名资源。 data stream 非常适合日志,事件,指标以及其他持续生成的数据。

你可以将索引和搜索请求直接提交到 data stream。 stream 自动将请求路由到存储流数据的后备索引。 你可以使用索引生命周期管理(ILM)来自动管理这些后备索引。 例如,你可以使用 ILM 自动将较旧的后备索引移动到较便宜的硬件上,并删除不需要的索引。 随着数据的增长,ILM 可以帮助你降低成本和开销。

请注意,并非所有数据集都以相同的方式老化。 数据层不适用于产品搜索或全文搜索用例,例如,随着文档的老化,查询的频率会降低。 通常,数据生命周期概念适用于时间序列数据,但某些用例可能会有所不同。随着索引的老化而移动索引的过程由称为索引生命周期管理的功能管理,我们将在下面进行绍。

后备索引(Backing indices)

数据流由一个或多个 hidden 的自动生成的后备索引(Backing indices)组成。

每个数据流都需要一个匹配的索引模板。 该模板包含用于配置流的后备索引的映射和设置。

索引到数据流的每个文档都必须包含一个 @timestamp 字段,该字段映射为 datedate_nanos 字段类型。 如果索引模板未为 @timestamp 字段指定映射,则 Elasticsearch 将@timestamp 映射为具有默认选项的日期字段。

同一个索引模板可用于多个数据流。 你不能删除数据流正在使用的索引模板。

读请求

当你向数据流提交读取请求时,该流会将请求路由到其所有后备索引。

写索引

最近创建的后备索引是数据流的写索引。 流仅将新文档添加到该索引。

你不能将新文档添加到其他支持索引,即使直接将请求发送到索引也是如此。

你也不能对可能阻碍索引的写索引执行如下的操作:

Rollover

创建数据流时,Elasticsearch 会自动为该流创建一个后备索引。 该索引还充当流的第一个写入索引。 rollover 会创建一个新的后备索引,该后备索引将成为流的新写入索引。

我们建议当写入索引达到指定的使用期限或大小时,使用 ILM 自动翻转数据流。 如果需要,你还可以手动将数据 rollover。Data stream 也可以在不使用任何 ILM 策略的情况下进行 rollover。这种时候 ,它采用的是一种默认的设置。Data stream 把请求转至合适的后备索引。

生成

每个数据流都跟踪其生成:一个六位数,零填充的整数,用作该流的 rollover 的累积计数,从 000001 开始。

创建支持索引时,将使用以下约定来命名该索引:

.ds-<data-stream>-<generation>

具有更高 generation 的后备索引包含更多最新数据。 例如,web-server-logs 数据流有一个 generation 为 34。该流的最新后备索引名为 .ds-web-server-logs-000034。

某些操作(例如 shrinkrestore)可以更改后备索引的名称。 这些名称更改不会从其数据流中删除后备索引。

只追加

数据流专为很少更新现有数据(如果有的话)的用例而设计。 你不能将对现有文档的更新或删除请求直接发送到数据流。 而是使用  update by query 和  delete by query 删除。

如果需要,你可以通过直接向文档的后备索引提交请求来更新或删除文档

提示:如果您经常更新或删除现有文档,请使用索引别名和索引模板,而不要使用数据流。 你仍然可以使用 ILM 管理别名的索引。

在接下来的联系中,我们将在 ILM 中来使用 data stream。

安装 Elastic Stack

在今天实验中,我们将仿照之前的教程  “Elasticsearch:Index 生命周期管理入门” 来创建有两个 node 的 Elasticsearch 集群。

我们可以参考文章 “Elasticsearch:运用shard filtering来控制索引分配给哪个节点” 运行起来两个 node 的 cluster。其实非常简单,当我们安装好 Elasticsearch 后,打开一个 terminal,并运行如下的命令:

./bin/elasticsearch -E node.name=node1 -E node.attr.data=hot -Enode.max_local_storage_nodes=2

它将运行起来一个叫做 node1 的节点。同时在另外 terminal 中运行如下的命令:

./bin/elasticsearch -E node.name=node2 -E node.attr.data=warm -Enode.max_local_storage_nodes=2

它运行另外一个叫做 node2 的节点。我们可以通过如下的命令来进行查看:

GET _cat/nodes?v

显示两个节点:

我们可以用如下的命令来检查这两个 node 的属性:

GET _cat/nodeattrs?v&s=name

显然其中的一个 node 是 hot,另外一个是 warm。

这样,我们就创建好了我们的 Elasticsearch 集群了。

测试环境如下:

创建一个  data stream

在接下来的动手实践中,我们的操作非常简单。为了利用 ILM 来自动化 rollover 以及管理时序索引,我们作如下新的步骤:

  • 创建一个 Lifecycle Policy
  • 创建一个运用于一个 data stream 的 index template
  • 创建一个 data stream
  • 发送数据到索引,并验证索引经历 Lifecycle 的阶段

如上所示,一个典型的 ILM 通常有4个阶段:Hot, Warm, Cold, 以及 Delete。针对自己的业务需求,你可以分别启动相应的阶段。针对如下的练习,我们将省略掉 Cold 阶段。

创建 Index Lifecycle Policy

在上面,我把 Policy 的名字取做 demo。同时我也对 Hot phase 的 rollover 的条件进行了定义。当它满足如下的任何一个条件:

  • 索引的大小大于 1G
  • 文档的数量大于 5
  • 索引的时间跨度超过30天

那么索引将自动进行 rollover 到另外一个索引。

我们接着定义 Warm phase:

在上面,我们启动 Warm phase。在这个 phase 里,数据将保存于带有 warm 标签的节点上。由于我们只有一个 warm 节点,在本练习中,我将 number of replica 设置为 0。在实际的使用中,有更多的 replica 代表有更多的 read 能力。这个可以根据自己的业务需求和配置进行设置。我也同时启动了 Shrink index,也就是说它将在 warm phase 里把所有的 primary shard 压缩到一个。通常 primary shard 代表导入数据的能力。在 warm phase 中,我们通常不需要导入数据,我们只在 hot 节点中导入数据。

我们接下来定义 Delete phase:

在上面,我们启动了 Delete phase。上面显示当我们的文档进入 Warm 阶段后,再过3分钟,这个索引将会被自动删除。点击上面的 Save as new policy 按钮。我们可以通过如下的 API 来获得被定义的 Policy:

GET _ilm/policy/demo

 {
  "demo" : {
    "version" : 3,
    "modified_date" : "2020-12-03T14:33:30.508Z",
    "policy" : {
      "phases" : {
        "warm" : {
          "min_age" : "0ms",
          "actions" : {
            "allocate" : {
              "number_of_replicas" : 0,
              "include" : { },
              "exclude" : { },
              "require" : {
                "data" : "warm"
              }
            },
            "shrink" : {
              "number_of_shards" : 1
            },
            "set_priority" : {
              "priority" : 50
            }
          }
        },
        "hot" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "1gb",
              "max_age" : "30d",
              "max_docs" : 5
            },
            "set_priority" : {
              "priority" : 100
            }
          }
        },
        "delete" : {
          "min_age" : "3m",
          "actions" : {
            "delete" : {
              "delete_searchable_snapshot" : true
            }
          }
        }
      }
    }
  }
}

上面的显示是针对 7.9.1 版本的结果。针对 7.9.3 及 7.10 (目前最新的版本),有一些原因,为了使得 delete 能正常工作,我们需要使用如下的 API 来对之进行设置:

PUT _ilm/policy/demo
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_age": "30d",
            "max_size": "1gb",
            "max_docs": 5
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "actions": {
          "allocate": {
            "number_of_replicas": 0,
            "include": {},
            "exclude": {},
            "require": {
              "data": "warm"
            }
          },
          "shrink": {
            "number_of_shards": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "delete": {
        "min_age": "3m",
        "actions": {
          "delete" : {}
        }
      }
    }
  }
}

请注意上面的 delete 部分。在 actions 里面含有 "delete": {}。

定义 index template

我们在 Kibana 的 console 中输入如下的命令:

# Create tje template to apply the policy to every new backing index of the data stream
PUT _index_template/template_demo
{
  "index_patterns": ["demo-*"],
  "data_stream": {},
  "priority": 200,
  "template": {
    "settings": {
      "number_of_shards": 2,
      "index.lifecycle.name": "demo",
      "index.routing.allocation.require.data": "hot"
    }
  }
}

在上面,我们创建了一个叫做 template_demo 的 index template。请注意,我们在上面定义 data_stream 为一个空的 object。请注意自己先前定义的 hot 及 warm 所使用的 tag。在上面,我们使用了 data。我们定义了两个 primary shards。

创建一个 data stream

创建一个  data stream 是非常简单的:

# Create a data stream
PUT _data_stream/demo-ds

运行上面的命令。我们甚至直接向 demo-ds 里写入一个文档来创建这个 data stream。

由于我们在上面已经创建了以 demo-* 为 index_pattern 的 index template,所以上面的创建是成功的。否则如果我们用如下的命令:

PUT _data_stream/demo

它将会是失败的。错误代码告诉你没有相对应的 index template。

# Check the shards allocation
GET _cat/shards/demo-ds?v

上面的命显示:

index              shard prirep state      docs store ip        node
.ds-demo-ds-000001 1     p      STARTED       0  208b 127.0.0.1 node1
.ds-demo-ds-000001 1     r      UNASSIGNED                      
.ds-demo-ds-000001 0     p      STARTED       0  208b 127.0.0.1 node1
.ds-demo-ds-000001 0     r      UNASSIGNED                      

由于我们分配了两个 primary shards,但是我们只有一个 hot 节点,所以在上面我们看到的是有两个没有被分分配的 replica shards。

我们可以通过如下的命令来检查 data stream 的索引:

# Verify data stream indexes
GET _data_stream/demo-ds

上面的命令显示:

{
  "data_streams" : [
    {
      "name" : "demo-ds",
      "timestamp_field" : {
        "name" : "@timestamp"
      },
      "indices" : [
        {
          "index_name" : ".ds-demo-ds-000001",
          "index_uuid" : "sPN5JEW8SVuTFNh4UqK9zw"
        }
      ],
      "generation" : 1,
      "status" : "YELLOW",
      "template" : "template_demo",
      "ilm_policy" : "demo"
    }
  ]
}

上面显示了一个叫做 .ds-demo-ds-000001 的索引已经被创建了。我们不需要做任何的事情。

我们也可以通过如下的命令来进行检查这个被创建的索引的设置:

GET .ds-demo-ds-000001/_settings

上面的命令显示:

{
  ".ds-demo-ds-000001" : {
    "settings" : {
      "index" : {
        "lifecycle" : {
          "name" : "demo"
        },
        "routing" : {
          "allocation" : {
            "require" : {
              "data" : "hot"
            }
          }
        },
        "hidden" : "true",
        "number_of_shards" : "2",
        "provided_name" : ".ds-demo-ds-000001",
        "creation_date" : "1606989315969",
        "priority" : "100",
        "number_of_replicas" : "1",
        "uuid" : "sPN5JEW8SVuTFNh4UqK9zw",
        "version" : {
          "created" : "7100099"
        }
      }
    }
  }

从上面,我们可以看出来,它位于 hot 节点上,同时 hidden 显示为 true,也就意味着它将被不能使用通配符表达式来进行返回。

发送数据到 data stream

我们接下来执行如下的命令:

PUT _ingest/pipeline/add-timestamp
{
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

在上面的 add-timestamp pipeline 是为了加入一个被导入时的 timestamp。我们先执行这个 pipeline,然后执行下面的命令:

POST demo-ds/_doc?pipeline=add-timestamp
{
  "user": {
    "id": "liuxg"
  },
  "message": "This is so cool!"
}

我们可以看到如下的输出:

{
  "_index" : ".ds-demo-ds-000001",
  "_type" : "_doc",
  "_id" : "3JsVKHYBtwZVzHJGZXRc",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

它表明我们的数据是发送到 .ds-demo-ds-000001 这个索引中的。我们可以使用如下的命令来进行搜索:

# Search the data stream
GET demo-ds/_search

上面的命令显示的结果是:

{
  "took" : 351,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : ".ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3JsVKHYBtwZVzHJGZXRc",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T10:10:59.541518Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      }
    ]
  }
}

上面显示有一个文档已经被搜索到了,并且它的索引的名称为 .ds-demo-ds-000001。

我们接着再执行如下的命令4次:

POST demo-ds/_doc?pipeline=add-timestamp
{
  "user": {
    "id": "liuxg"
  },
  "message": "This is so cool!"
}

我们将会看到和下面类似的输出:

{
  "_index" : ".ds-demo-ds-000001",
  "_type" : "_doc",
  "_id" : "4JueKHYBtwZVzHJGfnRv",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

到目前为止,我们已经生成了 5 个文档。它们都位于 .ds-demo-ds-000001 索引里。还记得我们之前在 ILM 中定义的 Policy 吗?当文档的个数超过5个的时候,它会自动 rollover, 并且会把之前的文档移到 warm 的节点上。我们接着执行如下的命令:

POST demo-ds/_doc?pipeline=add-timestamp
{
  "user": {
    "id": "liuxg"
  },
  "message": "This is so cool!"
}

这样总的文档个数变为6个了。我们可以使用如下的命来检查 ILM 的状态:

# Check ILM status per demo-ds data stream
GET demo-ds/_ilm/explain

上面的命令显示:

{
  "indices" : {
    ".ds-demo-ds-000001" : {
      "index" : ".ds-demo-ds-000001",
      "managed" : true,
      "policy" : "demo",
      "lifecycle_date_millis" : 1606989315969,
      "age" : "2.86h",
      "phase" : "hot",
      "phase_time_millis" : 1606989317057,
      "action" : "rollover",
      "action_time_millis" : 1606989760360,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1606989760360,
      "phase_execution" : {
        "policy" : "demo",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "1gb",
              "max_age" : "30d",
              "max_docs" : 5
            },
            "set_priority" : {
              "priority" : 100
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1606988416663
      }
    }
  }
}

上面显示 action 为 rollover。我们稍等一段时间让 rollover 发生。这个时间是由如下的参数来决定的:

indices.lifecycle.poll_interval

我们可以在地址找到这个参数的设置。在默认的情况下,这个参数是10分钟的时间。我们需要等一段时间。我们也可以通过如下的命令来进行修改这个时间:

PUT _cluster/settings
{
    "transient": {
      "indices.lifecycle.poll_interval": "10s"
    }
}

上面表明 Elasticsearch 每隔10秒钟进行查询,并执行 ILM policy。我们可以再次执行如下的命令:

GET demo-ds/_ilm/explain

上面命令的输出显示:

{
  "indices" : {
    ".ds-demo-ds-000002" : {
      "index" : ".ds-demo-ds-000002",
      "managed" : true,
      "policy" : "demo",
      "lifecycle_date_millis" : 1606999959439,
      "age" : "5.22m",
      "phase" : "hot",
      "phase_time_millis" : 1606999961410,
      "action" : "rollover",
      "action_time_millis" : 1607000204426,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1607000204426,
      "phase_execution" : {
        "policy" : "demo",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "1gb",
              "max_age" : "30d",
              "max_docs" : 5
            },
            "set_priority" : {
              "priority" : 100
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1606988416663
      }
    },
    ".ds-demo-ds-000001" : {
      "index" : ".ds-demo-ds-000001",
      "managed" : true,
      "policy" : "demo",
      "lifecycle_date_millis" : 1606999959458,
      "age" : "5.22m",
      "phase" : "warm",
      "phase_time_millis" : 1606999962564,
      "action" : "shrink",
      "action_time_millis" : 1607000207450,
      "step" : "shrink",
      "step_time_millis" : 1607000271877,
      "phase_execution" : {
        "policy" : "demo",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "allocate" : {
              "number_of_replicas" : 0,
              "include" : { },
              "exclude" : { },
              "require" : {
                "data" : "warm"
              }
            },
            "shrink" : {
              "number_of_shards" : 1
            },
            "set_priority" : {
              "priority" : 50
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1606988416663
      }
    }
  }
}

有一个新增加的 Warm phase, 并伴有一个 shrink 的 action。

我们再次执行如下的指令:

GET demo-ds/_search

它显示:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3ZueKHYBtwZVzHJGdHRU",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:41.812609Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3pueKHYBtwZVzHJGd3Sd",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:42.653342Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "4JueKHYBtwZVzHJGfnRv",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:44.399382Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "4puiKHYBtwZVzHJGeHT3",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:45:05.143514Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "45unKHYBtwZVzHJGK3RB",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:50:12.929209Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3JsVKHYBtwZVzHJGZXRc",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T10:10:59.541518Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "35ueKHYBtwZVzHJGe3S4",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:43.704291Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "4ZuhKHYBtwZVzHJGb3Si",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:43:57.217932Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      }
    ]
  }
}

从上面显示我们可以看出来,所有的8个文档都已经放置于 shrink-.ds-demo-ds-000001 这个索引中了。这个就是我们在 Warm phase 中所定义的那样,shrink 到一个 primary 索引中了。

我们再次执行如下的命令:

POST demo-ds/_doc?pipeline=add-timestamp
{
  "user": {
    "id": "liuxg"
  },
  "message": "This is so cool!"
}

上面的命令输出显示:

{
  "_index" : ".ds-demo-ds-000002",
  "_type" : "_doc",
  "_id" : "C5uzKHYBtwZVzHJGDHW6",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

从上面我们可以看出来:新的文档保存于一个叫做 .ds-demo-ds-000002 的索引当中。这是一个新的索引,和之前的 .ds-demo-ds-000001 是不一样的。

当我们执行:

# Search the data stream
GET demo-ds/_search

它将显示所有的9个文档:

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 9,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3ZueKHYBtwZVzHJGdHRU",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:41.812609Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3pueKHYBtwZVzHJGd3Sd",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:42.653342Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "4JueKHYBtwZVzHJGfnRv",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:44.399382Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "4puiKHYBtwZVzHJGeHT3",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:45:05.143514Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "45unKHYBtwZVzHJGK3RB",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:50:12.929209Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "3JsVKHYBtwZVzHJGZXRc",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T10:10:59.541518Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "35ueKHYBtwZVzHJGe3S4",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:40:43.704291Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000001",
        "_type" : "_doc",
        "_id" : "4ZuhKHYBtwZVzHJGb3Si",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T12:43:57.217932Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : ".ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "C5uzKHYBtwZVzHJGDHW6",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T13:03:11.546240Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      }
    ]
  }
}

它们分别位于 shrink-.ds-demo-ds-000001 及 .ds-demo-ds-000002 索引中。

我们也可以通过如下的命令来查看 data stream 的当前索引信息:

GET _data_stream/demo-ds
{
  "data_streams" : [
    {
      "name" : "demo-ds",
      "timestamp_field" : {
        "name" : "@timestamp"
      },
      "indices" : [
        {
          "index_name" : "shrink-.ds-demo-ds-000001",
          "index_uuid" : "SMlpBdzdTSala5hMt0XmpQ"
        },
        {
          "index_name" : ".ds-demo-ds-000002",
          "index_uuid" : "1uZk3ug0SfmD-1UUgaeqDw"
        }
      ],
      "generation" : 2,
      "status" : "YELLOW",
      "template" : "template_demo",
      "ilm_policy" : "demo"
    }
  ]
}

上面显示有两个索引:shrink-.ds-demo-ds-000001 及 .ds-demo-ds-000002。

我们再等一会执行如下的命令:

GET demo-ds/_ilm/explain

上面的命令显示:

{
  "indices" : {
    "shrink-.ds-demo-ds-000001" : {
      "index" : "shrink-.ds-demo-ds-000001",
      "managed" : true,
      "policy" : "demo",
      "lifecycle_date_millis" : 1606999959458,
      "age" : "13.83m",
      "phase" : "delete",
      "phase_time_millis" : 1607000275318,
      "action" : "complete",
      "action_time_millis" : 1607000275157,
      "step" : "complete",
      "step_time_millis" : 1607000275318,
      "phase_execution" : {
        "policy" : "demo",
        "phase_definition" : {
          "min_age" : "3m",
          "actions" : { }
        },
        "version" : 1,
        "modified_date_in_millis" : 1606988416663
      }
    },
    ".ds-demo-ds-000002" : {
      "index" : ".ds-demo-ds-000002",
      "managed" : true,
      "policy" : "demo",
      "lifecycle_date_millis" : 1606999959439,
      "age" : "13.83m",
      "phase" : "hot",
      "phase_time_millis" : 1606999961410,
      "action" : "rollover",
      "action_time_millis" : 1607000204426,
      "step" : "check-rollover-ready",
      "step_time_millis" : 1607000204426,
      "phase_execution" : {
        "policy" : "demo",
        "phase_definition" : {
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "1gb",
              "max_age" : "30d",
              "max_docs" : 5
            },
            "set_priority" : {
              "priority" : 100
            }
          }
        },
        "version" : 1,
        "modified_date_in_millis" : 1606988416663
      }
    }
  }
}

我们可以看到 shrink-.ds-demo-ds-000001 处于 delete 阶段。

这也是在我们之前的 ILM policy 中所定义的那样。当时间超过 3 分钟后会启动删除的动作。我们再接着执行如下的命令直到有11个以上的文档:

POST demo-ds/_doc?pipeline=add-timestamp
{
  "user": {
    "id": "liuxg"
  },
  "message": "This is so cool!"
}

我们再次使用如下的命令:

# Search the data stream
GET demo-ds/_search

我们可以看到如下的结果:

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "shrink-.ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "PO34KHYB_Lwe3F0spf66",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T14:19:12.698241Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "Pe34KHYB_Lwe3F0sp_7i",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T14:19:13.249898Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "Ou34KHYB_Lwe3F0soP6E",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T14:19:11.364617Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "O-34KHYB_Lwe3F0so_54",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T14:19:12.120209Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "Pu34KHYB_Lwe3F0sqv4S",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T14:19:13.809864Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      },
      {
        "_index" : "shrink-.ds-demo-ds-000002",
        "_type" : "_doc",
        "_id" : "P-34KHYB_Lwe3F0srP4N",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-12-03T14:19:14.317471Z",
          "message" : "This is so cool!",
          "user" : {
            "id" : "liuxg"
          }
        }
      }
    ]
  }
}

我们可以看到之前的 shrink-.ds-demo-ds-000001 索引不见了。它在3分钟过后被删除了。我们可以通过如下的命令获得 data stream 的信息:

# Get data stream information
GET _data_stream/demo-ds/_stats
{
  "_shards" : {
    "total" : 5,
    "successful" : 3,
    "failed" : 0
  },
  "data_stream_count" : 1,
  "backing_indices" : 2,
  "total_store_size_bytes" : 29508,
  "data_streams" : [
    {
      "data_stream" : "demo-ds",
      "backing_indices" : 2,
      "store_size_bytes" : 29508,
      "maximum_timestamp" : 1607000591546
    }
  ]
}

使用 alias

有时我们习惯使用自己的名称来写入一个数据。在这种情况下,我们可以为 data stream 创建一个别名,比如:

POST demo_ds/_alias/my_ds
{
  "is_write_index": true
}

在上面我们为 demo_ds 创建了一个别名 my_ds。我们可以通过 my_ds 来写入数据。这个数据最终会进入到后备的索引中去。

删除一个 data stream

我们可以通过如下的命令来删除一个 data stream:

DELETE _data_stream/demo-ds

另外一种方式就是通过 Kibana 界面来进行删除:

点击上面的按钮,我们就可以删除这个 data stream。

Reindex data stream

我们可以创建一个新的 data stream template:

PUT /_data_stream/new-data-stream

当我们针对 data stream 进行 reindex 时,我们必须使用 op_type 为 create:

POST /reindex
{
    "source": {
      "index": "my-data-stream"
    },
    "dest": {
      "index": "new-data-stream",
      "op_type": "create" 
    }
}

当我们使用 _bulk 来写入一个 data stream 时,你也应该使用 create 操作。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐