背景:

  • 统计对某个字段去重后的聚合数据

  • 实现类似 sql:select count(distinct fault_name) from wangqi group by fault_name;

问题: 使用 es 的 cardinality 做数据去重 会导致 结果出现 +-%5误差;

Query:
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-01-01",
              "lt": "2022-12-13"
            }
          }
        }
      ]
  },
  "aggs": {
    "01": {
      "filter": {
        "terms": {
          "data_sources": [
            "01"
          ]
        }
      },
      "aggs": {
        "01": {
          "cardinality": {
            "field": "mac_id"
          }
        }
      }
    }
  }
}

Return:
{
    "01" : {
      "doc_count" : 10901762,
      "01" : {
        "value" : 1425288
      }
    }
}

 方案一:

  • 使用composite 多列聚合 类似 mysql select count(commissionamount) from xxx_table group by timeperiod,orgId
Query:
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-01-01",
              "lt": "2022-12-13"
            }
          }
        }
      ]
    }},
  "aggs": {
    "NAME": {
      "composite": {
        "sources": [
          {
            "fault_name": {
             "terms": {
               "field": "mac_id"
             }
            }
          },
          {
            "mac_id":{
              "terms": {
                "field": "fault_name"
              }
            }
          }
        ]
      }
    }
  }
}

Return:
{
  "key" : {
    "fault_name" : "***",
    "mac_id" : "***"
  },
  "doc_count" : 2
}
  • 通过after key 循环查询 所有返回的 fault_name 和 mac_id 排列组合,在python 中进行数量排名,去重数量计算。
  • 优点:
    • 聚合+去重结果 准确
  • 问题:
    • 需要循环查询多次(取决于排列组合数量),代码需要大量改动

方案二:

  • 使用 stats_bucket
扩大es terms 聚合size 上限
PUT test/_settings
{
  "persistent": {
    "search.max_buckets": 2000000
  }
}
GET test/_search?filter_path=aggregations.**.count,aggregations.mac_id.buckets.key,aggregations.mac_id.buckets.key_as_string
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-06-01",
              "lt": "2022-06-04"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "mac_id": {
        "terms": {
          "field": "rpt_dt",
          "size": 20
        },
        "aggs": {
            "fault_name": {
                "terms": {
                    "field": "mac_id",
                    "size": 100000000
                  }
            },
            "count": {
                "stats_bucket": {
                    "buckets_path": "fault_name._count"
                  }
            }
        }
    }
  }
}

具体介绍:Stats Bucket Aggregation - elasticsearch中文文档  

  • 二级聚合拿到所有二级的桶,使用 stats_bucket 统计所有桶的数量即为 去重后的数量;

  • 优点:

    • 聚合+去重结果 准确,不需要循环查询,代码改动量少
  • 问题:

    • 二级聚合拿到所有桶,比较耗时,查询时间为原始方案的 5-6倍

方案三:

使用 scripted_metric 实现自定义聚合

init_script 定义
map_script 操作 判断
combine_script 操作返回记录
reduce_script 返回sum值

Query:
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "rpt_dt": {
              "gte": "2022-06-01",
              "lt": "2022-06-04"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "mac_id": {
      "terms": {
        "field": "fault_name",
        "size": 10
      }, 
      "aggs": {
        "spu": {
          "scripted_metric": {
            "init_script": {
              "source": "state.numas=new HashMap();",
              "lang": "painless"
            },
            "map_script": {
              "source": """
              if(doc.mac_id.length>=1){
                String houseKey = doc.mac_id.value;
                state.numas.put(houseKey,1);
              }
              """,
              "lang": "painless"
            },
            "combine_script": {
              "source": """
              double item_finish_count=0;
              for(key in state.numas.keySet()){
                item_finish_count+=1;
              }
              return item_finish_count;""",
              "lang": "painless"
            },
            "reduce_script": {
              "source": """double result=0;
               for(e in states){
                if(!Objects.isNull(e)){
                    result+=e;
                }
              }
              return result;""",
              "lang": "painless"
            },
            "params": {
              "close_sum_key": "close_sum3",
              "house_sum_key": "house_sum3"
            }
          }
        }
      }
    }
  }
}

Return:
{
    "key_as_string" : "2022-06-01 00:00:00",
    "key" : 1654041600000,
    "doc_count" : 274282,
    "spu" : {
        "value" : 268144.0
    }
}
  • es 支持 scripted_metric 使用java 语法脚本的方式 自定义聚合

  • 这里使用 hashmap 数据结构,对第一层 桶做遍历,将 需要去重的字段 放入hashmap ,最后统计hashmap的key数量,得到去重后数量

  • 优点:

    • 聚合去重结果准确,代码改动少
    • 速度快,几乎和原始方案差别不到 慢20%不到
  • 缺点

    • 多分片时脚本较为复杂,因为每个分片有一个hashmap,需要汇总统计,单分片可解决
    • 天量数据可能会占用较大内存,2-3年内数据问题不大

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐