python执行sql 语句

数据etl 过程中会涉及到调度,也就是每天要定时执行的任务,这些任务执行过程中其实是通过底层的脚本代码来进行数据的清洗转换等处理的。而脚本代码中肯定会涉及到调用sql 语句的情况,最近项目正好用到python 脚本调用sql 进行数据处理的需求。

直接上代码

import configparser
import pymysql

# 读取配置文件
conf = configparser.RawConfigParser()
conf.read("D:\PycharmProjects\economic_relation\\venv\Include\control\conf.ini")
# 获取源数据库参数
sourceDBUrl  = str(conf.get('soureDB', 'sourcedburl'))
sourceDBUser = str(conf.get('soureDB', 'sourcedbuser'))
sourceDBKey =  str(conf.get('soureDB', 'sourcedbkey'))
sourceDataBse = str(conf.get('soureDB', 'sourcedatabse'))

#链接源数据库
conn = pymysql.connect(host=sourceDBUrl, user=sourceDBUser, passwd=sourceDBKey, db=sourceDataBse, charset='utf8')
cur = conn.cursor(cursor=pymysql.cursors.DictCursor)

#读取sql文件
# file=open("D:\PycharmProjects\economic_relation\\venv\Include\sql\\test_create",mode='r',encoding='utf-8')
sql_path="D:\PycharmProjects\economic_relation\\venv\Include\sql\\create_table"

def execute_sql_file(sql_path):
    with open(sql_path,'r+',encoding='utf8') as f:
        # 将sql 用; 分割 并多行合并成一行
        sql_list = f.read().split(';')[:-1]
        sql_list = [x.replace('\n', ' ') if '\n' in x else x for x in sql_list]
        #执行每一条sql
    for sql_item  in sql_list:
        print ("sql语句:",sql_item)
        try:
            effect_row = cur.execute(sql_item)
            conn.commit()
            print('effect rows is {}'.format(effect_row))
        except Exception as e:
            print("error:",e)
#执行
execute_sql_file(sql_path)
#关闭资源
cur.close()
conn.close()

配置文件

在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐