1、大批量导入的问题

ES查询默认每次只返回10条数据；通过size参数最多也只能取到1万条（受index.max_result_window限制，默认值为10000）。当数据量达到几十万条以上时，用from/size分页读取效率很低，因此需要使用scroll方法批量读取ES中的数据，再导入MySQL。

2、导入的代码示例


import os

import pymysql
from elasticsearch import Elasticsearch
import datetime
import requests
import json


class DBHelper:
    """MySQL connection settings for one database, plus a connection factory.

    ``env == "dev"`` selects the development server; any other value selects
    the non-dev server. The database name is taken verbatim from ``dbName``.
    """

    def __init__(self, dbName, env):
        is_dev = env == "dev"
        self.host = "192.168.1.19" if is_dev else "test.ali"
        # Both environments listen on the same port.
        self.port = 3306
        self.user = "root" if is_dev else "clod"
        self.password = "password" if is_dev else "passowrd"
        self.name = dbName

    def db_config(self):
        """Return keyword arguments for ``pymysql.connect``.

        Rows are returned as dicts (``DictCursor``) and the charset is utf8mb4.
        """
        return dict(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            db=self.name,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )

    def connectionbase(self, config):
        """Open and return a new PyMySQL connection built from *config*."""
        return pymysql.connect(**config)


def to_mysql(db, question, answer, env):
    """Insert one question/answer row, stamped with the current local time.

    Args:
        db: MySQL database name.
        question: question text (converted with ``str``).
        answer: answer text (converted with ``str``).
        env: environment selector passed to DBHelper ("dev" or other).

    Each call opens and closes its own connection.
    """
    helper = DBHelper(db, env)
    connection = helper.connectionbase(helper.db_config())
    # Parameterized query: the driver escapes values, so quotes or other
    # special characters in the text can neither break nor inject SQL.
    sql = "INSERT INTO library(question,answer,create_time) VALUES(%s,%s,%s)"
    create_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, (str(question), str(answer), create_time))
        connection.commit()
    finally:
        # Release the connection even if the insert or commit fails.
        connection.close()


def del_mysql(db, env):
    """Remove every row from the ``library`` table of database *db* (TRUNCATE).

    Args:
        db: MySQL database name.
        env: environment selector passed to DBHelper ("dev" or other).
    """
    helper = DBHelper(db, env)
    connection = helper.connectionbase(helper.db_config())
    try:
        with connection.cursor() as cursor:
            cursor.execute("truncate table library")
        connection.commit()
    finally:
        # Release the connection even if the TRUNCATE fails.
        connection.close()


def _clear_scroll(es, scroll_id):
    """Best-effort release of the server-side scroll context *scroll_id*."""
    if not scroll_id:
        return
    try:
        es.clear_scroll(scroll_id=scroll_id)
    except Exception as exc:
        # Cleanup must not discard data already read; the context would
        # expire on its own after the scroll keep-alive anyway.
        print("clear_scroll failed: {}".format(exc))


def get_all_es_data(es):
    """Read every document of index "index" using the scroll API.

    Runs an initial search (10m scroll keep-alive, 10000 hits per page), then
    follows the scroll cursor until a page comes back empty.

    Args:
        es: Elasticsearch client providing ``search``, ``scroll`` and
            ``clear_scroll``.

    Returns:
        list of each hit's ``_source`` dict, in scroll order.
    """
    data = es.search(index="index", doc_type="medical", scroll='10m', timeout='10s', size=10000)
    hits = data["hits"]["hits"]
    if not hits:
        print("result is empty!")
    scroll_id = data.get("_scroll_id")
    total = data["hits"]["total"]
    # ES 7+ reports the total as {"value": n, "relation": ...}; older versions use an int.
    if isinstance(total, dict):
        total = total.get("value", 0)
    print("ES中共计读取数据量:{}".format(total))

    hits_all = list(hits)
    try:
        # Scroll until an empty page. This does not depend on `total`, which
        # may only be a lower bound.
        while hits and scroll_id:
            res = es.scroll(scroll_id=scroll_id, scroll='10m')
            # The scroll id may change between requests; always use the latest.
            scroll_id = res.get("_scroll_id", scroll_id)
            hits = res["hits"]["hits"]
            hits_all.extend(hits)
    finally:
        _clear_scroll(es, scroll_id)

    result = [hit["_source"] for hit in hits_all]
    print("已获取数据量:{}".format(len(result)))
    return result


def read_es_to_mysql(host, port, user, password, env, db_name="plat"):
    """Copy all question/answer documents from Elasticsearch into MySQL.

    Reads every document with get_all_es_data, truncates the ``library``
    table of *db_name* with del_mysql, then inserts each usable pair with
    to_mysql.

    Args:
        host: Elasticsearch host; used only when credentials are supplied.
        port: Elasticsearch port; used only when credentials are supplied.
        user: basic-auth user. If this or ``password`` is None, the
            hard-coded cluster 192.168.1.10:9203 is used without auth.
        password: basic-auth password.
        env: MySQL environment selector passed to DBHelper ("dev" or other).
        db_name: MySQL database that is both truncated and written to.
            NOTE(review): assumed to be "plat"; confirm the intended target.
    """
    if user is None or password is None:
        es = Elasticsearch(hosts="192.168.1.10", port=9203)
    else:
        es = Elasticsearch(hosts=host, port=port, http_auth=(user, password))
    _write_sources_to_mysql(get_all_es_data(es), db_name, env)


def _write_sources_to_mysql(result, db_name, env):
    """Truncate ``library`` in *db_name*, then insert each non-empty pair from *result*."""
    cnt = len(result)
    del_mysql(db_name, env)
    written = 0
    for source in result:
        question = source.get("question")
        answer = source.get("answer")
        # Skip documents whose question or answer is missing or empty.
        if not question or not answer:
            continue
        # NOTE(review): as written this replaces "," with "," (a no-op); it was
        # probably meant to map to a full-width comma -- confirm intent.
        # Single quotes are stripped so they cannot break an INSERT built by
        # string concatenation (harmless if the insert is parameterized).
        question = question.replace(",", ",").replace("'", "")
        answer = answer.replace(",", ",").replace("'", "")
        # NOTE(review): to_mysql opens one connection per row; a batched
        # executemany would be far faster for hundreds of thousands of rows.
        to_mysql(db_name, question, answer, env)
        written += 1
        if written % 1000 == 0:
            print("数据库写入进度:{:.2f}%".format(written * 100.0 / cnt))


if __name__ == '__main__':
    # Connection settings may be overridden through environment variables so
    # real credentials need not be kept in source; defaults are the original values.
    # For the unauthenticated dev cluster, call:
    #     read_es_to_mysql(None, None, None, None, "dev")
    read_es_to_mysql(
        os.environ.get("ES_HOST", "es-cn-com"),
        os.environ.get("ES_PORT", "9200"),
        os.environ.get("ES_USER", "clod"),
        os.environ.get("ES_PASSWORD", "password"),
        os.environ.get("APP_ENV", "prod"),
    )
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐