python 读取Elasticsearch数据
elasticsearch 库pip installelasticsearchfrom elasticsearch import ElasticsearchES_CONF = {"host": "192.168.11.22","port": 9200,"user": "elastic","passwd": "1234"}index = "test_index"def process_func():
目录
(2)条件过滤的条件构成(match、term和range)
1、基本概念
全文搜索属于最常见的需求,开源的 Elasticsearch是目前全文搜索引擎的首选。它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。(阮一峰老师博客)
ES相对SQL型数据库来说,没有很明确的表的概念,它与SQL型数据库的结构类比如下:
对于SQL型数据来说,基本的数据单位是一条record,由不同的属性列组成,而在ES中,基本的数据单位是一个document,它是由key-value形式的键值对组成的一个json类型的文本,这里的key-value则叫做field。相同类型的document就形成一个type,也就是我们常说的数据表,最后所有的数据表一起形成一个数据库,在ES里面就叫做index。如果在写入数据时没有设计type,那么所有的数据默认的type都是"_doc",这个有时候在查询数据时会用到,ES现在也有在弱化这个type的存在。对于SQL数据库来说,表的设计叫schema,对应的在ES中,一个index的设计叫做mapping。
对于ES中可以存储的数据,目前的数据类型有:(此处有参考)
基本数据结构:text, keyword, date, long, double, boolean,ip
json数据结构:object, nested
专用数据结构:geo_point, geo_shape, completion
2、基本操作
关于index的建立和相应操作,可以参考阮一峰老师的博客,对于在kibnan之类的可以用命令行操作的,可以参考:es查询DSL语句。
对于只是使用数据库中的数据来说,有时候可能最关心的是,如何使用数据库的一些基本操作来帮助我们读写数据,以及做一些简单的数据分析。数据库的操作基本就是增删改查,在MySQL中是通过select……from……where之类的关键词构成SQL语句,然后执行语句完成功能,在ES中这样的语句则是由一个json格式的dict组成,如下是一个ES的搜索body涉及,与SQL语句的对应:
GET /analysis/_search
{
"_source": { ---SELECT
"includes": ["fileName","starttime","duration","repNo","repGroup","typeOfService"],
"excludes": ["blankKeyword","keyword","topicHitDetail"]
},
"query": { ---WHERE
"bool": {
"filter": {
"term": {
"typeOfService": "转账"
}
}
}
},
"aggs": { ---GROUP BY
"class_buckets": { ---HAVING
"filter": {
"range": {
"duration": {
"gte": 600
}
}
},
"aggs": {
"class_count": {
"terms": {
"field": "classfication_f"
},
"aggs": {
"count": {
"value_count": {
"field": "classfication_f"
}
},
"count_filter":{
"bucket_selector": { ------HAVING
"buckets_path": {
"count":"count"
},
"script": "params.count>=1000"
}
}
}
}
}
}
},
"from": 0, ---LIMIT
"size": 10,
"sort": [ ---ORDER BY
{
"starttime": {
"order": "desc"
}
}
]
}
(1)列的选择
通过"_source"这个key来选择需要搜索的数据属性字段,可以通过includes和excludes来指定需要搜索和不需要搜索的字段。
"_source": [field1, field2,……,fieldn] # 默认搜索包含该列表字段的数据
类似于:select field1, field2,……,fieldn from index
# 查询所有的记录,默认返回10条
{
"query": {
"match_all": {}
}
}
(2)条件过滤的条件构成(match、term和range)
一般在where的条件语句中,用得比较多的是精确查找"=",或者模糊匹配like,以及属性值的多项选择in()这类关键词。类似的,在ES中则是通过match来完成对应like的模糊匹配,通过term来完成对应"="和in的精确匹配。
-
精确匹配
首先来看精确匹配,在ES中,精确匹配是通过term来完成的。term过滤:term主要用于精确匹配,如字符串、数值、日期等(不适合情况:1.列中除英文字符外有其它值 2.字符串值中有冒号或中文 3.系统自带属性如_version)。例子如下
# 精确匹配insertTime是2021年5月26日的document
query = {
"query": {
"term": {"insertTime": "2021-05-26"}
}
}
另一个就是类似于SQL中IN()用法的terms,用于某个字段满足多个值的情况,例子如下:
# date为2014-09-01和2014-10-03的值都会被查找出来
{
"query": {
"terms": {"date": ["2014-09-01","2014-10-03"]}
}
}
-
模糊匹配
对于模糊匹配match也有几种情况,match是匹配给出的字符串中任意一个匹配到就可以了,match_phrase则是匹配给定的整个字符串(将这个字符串当做一个整体的短语),match_phrase_prefix是做前缀匹配,multi_match是做多个字段的搜索,具体例子如下:
# 在full_name字段做匹配,匹配包含John或者Smith的document
{
"query" : {
"match" : {
"full_name" : "John Smith"
}
}
}
# 使用"operator", 匹配同时包含John和Smith的document
{
"query": {
"match" : {
"full_name" : {
"query" : "John Smith",
"operator" : "and"
}
}
}
}
* ES默认的是对英文字符串用空格划分,对中文字符串直接做字的切分
# 下面的例子则是匹配包含“银”或者“行”的文档
{
"query" : {
"match" : {
"full_name" : "银行"
}
}
}
# 匹配包含“银行”这个字符串的document
{
"query" : {
"match_phrase" : {
"full_name" : "银行"
}
}
}
# 在两个字段中做匹配
{
"query" : {
"multi_match": {
"query": "John Smith",
"fields": ["first_name", "full_name"]
}
}
}
-
范围匹配range
对于字符串或者数值类的字段,match和term可以完成模糊或者精确的匹配,在SQL中还有一种常用的条件,数值型数据可以根据范围进行过滤,在ES中,则是通过range这个关键词来实现,例子如下:
{
"query": {
"range":{
"age":{ //查询age字段
"gte":60, //大于60
"lt":70 //小于70
}
}
}
}
-
空值判断
ES中的空值判断是通过“exist”和"missing"关键词实现的,例子如下:
# 查询字段user不为空的字段
{
"query": {
"exists" : { "field" : "user" }
}
}
# age字段为空的所有数据
{
"query": {
"missing":{ "field":"age"}
}
}
(3)过滤条件的组合
一般where中的各个条件是用and或or来连接,在ES中则是将这些条件做了一个聚合,必须满足的条件放在"must"这个键的值,可以满足也可以不满足的条件则放在"should"这个键值对,一定不可以的条件放在"must_not"这个键值对,最后所有的条件作为"bool"这个键的值。示例如下:
"query": {
"bool": {
"must": [
{}
],
"must_not": [
{}
],
"should": [
{"bool": {
"must": [
{"term": {
"callNumber": {
"value": "95533"
}
}}
]
}}
]
}
}
(4)聚合统计分析
{"aggs":{rename:{ # 聚合后生成的新列的名称"aggs_function":{ # 使用的聚合函数的名称"terms": field_name # 要做聚合的字段名称}}}}
例子如下:
# 分组统计,即group by……count
{
"size": 0,
"aggs": {
"group_by_state": {
"terms": {
"field": "state"
}
}
}
}
* 类似SQL
SELECT state, COUNT(*) FROM customer GROUP BY state
# avg求平均值
GET /lockie_test/_search
{
"size": 0,
"aggs": {
"avg_dealDl": {
"avg": {
"field": "dealDl"
}
}
}
}
# 分组聚合求平均值
GET /lockie_test/_search
{
"size": 0,
"aggs": {
"group_by_time": {
"terms": {
"field": "balanceTime",
"order": {
"avg_dealDl": "asc"
}
},
"aggs": {
"avg_dealDl": {
"avg": {
"field": "dealDl"
}
}
}
}
}
}
可以使用的聚合函数如下(图片来源同上):
参考:Elasticsearch 2.20入门篇:聚合操作、ES简单实用DSL查询
(5)term和range一起使用
{
"query": {
"bool": {
"filter": [
{
"term": {
"name": "kevin"
}
},
{
"range": {
"age": {
"gte": 20,
"lte": 30
}
}
}
]
}
}
}
(6) text字段的term使用
GET /index/_search
{
"query":{
"term":{
"field_name.keyword": "xxx"
}
}
}
GET bank/_search
{
"query":{
"match":{
"address.keyword": "467 Hutchinson"
}
}
}
3、python连接ES并做查询
elasticsearch 库
pip install elasticsearch
from elasticsearch import Elasticsearch
ES_CONF = {
"host": "192.168.11.22",
"port": 9200,
"user": "elastic",
"passwd": "1234"
}
index = "test_index"
def process_func():
# 从ES读取数据
es = Elasticsearch(
[ES_CONF["host"]],
http_auth=(ES_CONF["user"], ES_CONF["passwd"]),
port=ES_CONF["port"]
)
# 条件查询
query_json = {
"size": 1000,
"query": {
"range": {
"time": {
"gte": "2020-12-20",
"lte": "2020-12-21"
}
}
},
"sort": {"time": {"order": "asc"}}
}
result = es.search(index=index, body=query_json)["hits"]["hits"]
参考文献
更多推荐
所有评论(0)