ES数据大批量导入MySQL
(完整的代码示例见下文第 2 节。)
1、大批量导入的问题
ES 查询默认每次只返回 10 条;即使显式设置 size,单次最多也只能取 1 万条。当数据量达到几十万以上时,逐页查询效率很低,因此需要使用 scroll 方式分批导出。
2、导入的代码示例
import os
import pymysql
from elasticsearch import Elasticsearch
import datetime
import requests
import json
class DBHelper:
    """Builds pymysql connection settings for a named MySQL database.

    The ``env`` flag selects between the development host and the
    default (test/prod) host; all other settings are fixed.
    """

    def __init__(self, dbName, env):
        # NOTE(review): credentials are hard-coded in source — consider
        # moving them to environment variables or a config file.
        if env == "dev":
            self.host = "192.168.1.19"
            self.user = "root"
            self.password = "password"
        else:
            self.host = "test.ali"
            self.user = "clod"
            self.password = "passowrd"
        # Port and database name are the same in both environments.
        self.port = 3306
        self.name = dbName

    def db_config(self):
        """Return a keyword dict suitable for ``pymysql.connect(**cfg)``."""
        return {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'password': self.password,
            'db': self.name,
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor,
        }

    def connectionbase(self, config):
        """Open and return a new pymysql connection from *config*."""
        return pymysql.connect(**config)
def to_mysql(db, question, answer, env):
    """Insert one (question, answer) row into the ``library`` table.

    The original built the INSERT by string concatenation, which broke on
    quotes in the data and was open to SQL injection; this version uses a
    parameterized query, and a try/finally guarantees the connection is
    closed even if the INSERT fails.

    :param db: database name passed to DBHelper
    :param question: question text (stringified before insert)
    :param answer: answer text (stringified before insert)
    :param env: "dev" or anything else (selects the DB host)
    """
    helper = DBHelper(db, env)
    connection = helper.connectionbase(helper.db_config())
    sql = "INSERT INTO library(question,answer,create_time) VALUES(%s,%s,%s)"
    create_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        with connection.cursor() as cursor:
            # Parameter binding lets the driver escape quotes/specials.
            cursor.execute(sql, (str(question), str(answer), create_time))
        connection.commit()
    finally:
        connection.close()
def del_mysql(db, env):
    """Empty the ``library`` table (TRUNCATE) in the given database.

    Fix: the original leaked the connection if ``execute`` raised;
    a try/finally now guarantees the connection is closed.

    :param db: database name passed to DBHelper
    :param env: "dev" or anything else (selects the DB host)
    """
    helper = DBHelper(db, env)
    connection = helper.connectionbase(helper.db_config())
    try:
        with connection.cursor() as cursor:
            cursor.execute("truncate table library")
        connection.commit()
    finally:
        connection.close()
def get_all_es_data(es):
    """Fetch every document from the index via the scroll API.

    Fixes the original page-count logic: when ``total < 10000`` it set
    ``page = total`` and issued one useless scroll call per document, and
    for large result sets ``int(total / 10000)`` could undercount pages.
    Instead we simply keep scrolling until the server returns an empty
    page, which is the canonical scroll pattern.

    :param es: an Elasticsearch client (or compatible object)
    :return: list of the ``_source`` dict of every hit
    """
    hits_all = []
    data = es.search(index="index", doc_type="medical", scroll='10m', timeout='10s', size=10000)
    hits = data["hits"]["hits"]
    hits_all.extend(hits)
    if not hits:
        print("result is empty!")
    scroll_id = data["_scroll_id"]
    total = data["hits"]["total"]
    # ES 7.x reports total as {"value": n, "relation": ...}; older
    # versions report a plain int. Normalize for the log line.
    if isinstance(total, dict):
        total = total.get("value", 0)
    print("ES中共计读取数据量:{}".format(total))
    # Scroll until a page comes back empty; refresh scroll_id each round
    # in case the server rotates it.
    while hits:
        res = es.scroll(scroll_id=scroll_id, scroll='10m')
        scroll_id = res.get("_scroll_id", scroll_id)
        hits = res["hits"]["hits"]
        hits_all.extend(hits)
    result = [hit["_source"] for hit in hits_all]
    print("已获取数据量:{}".format(len(result)))
    return result
def _write_result_to_mysql(result, env):
    """Truncate the target table, then write ES docs into MySQL.

    Skips documents with a missing/empty question or answer; logs
    progress every 1000 written rows.
    """
    del_mysql("plat", env)  # start from an empty table
    written = 0
    cnt = len(result)
    for source in result:
        question = source["question"]
        answer = source["answer"]
        if not question or not answer:
            continue
        # Normalize full-width commas and strip single quotes (quotes
        # would break a string-built INSERT downstream).
        question = question.replace(",", ",").replace("'", "")
        answer = answer.replace(",", ",").replace("'", "")
        to_mysql("plat", question, answer, env)
        written += 1
        if written % 1000 == 0:
            print("数据库写入进度:{:.2f}%".format(written * 100.0 / cnt))


def read_es_to_mysql(host, port, user, password, env):
    """Export all question/answer docs from Elasticsearch into MySQL.

    When ``user`` or ``password`` is None, connects to the fixed dev ES
    host without auth; otherwise connects to *host*:*port* with basic
    auth.

    Fixes over the original: the two branches truncated and inserted
    into inconsistently named databases ("pla" vs "plat") and only one
    branch stripped single quotes — both paths now share one helper and
    target "plat" consistently.
    """
    if user is None or password is None:
        es = Elasticsearch(hosts="192.168.1.10", port=9203)
    else:
        es = Elasticsearch(hosts=host, port=port, http_auth=(user, password))
    _write_result_to_mysql(get_all_es_data(es), env)
if __name__ == '__main__':
    # Dev run without auth: read_es_to_mysql(None, None, None, None, "dev")
    # Production export: authenticated ES → MySQL.
    read_es_to_mysql("es-cn-com", "9200", "clod", "password", "prod")
更多推荐
已为社区贡献2条内容
所有评论(0)