![cover](https://img-blog.csdnimg.cn/1645a7ea90f644e2be19965a050378fe.png)
Python封装MySQL数据库操作(pymysql)
Python操作MySQL的封装
·
Python封装MySQL数据库操作(pymysql)
# 连接MySQL
class DbManager(object):
# 构造函数
def __init__(self):
self.conn = None
self.cur = None
self.POOL = PooledDB(
creator=pymysql,
maxconnections=20, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 链接池中最多闲置的链接,0和None不限制
maxusage=1, # 一个链接最多被重复使用的次数,None表示无限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
setsession=[],
host="your host",
port="your port",
user="your user",
password="your password",
database="your db",
charset="utf8",
)
# 连接数据库
def connectDatabase(self):
try:
self.conn = self.POOL.connection()
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
return True
except:
Logger().logger.error("connectDatabase failed")
return False
# 关闭数据库
def close(self):
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
# 基本的执行SQL方法,下面几乎都调用这个
def execute(self, sql, params=None, exe_many=False):
res = self.connectDatabase()
if not res:
return False
cnt = 0
try:
if self.conn and self.cur:
if exe_many:
cnt = self.cur.executemany(sql, params)
else:
cnt = self.cur.execute(sql, params)
self.conn.commit()
except Exception as e:
Logger().logger.error("execute failed: " + sql)
Logger().logger.error(str(e) + "\n\n")
return False
self.close()
return cnt
################################################################
################ 以下为封装好的执行方法:表、字段方式 ################
################################################################
# 新增并返回新增ID
def table_insert(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
data :必填,更新数据,字典类型,如:data={"aaa": "666'6", "bbb": "888"}
"""
table = kwargs["table"]
data = kwargs["data"]
sql = "insert into %s (" % table
fields = ""
values = []
flag = ""
for k, v in data.items():
fields += "%s," % k
values.append(str(v))
flag += "%s,"
fields = fields.rstrip(",")
values = tuple(values)
flag = flag.rstrip(",")
sql += fields + ") values (" + flag + ");"
Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
# 获取自增id
res = self.cur.lastrowid
return res
except:
self.conn.rollback()
# 修改数据并返回影响的行数
def table_update(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
data :必填,更新数据,字典类型,如:data={"aaa": "666'6", "bbb": "888"}
where:必填,更新条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
"""
table = kwargs["table"]
data = kwargs["data"]
where = kwargs["where"]
sql = "update %s set " % table
values = []
for k, v in data.items():
sql += "{}=%s,".format(k)
values.append(str(v))
sql = sql.rstrip(",")
sql += " where 1=1 "
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
sql += ";"
values = tuple(values)
Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
rowcount = self.cur.rowcount
return rowcount
except:
self.conn.rollback()
# 删除并返回影响行数
def table_delete(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,删除条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
"""
table = kwargs["table"]
where = kwargs["where"]
sql = "delete from %s where 1=1" % (table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
sql += ";"
values = tuple(values)
Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
rowcount = self.cur.rowcount
return rowcount
except:
self.conn.rollback()
# 查一条数据
def table_select_one(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
field: 非必填,查询列名,字符串类型,如:field="aaa, bbb",不填默认*
order: 非必填,排序字段,字符串类型,如:order="ccc"
sort: 非必填,排序方式,字符串类型,如:sort="asc"或者"desc",不填默认asc
"""
table = kwargs["table"]
field = "field" in kwargs and kwargs["field"] or "*"
where = kwargs["where"]
order = "order" in kwargs and "order by " + kwargs["order"] or ""
sort = kwargs.get("sort", "asc")
if order == "":
sort = ""
sql = "select %s from %s where 1=1 " % (field, table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
sql += " %s %s limit 1;" % (order, sort)
values = tuple(values)
Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
data = self.cur.fetchone()
return data
except:
self.conn.rollback()
# 查批量数据
def table_select_many(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
field: 非必填,查询列名,字符串类型,如:field="aaa, bbb",不填默认*
order: 非必填,排序字段,字符串类型,如:order="ccc"
sort: 非必填,排序方式,字符串类型,如:sort="asc"或者"desc",不填默认asc
offset:非必填,偏移量,如翻页,不填默认0
limit: 非必填,条数,不填默认100
"""
table = kwargs["table"]
field = "field" in kwargs and kwargs["field"] or "*"
order = "order" in kwargs and "order by " + kwargs["order"] or ""
sort = kwargs.get("sort", "asc")
if order == "":
sort = ""
where = kwargs["where"]
offset = kwargs.get("offset", 0)
limit = kwargs.get("limit", 100)
sql = "select %s from %s where 1=1 " % (field, table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s" % where
values = tuple(values)
sql += " %s %s limit %s, %s;" % (order, sort, offset, limit)
Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
data = self.cur.fetchall()
return data
except:
self.conn.rollback()
# 查条数
def table_count(self, **kwargs):
"""
table:必填,表名,如:table="test_table"
where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
"""
table = kwargs["table"]
where = kwargs["where"]
sql = "select count(1) as count from %s where 1=1 " % (table)
values = []
if type(where) == dict:
for k, v in where.items():
sql += " and {} in (%s)".format(k)
values.append(str(v))
elif type(where) == str:
sql += " and %s;" % where
values = tuple(values)
Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
try:
self.execute(sql, values)
data = self.cur.fetchone()
return data
except:
self.conn.rollback()
if __name__ == "__main__":
# 示例
mysqldb = DbManager()
# ###################### ↓↓↓ 表、字段方式操作示例 ↓↓↓ ######################
# 插入数据
i = mysqldb.table_insert(table="test_table", data={"aaa": '123"456', "bbb": "987'654", "ccc": 666})
print("自增ID:", i)
# 更新数据
c = mysqldb.table_update(table="test_table", data={"aaa": "666'6", "bbb": "888"}, where={"ccc": 3})
print("更新行数:", c)
# 删除数据
c = mysqldb.table_delete(table="test_table", where={"aaa": "666'6", "bbb": 777})
print("删除行数:", c)
# 查询一条
s = mysqldb.table_select_one(table="test_table", field="aaa, bbb", where={"aaa": 333, "bbb": 1}, order="ccc", sort="asc")
print(s)
# 批量查询,默认查询100条
# 完整参数示例
l = mysqldb.table_select_many(table="test_table", field="aaa, bbb", where="aaa>333", order="ccc", sort="desc", offset=25, limit=20)
print(l)
# 必填参数示例
l = mysqldb.table_select_many(table="test_table", field="aaa, bbb", where={"aaa": 333, "bbb": 2})
print(l)
# 查条数
count = mysqldb.table_count(table="test_table", where={"aaa": 333, "bbb": 2})
print(count)
更多推荐
所有评论(0)