我们之前看见了在 Elasticsearch 里的 ingest node 里,我们可以通过以下 processor 的处理帮我们处理我们的一些数据。它们的功能是非常具体而明确的。那么在 Elasticsearch 里,有没有一种更加灵活的方式可供我们来进行编程处理呢?如果有,它使用的语言是什么呢?

在 Elasticsearch 中,它使用了一个叫做 Painless 的语言。它是专门为 Elasticsearch 而建立的。Painless 是一种简单,安全的脚本语言,专为与 Elasticsearch 一起使用而设计。 它是 Elasticsearch 的默认脚本语言,可以安全地用于 inline 和 stored 脚本。它具有像 Groovy 那样的语法。自 Elasticsearch 6.0 以后的版本不再支持 Groovy,Javascript 及 Python 语言。我们可以通过如下的 API 来获得当前 Elastic Stack 所支持的脚本语言:

GET _script_language

上面的命令在我的 Elastic Stack 8.4.3 中返回如下的信息:

{
  "types_allowed": [
    "inline",
    "stored"
  ],
  "language_contexts": [
    {
      "language": "expression",
      "contexts": [
        "aggregation_selector",
        "aggs",
        "bucket_aggregation",
        "field",
        "filter",
        "number_sort",
        "score",
        "terms_set"
      ]
    },
    {
      "language": "mustache",
      "contexts": [
        "ingest_template",
        "template"
      ]
    },
    {
      "language": "painless",
      "contexts": [
        "aggregation_selector",
        "aggs",
        "aggs_combine",
        "aggs_init",
        "aggs_map",
        "aggs_reduce",
        "analysis",
        "boolean_field",
        "bucket_aggregation",
        "bytesref_sort",
        "composite_field",
        "date_field",
        "double_field",
        "field",
        "filter",
        "geo_point_field",
        "ingest",
        "ingest_template",
        "interval",
        "ip_field",
        "keyword_field",
        "long_field",
        "moving-function",
        "number_sort",
        "painless_test",
        "processor_conditional",
        "reindex",
        "score",
        "script_heuristic",
        "similarity",
        "similarity_weight",
        "string_sort",
        "template",
        "terms_set",
        "update",
        "update_by_query",
        "watcher_condition",
        "watcher_transform",
        "xpack_template"
      ]
    }
  ]
}

从上面的输出中,我们可以看出来它支持三种语言:expression,mustache 及 painless。

使用脚本,你可以在 Elasticsearch 中评估自定义表达式。 例如,你可以使用脚本来返回 “script fields” 作为搜索请求的一部分,或者评估查询的自定义分数。

为了能够在 Painless 脚本中使用正则表达式,你需要通过添加以下代码在 elasticsearch.yml 中激活它们:

script.painless.regex.enabled: true

如何使用脚本:

脚本的语法为:

"script": {
    "lang":   "...",  
    "source" | "id": "...", 
    "params": { ... } 
  }
  • 这里 lang 默认的值为 "painless"。在实际的使用中可以不设置,除非有第二种语言供使用
  • source 可以为 inline 脚本,或者是一个 id,那么这个 id 对应于一个 stored 脚本
  • 任何有名字的参数,可以被用于脚本的输入参数

Painless 可以使用 Java 风格的注释语句,支持分支、循环等控制结构,所以 painless 有一些关键字不能用于声明标识符。Painless 关键字比 Java 少很多,一共只有15个。下面的表列出了所有可用关键字,从中可以看出 painless 都支持哪些语句类型。

Painless 关键字

ifelsewhiledofor
incontinuebreakreturnnew
trycatchthrowthisinstanceof

Painlesss 支持了在 Java 语法除去 switch 以外的所有控制语句。

此外,painless 提供了很多可以在脚本中使用的内置函数(主要取自 Java Math 类),例如:

Painless 的简单使用例子

inline 脚本

首先我们来创建一个简单的文档:

PUT twitter/_doc/1
{
  "user" : "双榆树-张三",
  "message" : "今儿天气不错啊,出去转转去",
  "uid" : 2,
  "age" : 20,
  "city" : "北京",
  "province" : "北京",
  "country" : "中国",
  "address" : "中国北京市海淀区",
  "location" : {
    "lat" : "39.970718",
    "lon" : "116.325747"
  },
  "hobbies": [
    "tennis",
    "basketball",
    "volleyball"
    ]
}

在这个文档里,我们现在想把 age 修改为 30,那么一种办法就是把所有的文档内容都读出来,让修改其中的 age 为30,再重新用同样的方法写进去。首先这里需要有几个动作:先读出数据,然后修改,再次写入数据。显然这样比较麻烦。在这里我们可以直接使用 Painless 语言直接进行修改:

POST twitter/_update/1
{
  "script": {
    "source": "ctx._source.age = 30"
  }
}

更新操作由三个不同的步骤组成,如下所示:

  • GET API call:此操作速度非常快,适用于实时数据(无需 refresh)并检索记录。
  • Script 执行:该脚本在文档中执行,如果需要,它会被更新。
  • 保存文档:如果需要,将保存该文档。

脚本执行以下列方式遵循工作流程:

  • 脚本被编译,结果被缓存以改进重新执行。 编译取决于脚本语言; 也就是说,它检测脚本中的错误,例如书写错误、语法错误和与语言相关的错误。 编译步骤也可以受 CPU 限制,以便 Elasticsearch 缓存编译结果以供进一步执行。
  • 文档在脚本上下文中执行; 文档数据在脚本的 ctx 变量中可用

更新脚本可以在 ctx 变量中设置几个参数; 最重要的参数如下:

  • cxt._source:这包含文档的 source。
  • ctx._timestamp:如果已定义,则将此值设置为文档时间戳。
  • ctx.op:这定义了要执行的主要操作类型。 有几个可用的值,例如:
    1.  index:这是默认值; 使用更新值重新索引记录。
    2. delete: 文档被删除而不是更新(也就是说,这可用于更新文档或在超出配额时将其删除)。
    3. none:跳过文档而不重新索引文档。 

这里的 source 表明是我们的 painless 代码。这里我们只写了很少的代码在 DSL 之中。这种代码称之为 inline。在这里我们直接通过 ctx._source.age 来访问  _souce 里的 age。这样我们通过编程的办法直接对年龄进行了修改。运行的结果是:

GET twitter/_doc/1
{
  "_index": "twitter",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "user": "双榆树-张三",
    "message": "今儿天气不错啊,出去转转去",
    "uid": 2,
    "age": 30,
    "city": "北京",
    "province": "北京",
    "country": "中国",
    "address": "中国北京市海淀区",
    "location": {
      "lat": "39.970718",
      "lon": "116.325747"
    },
    "hobbies": [
      "tennis",
      "basketball",
      "volleyball"
    ]
  }
}

显然这个 age 已经改变为 30。上面的方法固然好,但是如果我们把 age 更新为不同的值而不是先前的 30,那么每次的 scripts 是不一样的。这样,每次执行 scripts 都是需要重新进行编译的。编译好的 script 可以缓存并供以后使用。上面的 scripts 如果是改变年龄的话,需要重新进行编译。一种更好的方法是改为这样的:

POST twitter/_update/1
{
  "script": {
    "source": "ctx._source.age = params.value",
    "params": {
      "value": 34
    }
  }
}

这样,我们的 script 的 source 是不用改变的,只需要编译一次。下次调用的时候,只需要修改 params 里的参数即可。

在 Elasticsearch 里:

"script": {
  "source": "ctx._source.num_of_views += 2"
}

"script": {
  "source": "ctx._source.num_of_views += 3"
}

被视为两个不同的脚本,需要分别进行编译,所以最好的办法是使用 params 来传入参数。

此外,如果你需要执行大量更新操作,那么最好批量执行它们以提高应用程序的性能。你可以参考 _bulk 指令。比如像如下的操作:

POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}

由于重新索引是需要时间的,我们在更新的时候,甚至可以使用脚本来检查如果更新的值和之前的没有区别,或者更新的值已经包含在之前的数组中,我们可以通过设置 cxt.op 为 "none”,而不更新:

POST twitter/_update/1
{
  "script": {
    "source": """
      ctx.op = "none";
      if(ctx._source.containsKey("hobbies")) {
        for (def item: params.new_hobbies) {
          if(!ctx._source.hobbies.contains(item)) {
            ctx._source.hobbies.add(item);
            ctx.op = "index"
          }
        }
      } else {
        ctx._source.hobbies = params.new_hobbies;
        ctx.op = "index"
      }
    """,
    "lang": "painless",
    "params": {
      "new_hobbies": ["table tennis", "skiing"]
    }
  }
}

在上面的代码中,我们通过检查 new_hobbies 里的值是否在之前的 hobbies 里存在。如果有新的 hobby,那么就添加。我们的 cxt.op 为 index。

GET twitter/_doc/1

{
  "_index": "twitter",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "user": "双榆树-张三",
    "message": "今儿天气不错啊,出去转转去",
    "uid": 2,
    "age": 20,
    "city": "北京",
    "province": "北京",
    "country": "中国",
    "address": "中国北京市海淀区",
    "location": {
      "lat": "39.970718",
      "lon": "116.325747"
    },
    "hobbies": [
      "tennis",
      "basketball",
      "volleyball",
      "table tennis",
      "skiing"
    ]
  }
}

如果我们使用如下的方式来进行调用:

POST twitter/_update/1
{
  "script": {
    "source": """
      ctx.op = "none";
      if(ctx._source.containsKey("hobbies")) {
        for (def item: params.new_hobbies) {
          if(!ctx._source.hobbies.contains(item)) {
            ctx._source.hobbies.add(item);
            ctx.op = "index"
          }
        }
      } else {
        ctx._source.hobbies = params.new_hobbies;
        ctx.op = "index"
      }
    """,
    "lang": "painless",
    "params": {
      "new_hobbies": ["tennis"]
    }
  }
}

由于 tennis 已经存在于之前的 hobbies 中,所以这个操作就是 cxt.op 为 none。在运行完上面的命令后,如果我们检查 ID 为 1 的文档,我们会发现它的 _version 还是 之前的 2,也即它没有得到更新。

除了上面的 update 之外,我们也可以使用 script query 来对我们的文档来继续搜索:

GET twitter/_search
{
  "query": {
    "script": {
      "script": {
        "source": "doc['city'].contains(params.name)",
        "lang": "painless",
        "params": {
          "name": "北京"
        }
      }
    }
  }
}

在上面的脚本中,查询在 city 字段中含有 “北京” 的所有文档。

存储的脚本 (stored script)

在这种情况下,scripts 可以被存放于一个集群的状态中。它之后可以通过 ID 进行调用:

PUT _scripts/add_age
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.age += params.value"
  }
}

在这里,我们定义了一个叫做 add_age 的 script。它的作用就是帮我们把 source 里的 age 加上一个数值。我们可以在之后调用它:

POST twitter/_update/1
{
  "script": {
    "id": "add_age",
    "params": {
      "value": 2
    }
  }
}

通过上面的执行,我们可以看到,age 将会被加上 2。

访问 source 里的字段

Painless 中用于访问字段值的语法取决于上下文。在 Elasticsearch 中,有许多不同的 Plainless 上下文。就像那个链接显示的那样,Plainless 上下文包括:ingest processor, update, update by query, sort,filter 等等。

Context访问字段
Ingest node: 访问字段使用  ctxctx.field_name
Updates: 使用_source 字段ctx._source.field_name

这里的 updates 包括 _update,_reindex 以及 update_by_query。这里,我们对于 context(上下文的理解)非常重要。它的意思是针对不同的 API,在使用中 ctx 所包含的字段是不一样的。在下面的例子中,我们针对一些情况来做具体的分析。

Painless 脚本例子

首先我们创建一个叫做 add_field_c 的 pipeline。关于如何创建一个 pipleline,大家可以参考我之前写过的一个文章 “如何在 Elasticsearch 中使用 pipeline AP  I来对事件进行处理”。

例子 1

PUT _ingest/pipeline/add_field_c
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.field_c = (ctx.field_a + ctx.field_b) * params.value",
        "params": {
          "value": 2
        }
      }
    }
  ]
}

这个 pipepline 的作用是创建一个新的 field:field_c。它的结果是 field_a 及 field_b 的和,并乘以 2。那么我们创建一个如下的文档:

PUT test_script/_doc/1?pipeline=add_field_c
{
  "field_a": 10,
  "field_b": 20
}

在这里,我们使用了 pipleline add_field_c。执行后的结果是:

{
  "took" : 147,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test_script",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "field_c" : 60,
          "field_a" : 10,
          "field_b" : 20
        }
      }
    ]
  }
}

显然,我们可以看到 field_c 被成功创建了。

例子 2

在 ingest 过程中,可以使用脚本处理器来处理 metadata,如 _index 和 _type。 下面是一个 ingest pipeline 的示例,无论原始索引请求中提供了什么,它都会将索引和类型重命名为 my_index:

PUT _ingest/pipeline/my_index
{
    "description": "use index:my_index and type:_doc",
    "processors": [
      {
        "script": {
          "source": """
            ctx._index = 'my_index';
            ctx._type = '_doc';
          """
        }
      }
    ]
}

使用上面的 pipeline,我们可以尝试 index 一个文档到 any_index:

PUT any_index/_doc/1?pipeline=my_index
{
  "message": "text"
}

显示的结果是:

{
  "_index": "my_index",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 89,
  "_primary_term": 1,
}

也就是说真正的文档时存到 my_index 之中,而不是 any_index。

例子 3

PUT _ingest/pipeline/blogs_pipeline
{
  "processors": [
    {
      "script": {
        "source": """
          if (ctx.category == "") { 
             ctx.category = "None"
          } 
"""
      }
    }
  ]
}

我们上面定义了一个 pipeline,它可以帮我们检查如果 category 字段是否为空,如果是,就修改为 “None”。还是以之前的那个 test_script 索引为例:

PUT test_script/_doc/2?pipeline=blogs_pipeline
{
  "field_a": 5,
  "field_b": 10,
  "category": ""
}

GET test_script/_doc/2

显示的结果是:

{
  "_index" : "test_script",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 2,
  "_seq_no" : 6,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field_a" : 5,
    "field_b" : 10,
    "category" : "None"
  }
}

显然,它把 category 为 “” 的字段变为 “None” 了。

例子 4

POST _reindex
{
  "source": {
    "index": "blogs"
  },
  "dest": {
    "index": "blogs_fixed"
  },
  "script": {
    "source": """
      if (ctx._source.category == "") {
          ctx._source.category = "None" 
      }
"""
  }
}

上面的这个例子在 reindex 时,如果 category 为空时,写入 “None”。我们可以从上面的两个例子中看出来,针对 pipeline,我们可以直接对 cxt.field 进行操作,而针对 update 来说,我们可以对 cxt._source 下的字段进行操作。这也是之前提到的上下文的区别。

例子 5

PUT test/_doc/1
{
    "counter" : 1,
    "tags" : ["red"]
}

你可以使用和 update 脚本将 tag 添加到 tags 列表(这只是一个列表,因此即使存在标记也会添加):

POST test/_update/1
{
    "script" : {
        "source": "ctx._source.tags.add(params.tag)",
        "lang": "painless",
        "params" : {
            "tag" : "blue"
        }
    }
}

显示结果:

GET test/_doc/1
{
  "_index" : "test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4,
  "_seq_no" : 3,
  "_primary_term" : 11,
  "found" : true,
  "_source" : {
    "counter" : 1,
    "tags" : [
      "red",
      "blue"
    ]
  }
}

显示 “blue”,已经被成功加入到 tags 列表之中了。

你还可以从 tags 列表中删除 tag。 删除 tag 的 Painless 函数采用要删除的元素的数组索引。 为避免可能的运行时错误,首先需要确保 tag 存在。 如果列表包含tag的重复项,则此脚本只删除一个匹配项。

POST test/_update/1
{
  "script": {
    "source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
    "lang": "painless",
    "params": {
      "tag": "blue"
    }
  }
}

GET test/_doc/1

显示结果:

{
  "_index" : "test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "_seq_no" : 4,
  "_primary_term" : 11,
  "found" : true,
  "_source" : {
    "counter" : 1,
    "tags" : [
      "red"
    ]
  }
}

“blue” 显然已经被删除了。

Painless 脚本简单的操练

为了说明 Painless 的工作原理,让我们将一些曲棍球统计数据加载到 Elasticsearch 索引中:

PUT hockey/_bulk?refresh
{"index":{"_id":1}}
{"first":"johnny","last":"gaudreau","goals":[9,27,1],"assists":[17,46,0],"gp":[26,82,1],"born":"1993/08/13"}
{"index":{"_id":2}}
{"first":"sean","last":"monohan","goals":[7,54,26],"assists":[11,26,13],"gp":[26,82,82],"born":"1994/10/12"}
{"index":{"_id":3}}
{"first":"jiri","last":"hudler","goals":[5,34,36],"assists":[11,62,42],"gp":[24,80,79],"born":"1984/01/04"}
{"index":{"_id":4}}
{"first":"micheal","last":"frolik","goals":[4,6,15],"assists":[8,23,15],"gp":[26,82,82],"born":"1988/02/17"}
{"index":{"_id":5}}
{"first":"sam","last":"bennett","goals":[5,0,0],"assists":[8,1,0],"gp":[26,1,0],"born":"1996/06/20"}
{"index":{"_id":6}}
{"first":"dennis","last":"wideman","goals":[0,26,15],"assists":[11,30,24],"gp":[26,81,82],"born":"1983/03/20"}
{"index":{"_id":7}}
{"first":"david","last":"jones","goals":[7,19,5],"assists":[3,17,4],"gp":[26,45,34],"born":"1984/08/10"}
{"index":{"_id":8}}
{"first":"tj","last":"brodie","goals":[2,14,7],"assists":[8,42,30],"gp":[26,82,82],"born":"1990/06/07"}
{"index":{"_id":39}}
{"first":"mark","last":"giordano","goals":[6,30,15],"assists":[3,30,24],"gp":[26,60,63],"born":"1983/10/03"}
{"index":{"_id":10}}
{"first":"mikael","last":"backlund","goals":[3,15,13],"assists":[6,24,18],"gp":[26,82,82],"born":"1989/03/17"}
{"index":{"_id":11}}
{"first":"joe","last":"colborne","goals":[3,18,13],"assists":[6,20,24],"gp":[26,67,82],"born":"1990/01/30"}

使用 Painless 访问 Doc 里的值

文档里的值可以通过一个叫做 doc 的 Map 值来访问。例如,以下脚本计算玩家的总进球数。 此示例使用类型 int 和 fo r循环。

GET hockey/_search
{
  "query": {
    "function_score": {
      "script_score": {
        "script": {
          "lang": "painless",
          "source": """
            int total = 0;
            for (int i = 0; i < doc['goals'].length; ++i) {
              total += doc['goals'][i];
            }
            return total;
          """
        }
      }
    }
  }
}

这里我们通过 script 来计算每个文档的 _score。通过 script 把每个运动员的 goal 都加起来,并形成最终的 _score。这里我们通过doc['goals'] 这个 Map 类型来访问我们的字段值。显示的结果为:

{
  "took" : 25,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 11,
      "relation" : "eq"
    },
    "max_score" : 87.0,
    "hits" : [
      {
        "_index" : "hockey",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 87.0,
        "_source" : {
          "first" : "sean",
          "last" : "monohan",
          "goals" : [
            7,
            54,
            26
          ],
          "assists" : [
            11,
            26,
            13
          ],
          "gp" : [
            26,
            82,
            82
          ],
          "born" : "1994/10/12"
        }
      },
...

或者,你可以使用 script_fields 而不是 function_score 执行相同的操作:

GET hockey/_search
{
  "query": {
    "match_all": {}
  },
  "script_fields": {
    "total_goals": {
      "script": {
        "lang": "painless",
        "source": """
          int total = 0;
          for (int i = 0; i < doc['goals'].length; ++i) {
            total += doc['goals'][i];
          }
          return total;
        """
      }
    }
  }
}

显示的结果为:

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 11,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "hockey",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "fields" : {
          "total_goals" : [
            37
          ]
        }
      },
      {
        "_index" : "hockey",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "fields" : {
          "total_goals" : [
            87
          ]
        }
      },
...

以下示例使用 Painless 脚本按其组合的名字和姓氏对玩家进行排序。 使用 doc ['first'].value 和 doc ['last'].value 访问名称。

GET hockey/_search
{
  "query": {
    "match_all": {}
  },
  "sort": {
    "_script": {
      "type": "string",
      "order": "asc",
      "script": {
        "lang": "painless",
        "source": "doc['first.keyword'].value + ' ' + doc['last.keyword'].value"
      }
    }
  }
}

检查缺失项

doc ['field'].value。如果文档中缺少该字段,则抛出异常。

要检查文档是否缺少值,可以调用 doc ['field'] .size() == 0。

使用 Painless 更新字段

你还可以轻松更新字段。 你可以使用 ctx._source.<field-name> 访问字段的原始源。

首先,让我们通过提交以下请求来查看玩家的源数据:

GET hockey/_search
{
  "stored_fields": [
    "_id",
    "_source"
  ],
  "query": {
    "term": {
      "_id": 1
    }
  }
}

显示的结果为:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "hockey",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "first" : "johnny",
          "last" : "gaudreau",
          "goals" : [
            9,
            27,
            1
          ],
          "assists" : [
            17,
            46,
            0
          ],
          "gp" : [
            26,
            82,
            1
          ],
          "born" : "1993/08/13"
        }
      }
    ]
  }
}

要将玩家1的姓氏更改为 hockey,只需将 ctx._source.last 设置为新值:

POST hockey/_update/1
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.last = params.last",
    "params": {
      "last": "hockey"
    }
  }
}

你还可以向文档添加字段。 例如,此脚本添加一个包含玩家 nickname 为 hockey的新字段。

POST hockey/_update/1
{
  "script": {
    "lang": "painless",
    "source": """
      ctx._source.last = params.last;
      ctx._source.nick = params.nick
    """,
    "params": {
      "last": "gaudreau",
      "nick": "hockey"
    }
  }
}

显示的结果为:

GET hockey/_doc/1
{
  "_index" : "hockey",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 11,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "first" : "johnny",
    "last" : "gaudreau",
    "goals" : [
      9,
      27,
      1
    ],
    "assists" : [
      17,
      46,
      0
    ],
    "gp" : [
      26,
      82,
      1
    ],
    "born" : "1993/08/13",
    "nick" : "hockey"
  }
}

有一个叫做 “nick” 的新字段被加入了。

我们甚至可以对日期类型来进行操作从而得到年月等信息:

GET hockey/_search
{
  "script_fields": {
    "birth_year": {
      "script": {
        "source": "doc.born.value.year"
      }
    }
  }
}

显示结果:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 11,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "hockey",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "fields" : {
          "birth_year" : [
            1994
          ]
        }
      },
...

使用 Script 来定制分数

我们知道在通常的情况下,我们搜索结果的分数是由搜索的词语的相关性来决定的,但是在有些情况下,我们可能不希望是这样的。比如当我搜索衣服的时候,我们希望把价钱高的衣服排在前面,或者是当我们搜索一个员工,我们希望的情况是学历高的,并且客户反馈好的能够排在前面。在这种情况下,我们希望对文档的分数进行定制。我们可以使用 script 来定制分数。详细的讲解可以阅读另外一篇文章 “Elasticsearch:使用 function_score 及 script_score 定制搜索结果的分数”。

返回随机的文档

在很多时候,我们想随机地显示一些文档。比如,当我们进入一个页面时,我们希望让用户每次进来看到不同的文档被显示。我们可以参考文章 “Kibana:Kibana 入门 (一)” 加载 Sample web logs 索引。

我们可以做如下的搜索:

GET kibana_sample_data_logs/_search?filter_path=**.hits
{
  "size": 5,
  "sort": [
    {
      "_script": {
        "script": {
          "source": "Math.random()"
        },
        "type": "number",
        "order": "desc"
      }
    }
  ],
  "_source": false,
  "fields": [
    "_id"
  ]
}

在这个例子中,对于每一个命中,新的排序值是通过执行 Math.random() 脚本函数来计算的。我们执行上面的例子两次,看看返回的结果:

{
  "hits": {
    "hits": [
      {
        "_index": "kibana_sample_data_logs",
        "_id": "-drizIEB1vTZ28zfW87v",
        "_score": null,
        "sort": [
          0.9994440091378213
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "QNrizIEB1vTZ28zfW82A",
        "_score": null,
        "sort": [
          0.9991514519063929
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "TdrizIEB1vTZ28zfXdng",
        "_score": null,
        "fields": {
          "_id": [
            "TdrizIEB1vTZ28zfXdng"
          ]
        },
        "sort": [
          0.9991211451075065
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "WtrizIEB1vTZ28zfYOVo",
        "_score": null,
        "sort": [
          0.9988022050597422
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "qdrizIEB1vTZ28zfY_WG",
        "_score": null,
        "fields": {
          "_id": [
            "qdrizIEB1vTZ28zfY_WG"
          ]
        },
        "sort": [
          0.9447232729199793
        ]
      }
    ]
  }
}
{
  "hits": {
    "hits": [
      {
        "_index": "kibana_sample_data_logs",
        "_id": "p9rizIEB1vTZ28zfX-GX",
        "_score": null,
        "sort": [
          0.9995802742731328
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "odrizIEB1vTZ28zfWsi0",
        "_score": null,
        "sort": [
          0.9989754093647306
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "-trizIEB1vTZ28zfYeqG",
        "_score": null,
        "sort": [
          0.9986951909797271
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "oNrizIEB1vTZ28zfXdjg",
        "_score": null,
        "fields": {
          "_id": [
            "oNrizIEB1vTZ28zfXdjg"
          ]
        },
        "sort": [
          0.9986765820228966
        ]
      },
      {
        "_index": "kibana_sample_data_logs",
        "_id": "n9rizIEB1vTZ28zfY_WG",
        "_score": null,
        "fields": {
          "_id": [
            "n9rizIEB1vTZ28zfY_WG"
          ]
        },
        "sort": [
          0.9553508275459301
        ]
      }
    ]
  }
}

在上面,我们可以看出来,两次执行同样的命令返回的结果是不一样的。

为了得到一个索引的一个子集来进行统计,我们可以为这个索引添加一个字段,并赋予随机值。在我们统计时,我们可以只针对其中的一部分数据来进行统计。这种方法特别适用于一个很大的数据集,数据统计非常耗时,但是我们不一定要使用全部的数据集来进行统计。

我们首先来创建一个如下的 pipeline:

PUT _ingest/pipeline/random_number_pipeline
{
  "description": "Add a field with random number between 0-100",
  "processors": [
    {
      "script": {
        "source": "ctx['random_number'] = new Random().nextInt(100)"
      }
    }
  ]
}

然后,我们来更新我们所有的文档:

POST kibana_sample_data_logs/_update_by_query?pipeline=random_number_pipeline

这样我们的所有文档都含有有个新的字段叫做 random_number:

GET kibana_sample_data_logs/_search?filter_path=aggregations
{
  "size": 0, 
  "query": {
    "match": {
      "host": "www.elastic.co"
    }
  },
  "aggs": {
    "my_sample": {
      "filter": {
        "match": {
          "random_number": 2
        }
      },
      "aggs": {
        "top_ips": {
          "terms": {
            "field": "clientip",
            "size": 10
          }
        }
      }
    }
  }
}

在上面,我们通过 filter 来过滤其中的 1/100 的数据集。其中的 random_number 为 2。运行上面的结果为:

{
  "aggregations": {
    "my_sample": {
      "meta": {},
      "doc_count": 41,
      "top_ips": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 31,
        "buckets": [
          {
            "key": "10.245.118.177",
            "doc_count": 1
          },
          {
            "key": "17.57.95.228",
            "doc_count": 1
          },
          {
            "key": "19.253.238.55",
            "doc_count": 1
          },
          {
            "key": "21.8.113.52",
            "doc_count": 1
          },
          {
            "key": "23.42.38.4",
            "doc_count": 1
          },
          {
            "key": "32.150.70.211",
            "doc_count": 1
          },
          {
            "key": "40.190.86.239",
            "doc_count": 1
          },
          {
            "key": "44.209.117.254",
            "doc_count": 1
          },
          {
            "key": "47.115.93.167",
            "doc_count": 1
          },
          {
            "key": "59.227.120.72",
            "doc_count": 1
          }
        ]
      }
    }
  }
}

这种解决方案有点类似 Elasticsearch 所提供的 “Elasticsearch:通过 sampler 聚合来改善繁重的 Elasticsearch 聚合”。

在 ingest pipeline 里使用脚本

摄入处理器(ingest processors)是摄入管道的构建块; 它们描述了可以在文档上执行以修改它的操作。 脚本是处理器中用于提供完整性核心功能的主要功能。虽然 Elasticsearch 已经提供了比较丰富的 processors,但是在实际的使用中,它肯定有它覆盖不到的地方。在这种情况下,script processor为实现这种定制的功能提供可能。它可以重复地利用脚本编程轻松实现我们想要的功能。比如,我们设计如下的一个 ingest pipeline:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "description": "Extract 'tags' from 'env' field",
          "lang": "painless",
          "if": "ctx.env != null",
          "source": """
            String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);
            ArrayList tags = new ArrayList();
            tags.add(envSplit[params['position']].trim());
            ctx['tags'] = tags;
          """,
          "params": {
            "delimiter": "-",
            "position": 1
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "env": "es01-prod"
      }
    }
  ]
}

如上所示,我们的这个 script 非常之简单。我们首先通过 "if" 来检查 env 字段是否存在。如果不存在,那么脚本将不会被执行。 它提前一个字段 env 的其中一部分,并把它添加到一个新的字段 tags 里去。它按照 "-" 把 env 字段进行拆分,并提取位置为 1 的字符串添加到 tags 里。运行上面的命令:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "env": "es01-prod",
          "tags": [
            "prod"
          ]
        },
        "_ingest": {
          "timestamp": "2022-07-06T01:00:19.907342Z"
        }
      }
    }
  ]
}

从上面的输出中,我们可以看到 tags 里现在还有 prod,也就是 es01-prod 的破折号 “-” 之后的字符串。那么在实际的使用中,我们该如何运用呢?

我们创建如下的一个 pipeline:

PUT _ingest/pipeline/extract_tag
{
  "description": "Extract a tag from env field",
  "processors": [
    {
      "script": {
        "description": "Extract 'tags' from 'env' field",
        "lang": "painless",
        "if": "ctx.env != null",
        "source": """
            String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);
            ArrayList tags = new ArrayList();
            tags.add(envSplit[params['position']].trim());
            ctx['tags'] = tags;
          """,
        "params": {
          "delimiter": "-",
          "position": 1
        }
      }
    }
  ]
}

在上面,我们创建一个叫做 extract_tag 的 pipeline。我们使用如下的例子来做展示:

PUT clusters/_doc/1?pipeline=extract_tag
{
  "env": "es01-prod"
}

我们可以通过如下的命令来进行检查:

GET clusters/_doc/1

上面命令运行的结果是:

{
  "_index": "clusters",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "env": "es01-prod",
    "tags": [
      "prod"
    ]
  }
}

从上面,我们可以看出来,在文档的 _source 里新增加了一个叫做 tags 的字段。它里面含有 prod。通过 script processor 的运用,我们可以通过脚本编程,灵活地实现到目前位置 ingest processors 不能提供的功能。在使用 script processor 时,需要注意的一点是,它毕竟是脚本运行。太多繁重的运算会降低摄入数据的效率。

Script Caching

Elasticsearch第一次看到一个新脚本,它会编译它并将编译后的版本存储在缓存中。无论是 inline 或是 stored 脚本都存储在缓存中。新脚本可以驱逐缓存的脚本。默认的情况下是可以存储100个脚本。我们可以通过设置 script.cache.max_size 来改变其大小,或者通过 script.cache.expire 来设置过期的时间。这些设置需要在 config/elasticsearch.yml 里设置。

Script 调试

不能调试的脚本是非常难的。有一个好的调试手段无疑对我们的脚本编程是非常有用的。

Debug.explain

Painless 没有 REPL,虽然有一天它很好,但它不会告诉你关于调试 Elasticsearch 中嵌入的 Painless 脚本的全部故事,因为脚本可以访问的数据或 “上下文” 是如此重要。 目前,调试嵌入式脚本的最佳方法是在选择位置抛出异常。 虽然你可以抛出自己的异常(throw new exception('whatever'),但 Painless 的沙箱会阻止你访问有用的信息,如对象的类型。 所以 Painless 有一个实用工具方法 Debug.explain,它会为你抛出异常。 例如,你可以使用 _explain 来探索 script query 可用的上下文。

PUT /hockey/_doc/1?refresh
{"first":"johnny","last":"gaudreau","goals":[9,27,1],"assists":[17,46,0],"gp":[26,82,1]}

POST /hockey/_explain/1
{
  "query": {
    "script": {
      "script": "Debug.explain(doc.goals)"
    }
  }
}

这表明 doc.goals 类是通过 org.elasticsearch.index.fielddata.ScriptDocValues.Long 来响应的:

{
  "error": {
    "root_cause": [
      {
        "type": "script_exception",
        "reason": "runtime error",
        "painless_class": "org.elasticsearch.index.fielddata.ScriptDocValues.Longs",
        "to_string": "[1, 9, 27]",
        "java_class": "org.elasticsearch.index.fielddata.ScriptDocValues$Longs",
        "script_stack": [
          "Debug.explain(doc.goals)",
          "                 ^---- HERE"
        ],
        "script": "Debug.explain(doc.goals)",
        "lang": "painless"
      }
    ],
    "type": "script_exception",
    "reason": "runtime error",
    "painless_class": "org.elasticsearch.index.fielddata.ScriptDocValues.Longs",
    "to_string": "[1, 9, 27]",
    "java_class": "org.elasticsearch.index.fielddata.ScriptDocValues$Longs",
    "script_stack": [
      "Debug.explain(doc.goals)",
      "                 ^---- HERE"
    ],
    "script": "Debug.explain(doc.goals)",
    "lang": "painless",
    "caused_by": {
      "type": "painless_explain_error",
      "reason": null
    }
  },
  "status": 400
}

你可以使用相同的技巧来查看 _source 是 _update API 中的 LinkedHashMap:

POST /hockey/_update/1
{
  "script": "Debug.explain(ctx._source)"
}

显示的结果是:

{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[localhost][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "runtime error",
      "painless_class": "java.util.LinkedHashMap",
      "to_string": "{first=johnny, last=gaudreau, goals=[9, 27, 1], assists=[17, 46, 0], gp=[26, 82, 1], born=1993/08/13, nick=hockey}",
      "java_class": "java.util.LinkedHashMap",
      "script_stack": [
        "Debug.explain(ctx._source)",
        "                 ^---- HERE"
      ],
      "script": "Debug.explain(ctx._source)",
      "lang": "painless",
      "caused_by": {
        "type": "painless_explain_error",
        "reason": null
      }
    }
  },
  "status": 400
}

执行脚本 API

在很多时候,我们可以把我们算法写成一个与 Painless context 无关的 method,通过使用 _execute API 来进行测试。测试好的代码可以再放到各种执行的 context 中进行调用。有关 context 及 _execute API 的使用,请参阅文章 “Elasticsearch:Painless execute API”。

比如,我们可以通过 _execute API 来测试如下的一个计算 BMI:

POST /_scripts/painless/_execute
{
  "script": {
    "lang": "painless",
    "source": """
      double calculateBMI(double height, double weight) {
        return weight/(height*height);
      }
      
      calculateBMI(params['height'], params['weight'])
     """,
    "params": {
      "height": 165,
      "weight": 60
    }
  }
}

我们可以通过定义一个叫做 calculateBMI 方法,来测试它的正确性,然后我们再在不同的 context 下进行对该方法的调用。比如我们有如下的文档:

PUT employee/_bulk?refresh
{"index":{"_id": 1}}
{ "salary" : 5000, "bonus": 500, "@timestamp" : "2021-02-28", "weight": 60, "height": 175, "name" : "Peter", "occupation": "software engineer","hobbies": ["dancing", "badminton"]}
{"index":{"_id": 2}}
{ "salary" : 6000, "bonus": 500, "@timestamp" : "2020-02-01", "weight": 50, "height": 160, "name" : "John", "occupation": "sales", "hobbies":["singing", "volleyball"]}
{"index":{"_id": 3}}
{ "salary" : 7000, "bonus": 600, "@timestamp" : "2019-03-01", "weight": 55, "height": 172, "name" : "mary", "occupation": "manager", "hobbies":["dancing", "tennis"]}
{"index":{"_id": 4}}
{ "salary" : 8000, "bonus": 700, "@timestamp" : "2018-02-28", "weight": 45, "height": 166, "name" : "jerry", "occupation": "sales", "hobbies":["biking", "swimming"]}
{"index":{"_id": 5}}
{ "salary" : 9000, "bonus": 800, "@timestamp" : "2017-02-01", "weight": 60, "height": 170, "name" : "cathy", "occupation": "manager", "hobbies":["climbing", "jigging"]}
{"index":{"_id": 6}}
{ "salary" : 7500, "bonus": 500, "@timestamp" : "2017-03-01", "weight": 40, "height": 158, "name" : "cherry", "occupation": "software engineer", "hobbies":["basketball", "yoga"]}

我们可以进行如下的搜索:

GET employee/_search
{
  "script_fields": {
    "BMI": {
      "script": {
        "source": """
        double calculateBMI(double height, double weight) {
          return weight/(height*height);
        }
        
        double height = (double)doc['height'].value;
        double weight = (double)doc['weight'].value;
        
        return calculateBMI(height, weight)
        """
      }
    }
  }
}

由于我们的 calculateBMI 的方法是正确的,我们只需要把相应的值传入即可。

参考:

【1】A Brief Painless Walkthrough | Painless Scripting Language [7.16] | Elastic

【2】Painless Debugging | Painless Scripting Language [7.16] | Elastic

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐