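When a query matches a very large number of documents, returning everything in a single response is not practical, and paging through the results with repeated from/size requests puts heavy load on the server. The scroll API handles this case easily.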
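To see why deep from/size paging gets expensive, here is a rough back-of-the-envelope sketch. It assumes the usual model in which every shard has to return its top from + size hits to the coordinating node; the shard count and page size below are made-up illustration values, and `docs_collected` is a hypothetical helper, not part of any library. Note also that, by default, Elasticsearch rejects requests where from + size exceeds the index.max_result_window setting (10000).

```python
def docs_collected(from_: int, size: int, shards: int) -> int:
    """Rough upper bound on the hits merged for one from/size page."""
    # Each shard contributes its own top (from_ + size) hits.
    return shards * (from_ + size)


# Hypothetical index with 5 primary shards, fetched 1000 hits per page.
for page in (0, 9, 99):
    offset = page * 1000
    print(page, docs_collected(offset, 1000, shards=5))
```

The cost per page grows with the offset, whereas a scroll keeps a cursor on each shard and only fetches the next batch.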

Example code (the grade index holds 1000*10000 documents, i.e. 10 million):

from elasticsearch import Elasticsearch
import time

es = Elasticsearch(hosts='http://127.0.0.1:9200')

query = {
    "query": {
        "bool": {
            "must": [
                {"match": {
                    "name": "张三"
                }
                }
            ],
            "filter": [
                {"range": {
                    "id": {
                        "gte": 0
                    }
                }}
            ]
        }
    },
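    # Hits per scroll batch; values above index.max_result_window
    # (default 10000) require raising that index setting first.
    "size": 50000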
}
start_time = time.time()

res = es.search(index='grade', scroll='1m', body=query)
sid = res['_scroll_id']
sid_list = [sid]
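# By default hits.total.value is only tracked up to 10,000, so it is
# printed for reference but not used to decide when to stop.
scroll_size_max = res['hits']['total']['value']
count = 0
print(scroll_size_max)

save_data = []
# Keep scrolling until a page comes back with no hits.
while res['hits']['hits']: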
    for data in res['hits']['hits']:
        print(count, data)
        save_data.append(data['_source'])
        count += 1
    res = es.scroll(scroll_id=sid, scroll='2m')
    # print(sid)
    # print(res)
    sid = res['_scroll_id']
    # print(sid)
    sid_list.append(sid)
print(sid_list)
# Clearing every collected scroll_id (this is what triggers the error below):
# for sid_del in sid_list:
#     print(sid_del)
#     es.clear_scroll(scroll_id=sid_del)
#     print(1)

print(save_data[-10:])
print(len(save_data))
end_time = time.time()
print(f"Elapsed: {end_time - start_time}")

Output: [screenshot not included]

If the code above calls es.clear_scroll(scroll_id=sid_del) to delete the scroll cursors, you may run into the following error.

 elasticsearch.exceptions.NotFoundError: NotFoundError(404, '{"succeeded":true,"num_freed":0}')

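This error occurs because the same scroll_id is cleared more than once: consecutive scroll calls usually return the same scroll_id, so sid_list contains duplicates, and every clear_scroll call after the first finds nothing left to free. Clearing the scroll_id only once makes the code run normally.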

Example code:

from elasticsearch import Elasticsearch
import time

es = Elasticsearch(hosts='http://127.0.0.1:9200')

query = {
    "query": {
        "bool": {
            "must": [
                {"match": {
                    "name": "张三"
                }
                }
            ],
            "filter": [
                {"range": {
                    "id": {
                        "gte": 0
                    }
                }}
            ]
        }
    },
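    # Hits per scroll batch; values above index.max_result_window
    # (default 10000) require raising that index setting first.
    "size": 50000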
}
start_time = time.time()

res = es.search(index='grade', scroll='1m', body=query)
sid = res['_scroll_id']
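# By default hits.total.value is only tracked up to 10,000, so it is
# printed for reference but not used to decide when to stop.
scroll_size_max = res['hits']['total']['value']
count = 0
print(scroll_size_max)

save_data = []
# Keep scrolling until a page comes back with no hits.
while res['hits']['hits']: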
    for data in res['hits']['hits']:
        print(count, data)
        save_data.append(data['_source'])
        count += 1
    res = es.scroll(scroll_id=sid, scroll='2m')
    sid = res['_scroll_id']
    # print(sid)

# Clear the scroll context once, after the last page
es.clear_scroll(scroll_id=sid)

print(save_data[-10:])
print(len(save_data))
end_time = time.time()
print(f"Elapsed: {end_time - start_time}")

Output: [screenshot not included]
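The loop and cleanup above can also be wrapped in a small generator, which makes it harder to forget the cleanup or to clear the same scroll_id twice. The following is only a sketch under some assumptions: `iter_scroll_hits` is a hypothetical helper name, `es` is any client object exposing the same search, scroll and clear_scroll calls used in this article, and distinct scroll IDs are tracked in a set in case the server returns a new one along the way.

```python
def iter_scroll_hits(es, index, query, keep_alive="2m"):
    """Yield every hit of a scroll search, then clear each scroll_id once."""
    res = es.search(index=index, scroll=keep_alive, body=query)
    seen_ids = {res["_scroll_id"]}  # a set removes duplicate IDs
    try:
        # An empty page means the scroll is exhausted.
        while res["hits"]["hits"]:
            yield from res["hits"]["hits"]
            res = es.scroll(scroll_id=res["_scroll_id"], scroll=keep_alive)
            seen_ids.add(res["_scroll_id"])
    finally:
        # Runs on normal completion and when the consumer stops early.
        for sid in seen_ids:
            es.clear_scroll(scroll_id=sid)
```

With the client and query from the example above, the collected data could then be built as `save_data = [hit["_source"] for hit in iter_scroll_hits(es, "grade", query)]`.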

For large result sets, the scan() helper can be used as an alternative to calling scroll() directly. For details, see the blog post: python在es中scan()用法详解_IT之一小佬的博客-CSDN博客
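As a quick orientation, here is a minimal sketch of what using scan() might look like with the same index and host as the examples above. `collect_sources` and `main` are hypothetical names introduced for illustration, and the batch size of 1000 is an arbitrary choice. The elasticsearch imports sit inside `main()` only so that the small helper can be tried without a running cluster.

```python
def collect_sources(hits):
    """Gather the _source document of every hit from an iterable of hits."""
    return [hit["_source"] for hit in hits]


def main():
    # Imported here so collect_sources() works without the client installed.
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import scan

    es = Elasticsearch(hosts="http://127.0.0.1:9200")
    query = {"query": {"match": {"name": "张三"}}}
    # scan() drives the scroll API internally and yields hits one by one.
    hits = scan(es, query=query, index="grade", scroll="2m", size=1000)
    print(len(collect_sources(hits)))
```

Calling `main()` against a live cluster prints the number of matching documents; scan() clears the scroll context itself by default when it finishes.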
