利用python操作ElasticSearch

利用python对ElasticSearch进行操作主要会涉及两个python库elasticsearch和elasticsearch-dsl。除此之外,还会一个更为简单的python库:es-pandas。

其中,elasticsearch-py是一个官方提供的elasticsearch python客户端库,它只是对elasticsearch的rest API接口做了一层简单的封装。相对于这个客户端库,官方还提供了相对高级的elasticsearch-dsl客户端库。而es-pandas主要用于读、写和更新大型pandas DataFrame到ElasticSearch。

本文主要介绍elasticsearch-py库,篇幅有限,详细可以参考官方文档。下面将按照操作的一般流程进行介绍,包括以下几个小节:连接elasticsearch,索引的相关操作,数据查询,批量插入数据至elasticsearch,实例:读取、写入ElasticSearch

(1)导入相关包
from elasticsearch import Elasticsearch 
(2)连接elasticsearch
host = 192.168.1.100   # 随便造的,使用时换上自己的真实host即可
post = 8888
es= Elasticsearch([{'host': host, 'port': post}])  
(3)索引

① 创建索引

es.indices.create("index_name")

如果需要自定义mapping,则指需将mapping定义好后传递给body参数即可,如es.indices.create(index="index_name", body=mapping)

② 删除索引

es.indices.delete("index_name")

③ 重建索引
重建索引主要用于需要将某个索引的全部数据(或者查询的数据)原封不动的迁移到另一个索引当中,包括mapping等都与原来的索引相同。其中,source_index参数为源索引,target_index参数为目标索引。(重建索引迁移数据,不会覆盖原来的数据,例如:将数据集a迁移到索引index中,然后将数据b迁移到索引index中,index中包含a和b的数据,且a和b中_id和索引相同的数据不会重复)

注意: 目标索引不用提前创建(es.indices.create),索引不存在将会自动创建,如果提前创建了反而会因为mapping不一致进而出错。

from elasticsearch import helpers

# 索引重建、数据迁移
body={"query":{"match_all":{}}}  #遍历原索引,可自定义query
helpers.reindex(client=es,
                    source_index='old_index_name',
                    target_index='new_index_name',
                    target_client=es,
                    query=body)

④ 查看当前es中的索引

# 查看所有索引,且展示每个索引的详细结构。
indexs = es.indices.get("*")

# 查看es中的所有索引的名称
index_names = indexs.keys()

# 查看某个索引
index = es.indices.get("index_name")

⑤ 判断某个索引是否存在

# 判断索引是否存在,存在将会返回True
es.indices.exists(index='index_name')
(4)查询数据

elasticsearch python客户端库查询数据主要使用es.search(),将查询语句传递给body参数即可,如果要限制返回的最大数据量,则指需指定size参数值即可,如size=1000

① 查询全部数据——match_all

body={
    "query" : {
        "match_all" : {}
    }
}
# index_name即为es中的索引名
es.search(index="index_name",  doc_type="doc_type_name", body=body,size=1000)

② 等于查询——term、terms

term支持查询某个字段等于某个值这样的业务场景(一对一),而terms支持查询某个字段等于多个值的业务场景(一对多)。

例1:查询lang=python的数据

body={
    "query" : {
        "term" : {
            "lang" : "python"
        }
    }
}
es.search(index="index_name",  doc_type="doc_type_name", body=body)

例2:查询lang=python或者lang=c++的数据

body={
    "query" : {
        "terms" : {
            "lang" : [
                "python", "c++"
            ]
        }
    }
}
es.search(index="index_name",  doc_type="doc_type_name", body=body)

③ 包含查询——match、multi_match

match支持某个字段包含某个关键字,而multi_match支持查询多个字段包含某个关键字。

例1:查询name中包含”李“关键字的数据

body={
    "query" : {
        "match" : {
            "name" : "李"
        }
    }
}
es.search(index="index_name",  doc_type="doc_type_name", body=body)

例2:查询addr和city两个字段中包含”深圳“关键字

body = {
    "query" : {
        "multi_match" : {
            "query" : "深圳",
            "fields" : ["name", "addr"]
        }
    }
}
es.search(index="index_name",  doc_type="doc_type_name", body=body)

④ 复合查询——bool

bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)。

例:获取name="python"并且age=18的所有数据

body = {
    "query":{
        "bool":{
            "must":[
                {
                    "term":{
                        "name":"python"
                    }
                },
                {
                    "term":{
                        "age":18
                    }
                }
            ]
        }
    }
}
es.search(index="index_name",doc_type="type_name",body=body)

⑤ 切片式查询——from、size

from为从第多少条数据开始查询,size为查询的数据量。

body = {
    "query":{
        "match_all":{}
    }
    "from":2    # 从第二条数据开始
    "size":4    # 获取4条数据
}
# 从第2条数据开始,获取4条数据
es.search(index="index_name",doc_type="type_name",body=body)

⑥ 范围查询——range

range 过滤–按照指定范围查找一批数据,gt : 大于,gte : 大于等于,lt : 小于,lte : 小于等于。例如使用gte和lte,获取的数据范围为:gte<=字段值<=lte。

值得注意的是,针对日期格式的字段,一般需要配合format使用,format的格式为gte和lte的格式,而不是字段值的格式,format的日期格式可以自定义,也可以使用定义好的值,可以参考这篇文章官方指导手册

例:查询@timestamp在2020-07-04 8:12:00 到 2020-07-04 09:20:00时间段的数据。

body = {
    "query":{
        "range":{
            "@timestamp":{
                "gte": "2020-07-04 08:12:00",       
                "lte": "2020-07-04 09:20:00",       
                "format":"yyyy-MM-dd HH:mm:ss"
            }
        }
    }
}
es.search(index="index_name",doc_type="type_name",body=body)

⑦ 通配符查询——wildcard

body = {
    "query":{
        "wildcard":{
            "name":"*id"
        }
    }
}
# 查询name以id为后缀的所有数据
es.search(index="index_name",doc_type="type_name",body=body)

⑧ 排序——sort
排序主要通过sort中order字段限制升序(asc)还是降序(desc)。

body = {
    "query":{
        "match_all":{}
    }
    "sort":{
        "age":{                 # 根据age字段升序排序
            "order":"asc"       # asc升序,desc降序
        }
    }
}
es.search(index="index_name",doc_type="type_name",body=body)

当需要对多个字段进行排序时,需要注意sort下的字段顺序,字段顺序即为排序顺序。

body = {
    "query":{
        "match_all":{}
    }
    "sort":[{
        "age":{                # 先根据age字段升序排序
            "order":"asc"      # asc升序,desc降序
        }
    },{
        "name":{               # 后根据name字段升序排序
            "order":"asc"      # asc升序,desc降序
        }
    }],
}
es.search(index="index_name",doc_type="type_name",body=body)

⑨ 响应过滤——filter_path

# 只需要获取_id数据,多个条件用逗号隔开
es.search(index="index_name",doc_type="type_name",filter_path=["hits.hits._id"])
 
# 获取所有数据
es.search(index="index_name",doc_type="type_name",filter_path=["hits.hits._*"])
(5)删除符合条件的数据

先根据查询语句查询数据,然后使用delete_by_query删除查询到的数据。例如:删除平均加速度大于等于0.4的数据。

body = { "query":{ "range":{ "average_acceleration":{ "gte": 0.4} } } }
es.delete_by_query(index='index_name', body=body)
(6)批量插入数据至ElasticSearch

本小节主要展示将python中pandas数据插入到ElasticSearch,即批量插入数据,批量插入数据需要使用到helpers.bulk。对于单条数据,也可以使用此方法插入。

例如:读取本地csv文件数据(两列column1、column2),并将其插入到ElasticSearch中。

import pandas as pd
from elasticsearch import helpers

df = pd.read_csv('data.csv')

# 将要插入的数据以字典的形式存入列表中,如果是单条数据,则列表长度为1。
action_list = [df.loc[ind, :].to_dict() for ind in df.index]
helpers.bulk(es, action_list, index="index_name", doc_type="doc", raise_on_error=True)

处理思路如下:首先,读取数据。然后将每行数据转换成字典格式存入action_list列表中。最后使用helpers.bulk()将所有数据插入到ElasticSearch中。其中,helpers.bulk中es为上文中连接后的es变量,action_list为要插入的全部数据,index为数据存放的索引。索引的创建方法,可以看第(3)小节。

注意:如果字段中包含有"“和NAN,则可能会导致写入es失败。可以使用None替换”",注意不能使用"None"。

(7)批量更新数据

针对单条数据可以适用update、update_by_query更新。此外,批量更新ElasticSearch中的数据可以使用update、update_by_query等函数结合循环语句进行批量更新,也可以使用helpers.bulk进行一次性批量更新。

例如:需要将某个索引(predict_save_index)中的数据进行,根据_id(存放在update_id_list列表中)对某个字段(pred)的值进行更新(如果该字段不存在,就会自动创建,类似于pandas中数据框给字段赋值一样,字段不存在就会自动创建)。

actions = []
for ind, pred_label in zip(update_id_list, df['pred_label']):           action = {        
            "_op_type": "update",   # 操作命令,这里为更新      
            "index": predict_save_index,  # 数据的索引      
            "_id": ind,        # 要更新的数据 _id
            "doc": {            
                    "pred": pred_label     # 更新pred字段,值为pred_label,如果该字段不存在,将会自动新增该字段
                 }    
              }    
      actions.append(action)

helpers.bulk(es,actions,index=predict_save_index,
                        doc_type="doc", raise_on_error=True)

其中,update_id_list为需要更新数据的_id的列表,df[‘pred_label’]为需要赋给es中pred字段的值,es为已经连接成功的客户端。

(8)实例:读取、写入ElasticSearch

业务场景:需要获取当前时间前5分钟内的数据,且数据量最多不超过10000条,并将其写入到新的索引当中。

读取数据思路:

  • 涉及查询ElasticSearch数据,且为查询某个时间段的数据(范围range)。

写入数据思路:

  • a、读取到的数据转为数据框后,采用第(5)小节的方法,将其批量插入到ElasticSearch中,这个思路其实更适用,因为,在日常工作中,我们往往读取数据后,需要对数据做一定的处理,然后保存处理后的结果到新的索引中,而不是读取后直接存储,这里我为了方便,就把处理数据的过程给省略了。
  • b、直接使用重建索引,进行数据迁移。重建索引进行数据迁移,很适合用于保留原始数据的业务场景,因为在保留原始数据的情况下,插入数据经常会因为原始数据的格式而触发错误,除非新索引与原始数据的格式一致。

下面的代码为思路a,思路b不介绍,其比较简单,直接利用第(3)小节的代码即可。

import pandas as pd
from elasticsearch import Elasticsearch  
from elasticsearch import helpers
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

class EsData(object):
    """从ES中读取数据"""
    def __init__(self, config_path):
        """
        :param config_path: 配置文件路径
        :param data_col: 要获取的数据列(list)
        :param new_col: 重新定义原始数据列的列名(list)
        """
        self.config = configparser.ConfigParser()
        self.config.read(config_path, encoding="utf-8-sig")  # 配置文件中有中文,则需要指定encoding参数
        self.es_index = self.config.get('ES', 'index')  # 从配置文件中读取指定的索引名称
        self.es = self.es_conn()  # 连接elasticsearch

    def es_conn(self):
        """连接ES"""
        es_host = self.config.get('ES', 'host')  # 从配置文件中读取host,也可以自己指定
        es_port = self.config.get('ES', 'port')
        es = Elasticsearch([{'host': es_host, 'port': es_port, 'timeout': 300}])
        return es
        
    def get_data_on_5minutes_maxsize(self):
        """
        获取当前时间的前5分钟数据,且数据量大小不超过配置文件中设置的max_size
        :return: 索引中符合条件的全部数据(所有列)
        """
        time_diff = 5
        max_size = 10000
        current_time = datetime.now()
        # 起始时间,注意为字符串格式
        gte_time = (current_time - relativedelta(minutes=int(time_diff))).strftime('%Y-%m-%d %H:%M:%S')
        # 结束时间,字符串格式
        lte_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
        # print('gte_time:{} lte_time:{}'.format(gte_time, lte_time))
        body = {
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": gte_time,
                        "lte": lte_time,
                        "format": "yyyy-MM-dd HH:mm:ss||date_time"
                    }
                }
            }
        }
        s_5minutes = self.es.search(index=self.es_index, body=body, size=max_size)
        # 数据存在”_source“中,因此提取该字段的数据,如果想获取其他字段的数据,更换”_source“即可
        df_ontime = pd.DataFrame([hit['_source'] for hit in s_5minutes['hits']['hits']])
        print('es中在{} ~ {}时间间隔的数据共有{}条'.format(gte_time, lte_time, len(df_ontime)))
        return df_ontime
        
    def write_data_to_es(self, df):
        current_date = datetime.now().strftime('%Y-%m-%d')
        
        #创建存储数据的索引名称
        predict_save_index = 'predicted-save-index-' + current_date

        # 判断predict_save_index索引是否存在
        if self.es.indices.exists(predict_save_index):
            pass
        else:
            print('预测结果保存的索引不存在,将重新创建')
            self.es.indices.create(index=predict_save_index)
        
        action_list = [df.loc[ind, :].to_dict() for ind in df.index]
        helpers.bulk(self.es, action_list, index=predict_save_index, doc_type="doc", raise_on_error=True)
            

参考:

[1] https://elasticsearch-py.readthedocs.io/en/master/index.html

[2] https://www.cnblogs.com/remainsu/p/python-cha-xun-elasticsearch-chang-yong-fang-fa-qu.html

[3]https://blog.csdn.net/deardreaming/article/details/52813581

[4] https://www.cnblogs.com/double-orange/p/10075860.html

[5] https://elasticsearch.cn/question/8085

[6] https://zhuanlan.zhihu.com/p/95163799

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐