最近这段时间被 ES 所困扰,庆幸最终学会了用 Python 操作 ES。怕以后忘记,把代码记录如下:

1.python读取es数据

from elasticsearch7 import Elasticsearch, helpers


def read_es(host, port, index, query=""):
    """Stream every matching document from an Elasticsearch index.

    Parameters
    ----------
    host : str
        Elasticsearch host name or IP address.
    port : str or int
        Elasticsearch HTTP port (e.g. 9200).
    index : str
        Name of the index to read from.
    query : dict or "", optional
        Elasticsearch query body; an empty string selects all documents
        via ``match_all``.

    Returns
    -------
    generator
        Generator of hit dicts produced by ``helpers.scan``.
    """
    # Bug fix: the original passed the literal string 'url' as the host
    # list, ignoring the host/port arguments entirely. Build the real
    # connection target from the parameters instead.
    es = Elasticsearch([{"host": host, "port": int(port)}],
                       http_auth=('user', 'password'),
                       sniff_on_start=False,
                       sniff_on_connection_fail=True,
                       sniffer_timeout=600,
                       sniff_timeout=50,
                       request_timeout=5000)
    if not query:  # default: match every document in the index
        query = {
            "query": {
                "match_all": {}
            },
        }
    # helpers.scan pages through the whole result set with the scroll API,
    # so it is not limited to the first 10k hits like a plain search.
    return helpers.scan(es, index=index, scroll="20m", query=query)


# Demo: pull every document from the index and print each record.
data = read_es("duankou", '9200', "table_name", query="")
print(data)
for record in data:
    print(record)  # each record is a dict yielded by the scroll scan

2.python删除es数据:


from elasticsearch7 import Elasticsearch
from elasticsearch7.helpers import bulk

def tmp_update_if(url="http://localhost:9200", ids=None):
    """Delete documents with the given ``_id`` values from dm_msd_foundation.

    Parameters
    ----------
    url : str, optional
        Elasticsearch endpoint. The original code referenced an undefined
        global ``url``, which raised NameError at call time -- it is now a
        parameter with a sensible default.
    ids : list of str, optional
        Document ids to delete; defaults to the original hard-coded batch.
    """
    es = Elasticsearch([url],
                       http_auth=('user', 'password'),
                       sniff_on_start=False,
                       sniff_on_connection_fail=True,
                       sniffer_timeout=600,
                       sniff_timeout=50,
                       request_timeout=5000)
    if ids is None:
        ids = ['1401', '1402', '1403', '1404', '1405', '1406']
    # A "terms" query on _id matches every document whose id is in the list.
    query = {
        "query": {
            "terms": {
                "_id": ids
            }
        }
    }
    es.delete_by_query(index='dm_msd_foundation', body=query)
    print('---- finish -')


if __name__ == "__main__":
    tmp_update_if()

3.python读取txt文档,再写入数据:

import json
from elasticsearch7 import Elasticsearch
from elasticsearch7.helpers import bulk
# Module-level accumulators shared by get_data()/write_es().
# NOTE(review): these grow across repeated calls (a second call re-sends
# the first batch) -- consider making them local to the functions that
# use them.
actions=[]
fi_data=[]
def get_data(path='test.txt'):
    """Parse a tab-separated UTF-8 text file into a list of row dicts.

    The first line is treated as the header (the dict keys); every
    following line becomes one dict mapping header name -> cell value.

    Parameters
    ----------
    path : str, optional
        File to read. Defaults to 'test.txt' so existing callers keep
        working unchanged.

    Returns
    -------
    list of dict
        One dict per data row; empty list for an empty file.
    """
    # 'with' guarantees the file handle is closed (the original leaked it).
    with open(path, encoding='utf-8') as f:
        lines = list(f)
    # NOTE(review): the original special-cased a 237-line file by dropping
    # its last line -- presumably a trailing blank line in one specific
    # input. The quirk is preserved for compatibility; confirm if needed.
    row_count = len(lines) - 1 if len(lines) == 237 else len(lines)
    rows = [line.rstrip('\n').split('\t') for line in lines[:row_count]]
    print(len(rows))
    if not rows:
        return []
    header = rows[0]
    # Build the result locally: the original appended to a module-level
    # list, so repeated calls accumulated stale rows.
    result = [dict(zip(header, row)) for row in rows[1:]]
    print(result)
    return result


def write_es(fi_data, url="http://localhost:9200", index="dm_msd_foundation"):
    """Bulk-index a list of row dicts into Elasticsearch.

    Parameters
    ----------
    fi_data : list of dict
        Documents to index. Each dict must contain an "id" key, used as
        the document ``_id`` so re-runs overwrite rather than duplicate.
    url : str, optional
        Elasticsearch endpoint. The original referenced an undefined
        global ``url`` (NameError) -- it is now a parameter.
    index : str, optional
        Target index name; defaults to the original hard-coded index.
    """
    es = Elasticsearch([url],
                       http_auth=('user', 'password'),
                       sniff_on_start=False,
                       sniff_on_connection_fail=True,
                       sniffer_timeout=600,
                       sniff_timeout=50,
                       request_timeout=5000)
    # Build the action batch locally: the original appended to a
    # module-level list, so a second call re-sent the first batch.
    batch = []
    for elem in fi_data:
        batch.append({
            "_index": index,
            "_type": "index",  # NOTE(review): ES7 deprecates custom types ("_doc" expected); kept as-is for compatibility
            "_id": elem["id"],  # row id doubles as the document id
            "_source": elem
        })
    bulk(es, batch, index=index, raise_on_error=True)


if __name__ == "__main__":
    # Entry point: parse test.txt and push its rows into Elasticsearch.
    write_es(get_data())

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐