Python封装MySQL数据库操作(pymysql)


# 连接MySQL
class DbManager(object):
    # 构造函数
    def __init__(self):
        self.conn = None
        self.cur = None
        self.POOL = PooledDB(
            creator=pymysql,
            maxconnections=20,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=5,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=0,  # 链接池中最多闲置的链接,0和None不限制
            maxusage=1,  # 一个链接最多被重复使用的次数,None表示无限制
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            setsession=[],
            host="your host",
            port="your port",
            user="your user",
            password="your password",
            database="your db",
            charset="utf8",
        )

    # 连接数据库
    def connectDatabase(self):
        try:
            self.conn = self.POOL.connection()
            self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
            return True
        except:
            Logger().logger.error("connectDatabase failed")
            return False

    # 关闭数据库
    def close(self):
        if self.conn and self.cur:
            self.cur.close()
            self.conn.close()
        return True

    # 基本的执行SQL方法,下面几乎都调用这个
    def execute(self, sql, params=None, exe_many=False):
        res = self.connectDatabase()
        if not res:
            return False
        cnt = 0
        try:
            if self.conn and self.cur:
                if exe_many:
                    cnt = self.cur.executemany(sql, params)
                else:
                    cnt = self.cur.execute(sql, params)
                self.conn.commit()
        except Exception as e:
            Logger().logger.error("execute failed: " + sql)
            Logger().logger.error(str(e) + "\n\n")
            return False
        self.close()
        return cnt

    ################################################################
    ################ 以下为封装好的执行方法:表、字段方式 ################
    ################################################################
    # 新增并返回新增ID
    def table_insert(self, **kwargs):
        """
        table:必填,表名,如:table="test_table"
        data :必填,更新数据,字典类型,如:data={"aaa": "666'6", "bbb": "888"}
        """
        table = kwargs["table"]
        data = kwargs["data"]
        sql = "insert into %s (" % table
        fields = ""
        values = []
        flag = ""
        for k, v in data.items():
            fields += "%s," % k
            values.append(str(v))
            flag += "%s,"
        fields = fields.rstrip(",")
        values = tuple(values)
        flag = flag.rstrip(",")
        sql += fields + ") values (" + flag + ");"
        Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
        try:
            self.execute(sql, values)
            # 获取自增id
            res = self.cur.lastrowid
            return res
        except:
            self.conn.rollback()

    # 修改数据并返回影响的行数
    def table_update(self, **kwargs):
        """
        table:必填,表名,如:table="test_table"
        data :必填,更新数据,字典类型,如:data={"aaa": "666'6", "bbb": "888"}
        where:必填,更新条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
        """
        table = kwargs["table"]
        data = kwargs["data"]
        where = kwargs["where"]
        sql = "update %s set " % table
        values = []
        for k, v in data.items():
            sql += "{}=%s,".format(k)
            values.append(str(v))
        sql = sql.rstrip(",")
        sql += " where 1=1 "
        if type(where) == dict:
            for k, v in where.items():
                sql += " and {} in (%s)".format(k)
                values.append(str(v))
        elif type(where) == str:
            sql += " and %s" % where
        sql += ";"
        values = tuple(values)
        Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
        try:
            self.execute(sql, values)
            rowcount = self.cur.rowcount
            return rowcount
        except:
            self.conn.rollback()

    # 删除并返回影响行数
    def table_delete(self, **kwargs):
        """
        table:必填,表名,如:table="test_table"
        where:必填,删除条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
        """
        table = kwargs["table"]
        where = kwargs["where"]
        sql = "delete from %s where 1=1" % (table)
        values = []
        if type(where) == dict:
            for k, v in where.items():
                sql += " and {} in (%s)".format(k)
                values.append(str(v))
        elif type(where) == str:
            sql += " and %s" % where
        sql += ";"
        values = tuple(values)
        Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
        try:
            self.execute(sql, values)
            rowcount = self.cur.rowcount
            return rowcount
        except:
            self.conn.rollback()

    # 查一条数据
    def table_select_one(self, **kwargs):
        """
        table:必填,表名,如:table="test_table"
        where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
        field: 非必填,查询列名,字符串类型,如:field="aaa, bbb",不填默认*
        order: 非必填,排序字段,字符串类型,如:order="ccc"
        sort:  非必填,排序方式,字符串类型,如:sort="asc"或者"desc",不填默认asc
        """
        table = kwargs["table"]
        field = "field" in kwargs and kwargs["field"] or "*"
        where = kwargs["where"]
        order = "order" in kwargs and "order by " + kwargs["order"] or ""
        sort = kwargs.get("sort", "asc")
        if order == "":
            sort = ""
        sql = "select %s from %s where 1=1 " % (field, table)
        values = []
        if type(where) == dict:
            for k, v in where.items():
                sql += " and {} in (%s)".format(k)
                values.append(str(v))
        elif type(where) == str:
            sql += " and %s" % where
        sql += " %s %s limit 1;" % (order, sort)
        values = tuple(values)
        Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
        try:
            self.execute(sql, values)
            data = self.cur.fetchone()
            return data
        except:
            self.conn.rollback()

    # 查批量数据
    def table_select_many(self, **kwargs):
        """
        table:必填,表名,如:table="test_table"
        where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
        field: 非必填,查询列名,字符串类型,如:field="aaa, bbb",不填默认*
        order: 非必填,排序字段,字符串类型,如:order="ccc"
        sort:  非必填,排序方式,字符串类型,如:sort="asc"或者"desc",不填默认asc
        offset:非必填,偏移量,如翻页,不填默认0
        limit: 非必填,条数,不填默认100
        """
        table = kwargs["table"]
        field = "field" in kwargs and kwargs["field"] or "*"
        order = "order" in kwargs and "order by " + kwargs["order"] or ""
        sort = kwargs.get("sort", "asc")
        if order == "":
            sort = ""
        where = kwargs["where"]
        offset = kwargs.get("offset", 0)
        limit = kwargs.get("limit", 100)
        sql = "select %s from %s where 1=1 " % (field, table)
        values = []
        if type(where) == dict:
            for k, v in where.items():
                sql += " and {} in (%s)".format(k)
                values.append(str(v))
        elif type(where) == str:
            sql += " and %s" % where
        values = tuple(values)
        sql += " %s %s limit %s, %s;" % (order, sort, offset, limit)
        Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
        try:
            self.execute(sql, values)
            data = self.cur.fetchall()
            return data
        except:
            self.conn.rollback()

    # 查条数
    def table_count(self, **kwargs):
        """
        table:必填,表名,如:table="test_table"
        where:必填,查询条件,字典类型用于=,如:where={"aaa": 333, "bbb": 2};字符串类型用于非等于判断,如:where="aaa>=333"
        """
        table = kwargs["table"]
        where = kwargs["where"]
        sql = "select count(1) as count from %s where 1=1 " % (table)
        values = []
        if type(where) == dict:
            for k, v in where.items():
                sql += " and {} in (%s)".format(k)
                values.append(str(v))
        elif type(where) == str:
            sql += " and %s;" % where
        values = tuple(values)
        Logger().logger.info("sql:\n{} [{}]\n".format(sql, values))
        try:
            self.execute(sql, values)
            data = self.cur.fetchone()
            return data
        except:
            self.conn.rollback()

if __name__ == "__main__":
    # 示例
    mysqldb = DbManager()
    # ###################### ↓↓↓ 表、字段方式操作示例 ↓↓↓ ######################
    # 插入数据
    i = mysqldb.table_insert(table="test_table", data={"aaa": '123"456', "bbb": "987'654", "ccc": 666})
    print("自增ID:", i)

    # 更新数据
    c = mysqldb.table_update(table="test_table", data={"aaa": "666'6", "bbb": "888"}, where={"ccc": 3})
    print("更新行数:", c)

    # 删除数据
    c = mysqldb.table_delete(table="test_table", where={"aaa": "666'6", "bbb": 777})
    print("删除行数:", c)

    # 查询一条
    s = mysqldb.table_select_one(table="test_table", field="aaa, bbb", where={"aaa": 333, "bbb": 1}, order="ccc", sort="asc")
    print(s)

    # 批量查询,默认查询100条
    # 完整参数示例
    l = mysqldb.table_select_many(table="test_table", field="aaa, bbb", where="aaa>333", order="ccc", sort="desc", offset=25, limit=20)
    print(l)
    # 必填参数示例
    l = mysqldb.table_select_many(table="test_table", field="aaa, bbb", where={"aaa": 333, "bbb": 2})
    print(l)

    # 查条数
    count = mysqldb.table_count(table="test_table", where={"aaa": 333, "bbb": 2})
    print(count)
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐