用时 sqlalchemy与 Pandas read_sql_query(query, con)方法,它将创建一个 SQLDatabase具有属性的对象 connectable至 self.connectable.execute(query) .和 SQLDatabase.connectable被初始化为 con只要it is an instance of sqlalchemy.engine.Connectable (即 Engine and Connection )。

案例一:路过时Engine对象为 con
正如您的问题中的示例代码:

from sqlalchemy import create_engine
import pandas as pd
engine = create_engine('...')
df = pd.read_sql_query(query, con=engine)

在内部,pandas 只使用 result = engine.execute(query) , which means :

Where above, the execute() method acquires a new Connection on its own, executes the statement with that object, and returns the ResultProxy. In this case, the ResultProxy contains a special flag known as close_with_result, which indicates that when its underlying DBAPI cursor is closed, the Connection object itself is also closed, which again returns the DBAPI connection to the connection pool, releasing transactional resources.



在这种情况下,您不必担心 Connection自身,自动关闭,但会保留engine的连接池.

因此,您可以使用以下方法禁用池:
engine = create_engine('...', poolclass=NullPool)

或 dispose引擎完全带有 engine.dispose()在末尾。

但是关注了Engine Disposal doc (the last paragraph) ,这两个是可选的,您不必同时使用它们。所以在这种情况下,对于 read_sql_query 的简单一次性使用和清理,我认为这应该足够了:
# Clean up entirely after every query.
engine = create_engine('...')
df = pd.read_sql_query(query, con=engine)
engine.dispose()

案例二:路过时Connection对象为 con :
connection = engine.connect()
print(connection.closed) # False
df = pd.read_sql_query(query, con=connection)
print(connection.closed) # False again
# do_something_else(connection)
connection.close()
print(connection.closed) # True
engine.dispose()

You should do this whenever you want greater control over attributes of the connection, when it gets closed, etc. For example, a very import example of this is a Transaction, which lets you decide when to commit your changes to the database. (from this answer)



但是对于 Pandas ,我们无法控制 read_sql_query ,connection的唯一用处是它允许您在我们明确关闭它之前做更多有用的事情。

所以一般来说:

我想我想使用以下模式,这使我可以更好地控制连接并留下 future 的可扩展性:
engine = create_engine('...')
# Context manager makes sure the `Connection` is closed safely and implicitly
with engine.connect() as conn:
    df = pd.read_sql_query(query, conn)
    print(conn.in_transaction()) # False
    # do_something_with(conn)
    trans = conn.begin()
    print(conn.in_transaction()) # True
    # do_whatever_with(trans)
    print(conn.closed) # False
print('Is Connection with-OUT closed?', conn.closed) # True
engine.dispose()

但是对于简单的用例,例如您的示例代码,我认为这两种方法对于清理 DB IO 资源都同样干净和简单。
一、工具
需要使用到的库

import pandas as pd
import pymysql
from sqlalchemy import create_engine
二、建立数据库连接
使用pymysql库
# 建立数据库表连接
conn = pymysql.connect('localhost','user_name','password','database_name',charset='utf8')
# 关闭连接
conn.close()
使用sqlalchemyl库
# 建立数据库表连接
db = create_engine('dialect+driver://user_name:password@localhost:port/database')
conn = db.connect()
# 关闭连接
conn.close()
db.dispose()
三、读取数据到DataFrame
pandas.read_sql_query函数
# pd.read_sql_query(sql, con, index_col=None, coerce_float=True, params=None, parse_dates=None, chunksize=None)
# sql查询语句
sql_query = "select * from <table_name>;"
# 将数据读取到DataFrame
df = pd.read_sql_query(sql=sql_query, con=conn)
pandas.read_sql_table函数(只适用于SQLAlchemy连接)
# 读取整个table,可以不需要sql语句
# pd.read_sql_table(table_name, con, schema=None, index_col=None, coerce_float=True, parse_dates=None, columns=None, chunksize=None)
df = pd.read_sql_table(table_name, con=conn)
pandas.read_sql函数
pd.read_sql函数综合了pd.read_sql_query和pd.read_sql_table,但只有在SQLAlchemy连接时,第一个参数才能用table名称。

# pd.read_sql(sql or table, con, index_col=None, coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)
df = pd.read_sql(sql, con=conn)
现在通常使用的是SQLAlchemy连接池,所以直接用pd.read_sql更方便。

四、DataFrame写入MySQL
pandas.DataFrame.to_sql函数
Databases supported by SQLAlchemy [1] are supported. Tables can be newly created, appended to, or overwritten.

# 需要使用sqlalchemy连接池
# 如果数据表不存在,也可直接写入数据,不需要事前创建数据表
engine=create_engine('dialect+driver://username:password@localhost:port/database')
conn=engine.connect()
df.to_sql(name=table_name, con=conn, if_exists='fail')
if_exists参数
if_exists参数默认为fail,表示如果表table_name已存在,则不写入;其他可选值:
replace: 替代原有数据
append: 添加数据到已有的表

index参数
index默认是True,则DataFrame的index也会作为单独一列写入mysql。如果需要反复读写,建议index=False。

dtype参数
需要使用SQLAlchemy的数据类型,要提前导入:

from sqlalchemy.types import VARCHAR, Float, Integer, Date, Numeric, String, Text
五、总结
如果只是用pandas读写mysql数据,那么用sqlalchemy连接MySQL数据库会更方便直接,而且可以不需要使用sql语句。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐