这是继上一篇文章 “Elasticsearch:Data streams(一)” 的续篇。在上一篇文章中,我详述了如何创建 ILM 策略,index template 以及数据流的相关知识。设置数据流后,你可以执行以下操作:

  • 将文档添加到数据流
  • 搜索数据流
  • 获取数据流的统计信息
  • 手动翻转数据流
  • 开放及关闭后备索引
  • 使用数据流重建索引
  • 通过查询更新数据流中的文档
  • 通过查询删除数据流中的文档
  • 更新或删除支持索引中的文档

将文档添加到数据流

要添加单个文档,请使用 index API。 支持 ingest pipeline。首先我们创建如下的一个 ingest pipeline:

PUT _ingest/pipeline/add-timestamp
{
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

上述的命令为事件添加一个当前 ingest pipeline 运行时的时间戳。我们可以通过如下的命令来写入一个文档到 Elasticsearch 中:

POST /my-data-stream/_doc/?pipeline=add-timestamp
{
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

您不能使用索引 API 的 PUT /<target>/_doc/<_id> 请求格式将新文档添加到数据流。 要指定文档 ID,请改用 PUT /<target>/_create/<_id> 格式。 仅支持 op_type 的创建。

要通过单个请求添加多个文档,请使用 _bulk API。 仅支持创建操作。

PUT /my-data-stream/_bulk?pipeline=add-timestamp&refresh
{"create":{ }}
{ "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

搜索数据流

以下搜索 API 支持数据流:

例子,我们使用 search API 来搜索上面写入的文档:

GET my-data-stream/_search?filter_path=**.hits

上面的命令的响应为:

{
  "hits": {
    "hits": [
      {
        "_index": ".ds-my-data-stream-2022.11.17-000001",
        "_id": "ODWUhIQBSwCQ4y3lc_VM",
        "_score": 1,
        "_source": {
          "message": "Login attempt failed",
          "user": {
            "id": "vlb44hny"
          },
          "@timestamp": "2022-11-17T07:53:52.203397799Z"
        }
      },
      {
        "_index": ".ds-my-data-stream-2022.11.17-000001",
        "_id": "OTWUhIQBSwCQ4y3lc_VM",
        "_score": 1,
        "_source": {
          "message": "Login successful",
          "user": {
            "id": "8a4f500d"
          },
          "@timestamp": "2022-11-17T07:53:52.203707924Z"
        }
      },
      {
        "_index": ".ds-my-data-stream-2022.11.17-000001",
        "_id": "OjWUhIQBSwCQ4y3lc_VM",
        "_score": 1,
        "_source": {
          "message": "Logout successful",
          "user": {
            "id": "l7gk7f82"
          },
          "@timestamp": "2022-11-17T07:53:52.203796507Z"
        }
      },
      {
        "_index": ".ds-my-data-stream-2022.11.17-000001",
        "_id": "NzWPhIQBSwCQ4y3lz_V1",
        "_score": 1,
        "_source": {
          "message": "Login successful",
          "user": {
            "id": "8a4f500d"
          },
          "@timestamp": "2022-11-17T07:48:47.915655422Z"
        }
      }
    ]
  }
}

获取数据流的统计信息

使用数据流统计 API 获取一个或多个数据流的统计信息:

GET /_data_stream/my-data-stream/_stats?human=true

上面的命令响应为:

{
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 1,
  "total_store_size": "9.5kb",
  "total_store_size_bytes": 9762,
  "data_streams": [
    {
      "data_stream": "my-data-stream",
      "backing_indices": 1,
      "store_size": "9.5kb",
      "store_size_bytes": 9762,
      "maximum_timestamp": 1668671632203
    }
  ]
}

手动滚动数据流

使用 rollover API 手动滚动数据流:

POST /my-data-stream/_rollover/

上面的命令返回:

{
  "acknowledged": true,
  "shards_acknowledged": true,
  "old_index": ".ds-my-data-stream-2022.11.17-000001",
  "new_index": ".ds-my-data-stream-2022.11.17-000002",
  "rolled_over": true,
  "dry_run": false,
  "conditions": {}
}

上面显示有两个 backing indices。其中的 .ds-my-data-stream-2022.11.17-000002 是新的 write index。我们可以使用 _stat API 来进行查看:

GET /_data_stream/my-data-stream/_stats?human=true
{
  "_shards": {
    "total": 3,
    "successful": 2,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 2,
  "total_store_size": "5.3kb",
  "total_store_size_bytes": 5475,
  "data_streams": [
    {
      "data_stream": "my-data-stream",
      "backing_indices": 2,
      "store_size": "5.3kb",
      "store_size_bytes": 5475,
      "maximum_timestamp": 1668671632203
    }
  ]
}

从上面的输出中,我们可以看出来,有两个 backing_indices。

GET .ds-my-data-stream-2022.11.17-000001/_count

上面的命令返回:

{
  "count": 4,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

也就是说之前的 4 个文档被写入到 .ds-my-data-stream-2022.11.17-000001 索引中,而之后写入的文档会写入到 .ds-my-data-stream-2022.11.17-000002 索引中。目前它的文档数为 0:

GET .ds-my-data-stream-2022.11.17-000002/_count

上面的命令返回:

{
  "count": 0,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

开放封闭后备索引

你无法搜索封闭(close)的后备索引,即使通过搜索其数据流也是如此。你也不能更新(update)或删除(delete)封闭索引中的文档。

要重新打开封闭的后备索引,请直接向索引提交 open index API 请求

POST /.ds-my-data-stream-2022.11.17-000002/_open

要重新打开数据流的所有关闭后备索引,请向流提交打开索引 API 请求:

POST /my-data-stream/_open/

使用数据流重建索引(reindex)

使用 reindex API 将文档从现有索引、别名或数据流复制到数据流。 因为数据流是仅附加(append_only)的,所以对数据流的重新索引(reindex)必须使 op_type 为 create。 重建索引无法更新数据流中的现有文档。

POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

通过查询更新数据流中的文档

使用 update by query API 来更新数据流中与提供的查询匹配的文档:

POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

通过查询删除数据流中的文档

使用 delete by query API 删除数据流中与提供的查询匹配的文档:

POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

更新或删除支持索引中的文档

如果需要,你可以通过向包含文档的后备索引发送请求来更新或删除数据流中的文档。 你需要:

要获取此信息,请使用搜索请求:

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

相应:

{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-my-data-stream-2099.03.08-000003",  #1    
        "_id": "bfspvnIBr7VVZlfp2lqX",              #2
        "_seq_no": 0,                               #3
        "_primary_term": 1,                         #4
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2099-03-08T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}

说明:

  • #1 包含匹配文档的后备索引
  • #2 文档的文档 ID
  • #3 文档的当前序号
  • #4 文档的主要术语

要更新文档,请使用带有有效 if_seq_no 和 if_primary_term 参数的索引 API 请求:

PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

详细关于更新文档的操作,请阅读文章 “Elasticsearch:深刻理解文档中的 verision 及乐观并发控制”。

要删除文档,请使用删除 API:

DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

要使用单个请求删除或更新多个文档,请使用 _bulk API 的删除、索引和更新操作。 对于索引操作,包括有效的 if_seq_no 和 if_primary_term 参数。

PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }

数据流生命周期演示

我们在这个部分来演示文档在写入 Elasticsearch 后的生命周期管理。如果你先前已经做个上面的练习,经过三分钟后,rollover 发生后,之前写入的文档将被删除,而在当前 write index 里的文档还会继续存在。为了确保你有一个干净的环境,我们做如下的操作:

DELETE _data_stream/my-data-stream

接下来,我们执行如下的操作 5 次:

POST /my-data-stream/_doc/?pipeline=add-timestamp
{
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

我们使用如下的命令来进行查看:

GET _data_stream/my-data-stream

上面的命令显示:

{
  "data_streams": [
    {
      "name": "my-data-stream",
      "timestamp_field": {
        "name": "@timestamp"
      },
      "indices": [
        {
          "index_name": ".ds-my-data-stream-2022.11.18-000001",
          "index_uuid": "ln2AtGG0S4CKGs9kAvsSsQ"
        }
      ],
      "generation": 1,
      "_meta": {
        "my-custom-meta-field": "More arbitrary metadata",
        "description": "Template for my time series data"
      },
      "status": "YELLOW",
      "template": "my-index-template",
      "ilm_policy": "my-lifecycle-policy",
      "hidden": false,
      "system": false,
      "allow_custom_routing": false,
      "replicated": false
    }
  ]
}

从上面的输出中,我们可以看到有一个叫做 .ds-my-data-stream-2022.11.18-000001 的索引已经生成了。在我们的生命周期中,我们设置 primary 文档的最大数为 5 个,如果我们再向这个数据流中写入文档的话,那么 rollover 就会发生,并且在 rollover 发生后的 3 分钟之内,.ds-my-data-stream-2022.11.18-000001 将会被自动删除。在 rollover 发生后,它会自动转入到 warm phase。经过一段时间后,我们进行查看:

GET _data_stream/my-data-stream
{
  "data_streams": [
    {
      "name": "my-data-stream",
      "timestamp_field": {
        "name": "@timestamp"
      },
      "indices": [
        {
          "index_name": ".ds-my-data-stream-2022.11.18-000001",
          "index_uuid": "ln2AtGG0S4CKGs9kAvsSsQ"
        },
        {
          "index_name": ".ds-my-data-stream-2022.11.18-000002",
          "index_uuid": "azlD_LO9QJqXW1akhLRGAA"
        }
      ],
      "generation": 2,
      "_meta": {
        "my-custom-meta-field": "More arbitrary metadata",
        "description": "Template for my time series data"
      },
      "status": "YELLOW",
      "template": "my-index-template",
      "ilm_policy": "my-lifecycle-policy",
      "hidden": false,
      "system": false,
      "allow_custom_routing": false,
      "replicated": false
    }
  ]
}
GET my-data-stream/_ilm/explain
{
  "indices": {
    ".ds-my-data-stream-2022.11.18-000002": {
      "index": ".ds-my-data-stream-2022.11.18-000002",
      "managed": true,
      "policy": "my-lifecycle-policy",
      "index_creation_date_millis": 1668746714257,
      "time_since_index_creation": "4.22m",
      "lifecycle_date_millis": 1668746714257,
      "age": "4.22m",
      "phase": "hot",
      "phase_time_millis": 1668746714298,
      "action": "rollover",
      "action_time_millis": 1668746714498,
      "step": "check-rollover-ready",
      "step_time_millis": 1668746714498,
      "phase_execution": {
        "policy": "my-lifecycle-policy",
        "phase_definition": {
          "min_age": "0ms",
          "actions": {
            "rollover": {
              "max_primary_shard_size": "50gb",
              "max_age": "30d",
              "max_docs": 5,
              "max_primary_shard_docs": 5
            },
            "set_priority": {
              "priority": 204
            }
          }
        },
        "version": 1,
        "modified_date_in_millis": 1668666436429
      }
    }
  }
}

上面显示当前的 write index 为 .ds-my-data-stream-2022.11.18-000002。它处于 hot phase,它的 action 为 rollover。

我们再次使用如下的命令来进行查看:

GET /_data_stream/my-data-stream/_stats?human=true

上面的命令返回的结果为:

{
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 1,
  "total_store_size": "225b",
  "total_store_size_bytes": 225,
  "data_streams": [
    {
      "data_stream": "my-data-stream",
      "backing_indices": 1,
      "store_size": "225b",
      "store_size_bytes": 225,
      "maximum_timestamp": 0
    }
  ]
}

也即当前的后备索引也就只有一个,也就是 .ds-my-data-stream-2022.11.18-000002, 而之前的那个 .ds-my-data-stream-2022.11.18-000001 已经在三分钟之后自动被删除了。

GET .ds-my-data-stream-2022.11.18-000002/_count

上面的命令显示为:

{
  "count": 0,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

也即文档数为 0。我们再次执行命令 5 次:

POST /my-data-stream/_doc/?pipeline=add-timestamp
{
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
GET my-data-stream/_ilm/explain
{
  "indices": {
    ".ds-my-data-stream-2022.11.18-000002": {
      "index": ".ds-my-data-stream-2022.11.18-000002",
      "managed": true,
      "policy": "my-lifecycle-policy",
      "index_creation_date_millis": 1668746714257,
      "time_since_index_creation": "11.85m",
      "lifecycle_date_millis": 1668746714257,
      "age": "11.85m",
      "phase": "hot",
      "phase_time_millis": 1668746714298,
      "action": "rollover",
      "action_time_millis": 1668746714498,
      "step": "check-rollover-ready",
      "step_time_millis": 1668746714498,
      "phase_execution": {
        "policy": "my-lifecycle-policy",
        "phase_definition": {
          "min_age": "0ms",
          "actions": {
            "rollover": {
              "max_primary_shard_size": "50gb",
              "max_age": "30d",
              "max_docs": 5,
              "max_primary_shard_docs": 5
            },
            "set_priority": {
              "priority": 204
            }
          }
        },
        "version": 1,
        "modified_date_in_millis": 1668666436429
      }
    }
  }
}

再过一会儿执行命令:

GET my-data-stream/_ilm/explain
{
  "indices": {
    ".ds-my-data-stream-2022.11.18-000002": {
      "index": ".ds-my-data-stream-2022.11.18-000002",
      "managed": true,
      "policy": "my-lifecycle-policy",
      "index_creation_date_millis": 1668746714257,
      "time_since_index_creation": "12.86m",
      "lifecycle_date_millis": 1668747484089,
      "age": "2.06s",
      "phase": "warm",
      "phase_time_millis": 1668747484490,
      "action": "forcemerge",
      "action_time_millis": 1668747485091,
      "step": "segment-count",
      "step_time_millis": 1668747485091,
      "phase_execution": {
        "policy": "my-lifecycle-policy",
        "phase_definition": {
          "min_age": "0d",
          "actions": {
            "allocate": {
              "number_of_replicas": 0,
              "include": {},
              "exclude": {},
              "require": {}
            },
            "forcemerge": {
              "max_num_segments": 1
            },
            "set_priority": {
              "priority": 50
            },
            "shrink": {
              "number_of_shards": 1
            }
          }
        },
        "version": 1,
        "modified_date_in_millis": 1668666436429
      }
    },
    ".ds-my-data-stream-2022.11.18-000004": {
      "index": ".ds-my-data-stream-2022.11.18-000004",
      "managed": true,
      "policy": "my-lifecycle-policy",
      "index_creation_date_millis": 1668747484227,
      "time_since_index_creation": "1.93s",
      "lifecycle_date_millis": 1668747484227,
      "age": "1.93s",
      "phase": "hot",
      "phase_time_millis": 1668747484290,
      "action": "rollover",
      "action_time_millis": 1668747484490,
      "step": "check-rollover-ready",
      "step_time_millis": 1668747484490,
      "phase_execution": {
        "policy": "my-lifecycle-policy",
        "phase_definition": {
          "min_age": "0ms",
          "actions": {
            "rollover": {
              "max_primary_shard_size": "50gb",
              "max_age": "30d",
              "max_docs": 5,
              "max_primary_shard_docs": 5
            },
            "set_priority": {
              "priority": 204
            }
          }
        },
        "version": 1,
        "modified_date_in_millis": 1668666436429
      }
    }
  }
}

再过一会儿执行命令:

{
  "indices": {
    ".ds-my-data-stream-2022.11.18-000002": {
      "index": ".ds-my-data-stream-2022.11.18-000002",
      "managed": true,
      "policy": "my-lifecycle-policy",
      "index_creation_date_millis": 1668746714257,
      "time_since_index_creation": "13.51m",
      "lifecycle_date_millis": 1668747484089,
      "age": "41.29s",
      "phase": "warm",
      "phase_time_millis": 1668747484490,
      "action": "complete",
      "action_time_millis": 1668747493998,
      "step": "complete",
      "step_time_millis": 1668747493998,
      "phase_execution": {
        "policy": "my-lifecycle-policy",
        "phase_definition": {
          "min_age": "0d",
          "actions": {
            "allocate": {
              "number_of_replicas": 0,
              "include": {},
              "exclude": {},
              "require": {}
            },
            "forcemerge": {
              "max_num_segments": 1
            },
            "set_priority": {
              "priority": 50
            },
            "shrink": {
              "number_of_shards": 1
            }
          }
        },
        "version": 1,
        "modified_date_in_millis": 1668666436429
      }
    },
    ".ds-my-data-stream-2022.11.18-000004": {
      "index": ".ds-my-data-stream-2022.11.18-000004",
      "managed": true,
      "policy": "my-lifecycle-policy",
      "index_creation_date_millis": 1668747484227,
      "time_since_index_creation": "41.15s",
      "lifecycle_date_millis": 1668747484227,
      "age": "41.15s",
      "phase": "hot",
      "phase_time_millis": 1668747484290,
      "action": "rollover",
      "action_time_millis": 1668747484490,
      "step": "check-rollover-ready",
      "step_time_millis": 1668747484490,
      "phase_execution": {
        "policy": "my-lifecycle-policy",
        "phase_definition": {
          "min_age": "0ms",
          "actions": {
            "rollover": {
              "max_primary_shard_size": "50gb",
              "max_age": "30d",
              "max_docs": 5,
              "max_primary_shard_docs": 5
            },
            "set_priority": {
              "priority": 204
            }
          }
        },
        "version": 1,
        "modified_date_in_millis": 1668666436429
      }
    }
  }
}

我们看到在 warm 阶段,已经是完成了。

再次执行命令:

GET _data_stream/my-data-stream
{
  "data_streams": [
    {
      "name": "my-data-stream",
      "timestamp_field": {
        "name": "@timestamp"
      },
      "indices": [
        {
          "index_name": ".ds-my-data-stream-2022.11.18-000002",
          "index_uuid": "azlD_LO9QJqXW1akhLRGAA"
        },
        {
          "index_name": ".ds-my-data-stream-2022.11.18-000004",
          "index_uuid": "2nq9klz0Qiir5UA2_s1I1w"
        }
      ],
      "generation": 4,
      "_meta": {
        "my-custom-meta-field": "More arbitrary metadata",
        "description": "Template for my time series data"
      },
      "status": "YELLOW",
      "template": "my-index-template",
      "ilm_policy": "my-lifecycle-policy",
      "hidden": false,
      "system": false,
      "allow_custom_routing": false,
      "replicated": false
    }
  ]
}

上面显示有两个索引同时存在:.ds-my-data-stream-2022.11.18-000002 及 .ds-my-data-stream-2022.11.18-000004。

再过一段时间,再次执行:

GET _data_stream/my-data-stream

上面的命令显示:

{
  "data_streams": [
    {
      "name": "my-data-stream",
      "timestamp_field": {
        "name": "@timestamp"
      },
      "indices": [
        {
          "index_name": ".ds-my-data-stream-2022.11.18-000004",
          "index_uuid": "2nq9klz0Qiir5UA2_s1I1w"
        }
      ],
      "generation": 5,
      "_meta": {
        "my-custom-meta-field": "More arbitrary metadata",
        "description": "Template for my time series data"
      },
      "status": "YELLOW",
      "template": "my-index-template",
      "ilm_policy": "my-lifecycle-policy",
      "hidden": false,
      "system": false,
      "allow_custom_routing": false,
      "replicated": false
    }
  ]
}

从返回的数据中,我们可以看到只有一个索引 .ds-my-data-stream-2022.11.18-000004 存在。之前的那个索引 .ds-my-data-stream-2022.11.18-000002 已经被删除了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐