前言

提示:这里可以添加本文要记录的大概内容:
例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。


提示:以下是本篇文章正文内容,下面案例可供参考

一、python3 操作sqlite,mysql,redis,mongodb

知识图谱,其python中对应的模块关系如下:
python3-sqlilite:sqlite3
python3-mysql:pymysql
python3-redis:redis
python3-mongodb:pymongo
为了解决python中掺入SQL语句等编程复杂度、维护等原因,python在最顶层做了ORM的封装,sqlalchemy模块。
在这里插入图片描述

二、python 标准库 sqlite3模块

1.引入库

sqlite3 python标准库,直接 import sqlite3 即可。
SQLite,是一款轻型的数据库,是遵守ACID的关系型数据库管理系统,它的设计目标是嵌入式的。所以考虑链接池子,得不偿失,暂时不做考虑。


【Sqlite3使用模板】
  • 链接数据库:conn = sqlite3.connect(db_path)
  •               数据库存储在磁盘中:conn = sqlite3.connect(db_path)
                  数据库创建在内存中:conn = sqlite3.connect(":memory:"))
  • 创建游标对象:cursor = conn.cursor()
  • 执行SQL语句:cu = cursor.execute('sql')
  •               执行一条SQl语句:cu = cursor.execute('sql')
                  执行多条SQl语句:cu = cursor.executemany('sql')
                  执行多条SQl脚本:cu = cu.executescript(f.read())
  • 获取查询结果:并将游标往后移动:list =cu.fetchall()
  •               获取一条记录:tuple =cu.fetchone()
                  获取多条记录:list=cu.fetchmany(n)
                  获取所有记录:list=cu.fetchall()
  • 关闭游标对象:cursor.close()
  • 提交数据:conn.commit()
  • 关闭数据库链接:conn.close()

2.代码示例

python代码示例:

# -*- coding:utf-8 -*-

#python3 中内置sqlite3模块

import os, sqlite3
#https://www.cnblogs.com/yuxc/archive/2011/08/18/2143606.html

current_workspace = os.path.join(os.path.dirname(os.path.abspath(__file__)))
print(current_workspace)
db_path = os.path.join(current_workspace, 'student.db')
sql_script_path = os.path.join(current_workspace, 'students_insert.sql')

#基本用法
def createStudenteTable():
    #清理数据库,测试目的
    if os.path.exists(db_path):
        os.remove(db_path)
        print("clear db!")
    else:
        pass
 ##Step1、连接数据库
    #创建在磁盘中的数据库,不存在则创建,存在则连接

    conn = sqlite3.connect(db_path)
    #创建在内存中
    # conn = sqlite3.connect(':memory:')

  ##Step2、创建游标对象
    cu = conn.cursor() #创建游标对象
  ##Step3、执行SQl语句
    create_student_sql = 'CREATE TABLE student(id int PRIMARY KEY, name varchar(128) NOT NULL,gender char(16), class varchar(128))'
    cu.execute(create_student_sql)
    print('*****************************excute successful!*****************************')

    students_info = [(100001, 'Lili', 'Man', '102'), (100002, 'Mary', 'Woman', '101')]
    students_info_sql = 'INSERT INTO student VALUES(?,?,?,?);'
    cu.executemany(students_info_sql, students_info)
    print('*****************************executemany successful!*****************************')

    with open(sql_script_path, 'r') as f:
        cu.executescript(f.read())
        print('*****************************executescript successful!*****************************')

    cusor_query = cu.execute('SELECT * FROM student;')
    print(type(cusor_query)) #<class 'sqlite3.Cursor'>
    print(cusor_query.fetchone())#return <class 'tuple'>,返回第一条记录,游标往后移动
    print(cusor_query.fetchmany())#return <class 'list'>, 返回一条记录,游标往后移动
    print(cusor_query.fetchall())#return <class 'list'>, 返回所有记录,游标往后移动
    print('*****************************excute select successful!*****************************')
    print(cusor_query.fetchall()) #游标已经到最后,返回空的列表
    

  ##Step4、关闭游标
    cu.close()
  ##Step5、提交数据
    conn.commit()
  ##Step5、关闭连接    
    conn.close()
  
#升级:解决sql执行结果判断问题
#python3中提供 try: ... catch...方法解决
def createStudentTableAdjustResult():
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    #使用 try ... catch ...捕获结果,判断执行成功
    try:
        print("Try:>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
        cu = cursor.execute('SELECT * FROM student;')
        print(cu)
    except BaseException as e:
        print("Except:>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
        print(e)
    else:
        print("Else:Get results>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
        values = cursor.fetchall()
        print(values)
    finally:
        print("Finally:>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
        cursor.close()
        conn.close()

if __name__ == '__main__':
    createStudenteTable()
    createStudentTableAdjustResult()

sql脚本:

INSERT INTO student VALUES(100003, 'Mary2', 'Woman', '102');
INSERT INTO student VALUES(100004, 'Mary3', 'Woman', '103');
INSERT INTO student VALUES(100005, 'Mary4', 'Woman', '104');
INSERT INTO student VALUES(100006, 'Mary5', 'Woman', '105');
INSERT INTO student VALUES(100007, 'Mary6', 'Woman', '106');
INSERT INTO student VALUES(100008, 'Mary7', 'Woman', '107');
INSERT INTO student VALUES(100009, 'Mary8', 'Woman', '108');
INSERT INTO student VALUES(1000010, 'Mary9', 'Woman', '109');

三、python 三方库 pymysql模块

1.引入库

pymysql python三方库,通过:pip install pymysql 。
MySQL是一个开放源码的小型关联式数据库管理系统,开发者为瑞典MySQL AB公司。MySQL被广泛地应用在Internet上的中小型网站中。由于其体积小、速度快、总体拥有成本低,尤其是开放源码这一特点,许多中小型网站为了降低网站总体拥有成本而选择了MySQL作为网站数据库。
MySql服务为单独的服务器,与程序隔离,调用的时候需要和数据库进行连接,所以请必须考虑性能问题,这就要重点关注连接池。


【pymysql使用模板】,其步骤与sqlite一样,如下为差异点
  • cu.executemany(sql,values),其中sql语句字符串中需要用 %s 占位符符号;sqlite中使用的是 ? 占位符号。
  • cu.executescript(f.read()),pymysql中没有该方法,若需要执行sql脚本,则按照文件读取的方式,excute()方法执行。
  • 针对查询语句执行结果差异:cu.execute('SELECT * FROM student;'),sqlite中返回的是【sqlite3.Cursor object at 0x00000276D43DE7A0】对象;而在pymysql中返回的是当前查询到数据记录条目数量,一个整数类型,行数。

pymysql详情查看:https://blog.csdn.net/weixin_40976261/article/details/89057633

2.代码示例

python代码示例,基本用法:

# -*- coding:utf-8 -*-

#python MySql操作——pyMySql

import os, pymysql
current_workspace = os.path.join(os.path.dirname(os.path.abspath(__file__)))
print(current_workspace)
sql_script_path = os.path.join(current_workspace, 'students_insert.sql')

#pymysql 基本用法
def create_student_table():
    conn = pymysql.Connect(
        host='localhost',
        user='root',
        passwd='111111',
        db='demo',
        port=3306,
        charset='utf8',
    )
    print(conn)

    cu = conn.cursor()
    print(cu)
    ##清空数据表格
    cu.execute('DELETE FROM student;')
    print('clear student successful!')

    ##插入数据:特别注意,pymsql中使用%s占位符;sqlite中使用?占位符
    students_info = [(1000010, 'Lili', 'Man', '102'), (1000011, 'Mary', 'Woman', '101')]
    students_info_sql = 'INSERT INTO student VALUES(%s,%s,%s,%s);'
    cu.executemany(students_info_sql, students_info)
    print('*****************************executemany successful!*****************************')

    ##pymsql 没有executescript方法
    # with open(sql_script_path, 'r') as f:
    #     cu.executescript(f.read())
    #     print('*****************************executescript successful!*****************************')

    ##查询数据
    sql = 'SELECT * FROM student;'
    cu_query = cu.execute(sql) #返回执行条目数量,即sql操作了几行数据
    print(cu_query)

    # result = cu_query.fetchall() #注意该位置和sqlite的区别:mysql 从cu中读取记录
    # print(result)
    cu_info= cu.fetchall() #返回多条记录
    print(cu_info)

    cu.close()
    conn.commit()
    conn.close()

#升级:数据库连接池
def create_student_table_by_pool():
    pass

def main():
    create_student_table()

if __name__ == '__main__':
    main()

python代码示例,多线程+数据库连接池,建议直接读 pooled_db.py文件,里面有详细的介绍,具体使用方法大概翻译如下:

  1. 首先,您需要通过创建PooledDB实例来设置数据库连接池,并传递以下参数:
    pool = PooledDB(
    creator: DB-API 2数据库模块, 有pgdb, mincached: 初始化连接池的数量,默认为0,表示在启动时不建立连接;
    maxcached:池中最大空闲连接数量,默认值为0或None表示无限制的池大小;
    maxshared:允许的最大共享连接数,默认值为0或None表示所有连接都是专用的,当达到这个最大数量时,如果连接被请求为可共享的,那么连接将被共享;
    maxconnections: 通常允许的最大连接数,默认值0或None表示任意数量的连接; blocking: 确定超过最大值时的行为,如果设置为true,则阻塞并等待,直到连接数量减少,但默认情况下将报告一个错误;
    maxusage: 单个连接的最大重用次数,默认值为0或None表示无限重用, 当达到此连接的最大使用量时,连接将自动重置(关闭并重新打开);
    setsession:用于准备会话的SQL命令的可选列表, e.g. [“set datestyle to german”, …];
    reset:当连接返回到池时应该如何复位, False或None回滚以begin()开始的事务,为了安全起见,默认值True总是发出回滚;
    failures: 一个可选的异常类或一个异常类元组,如果默认值(OperationalError,InternalError)不够,则应该应用连接故障转移机制;
    ping: 一个可选标志,用于控制何时使用ping()方法检查连接(如果该方法可用), 0 = None = never, 1 = default =无论何时从池中获取,2 =创建游标时,4 =执行查询时,7 = always,以及这些值的所有其他位组合; )

    class PooledDB: def init(
    self, creator, mincached=0, maxcached=0,
    maxshared=0, maxconnections=0, blocking=False,
    maxusage=None, setsession=None, reset=True,
    failures=None, ping=1,
    *args, **kwargs):


  2. 数据库连接池的标准写法
    with pool.connection() as db:
    with db.cursor as cur:
    cur.execute(…)
    res = cur.fetchone()

代码示例:

# -*- coding:utf-8 -*-

import pymysql, threading, datetime
from dbutils.pooled_db import PooledDB #提供线程间可共享的数据库连接,并自动管理连接
#from dbutils.persistent_db import PooledDB #提供线程专用的数据库连接,并自动管理连接

# 数据库连接池,一个线程执行
def select_student():
    pool = PooledDB(pymysql, 5, host='localhost', user='root', passwd='111111',db='demo', port=3306, charset='utf8')
    
    with pool.connection() as db:
        with db.cursor() as cur:
            cur.execute("SELECT * FROM student;")
            res = cur.fetchone()
            print(res)

# 多进程情况下,使用连接池
def select_student_by_manythred():
    start_time = datetime.datetime.now()
    # for i in range(10): 
    #     print("start threading %s"%str(i))
    #     t = threading.Thread(target=select_student)
    #     t.start()
    #     t.join() #阻塞线程,必须等子线程完成之后在退出,顺序执行
    #     print("end threading %s"%str(i))
    threads = []
    for i in range(10):
        t = threading.Thread(target=select_student)
        threads.append(t)

    for t in threads:
        print("start threading %s"%str(t))
        t.start()

    for t in threads:
        t.join()
        print("end threading %s"%str(t))

    end_time = datetime.datetime.now()
    print(end_time-start_time)

if __name__ == '__main__':
    
    select_student_by_manythred()
    

四、python 三方库 redis

1.引入库

pymysql python三方库,通过:pip install redis。
redis是一个开源的、使用C语言编写的、支持网络交互的、可基于内存也可持久化的Key-Value数据库。
redis初次使用,涉及的内容太多,详情见另一篇博客:https://blog.csdn.net/qq_40494873/article/details/121730642

2.代码示例

python代码示例,基本用法:

# -*- coding:utf-8 -*-

#redis demo

import redis, time

#redis 连接
def redis_create():
    #class Redis; 初始化Redis
    #def __init__(self, host='localhost', port=6379,db=0, password=None, socket_timeout=None,socket_connect_timeout=None,socket_keepalive=None, socket_keepalive_options=None,connection_pool=None, unix_socket_path=None,encoding='utf-8', encoding_errors='strict',charset=None, errors=None,decode_responses=False, retry_on_timeout=False,ssl=False, ssl_keyfile=None, ssl_certfile=None,ssl_cert_reqs='required', ssl_ca_certs=None,ssl_check_hostname=False,max_connections=None, single_connection_client=False,health_check_interval=0, client_name=None, username=None,retry=None):
    # db = 0,redis中默认为默认自动创建16个数据库
    print("process***: single connection")
    r = redis.Redis(host='localhost', port=6379, db=0, password=None)
    print(r.dbsize()) #获取数据库 db=0
    print(r.keys()) #获取数据库中 数据库所有key值

def redis_create_by_pool():
    #class Connection: 连接池
    #def__init__(self,host='localhost',port=6379,db=0,password=None,socket_timeout=None,socket_connect_timeout=None,socket_keepalive=False,socket_keepalive_options=None,socket_type=0,retry_on_timeout=False,encoding='utf-8',encoding_errors='strict',decode_responses=False,parser_class=DefaultParser,socket_read_size=65536,health_check_interval=0,client_name=None,username=None,retry=None):
    # max_connections = max_connections or 2 ** 31 默认最大连接数量
    print("process***: connection pool")
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0, password=None)
    r = redis.Redis(connection_pool=pool)
    print(r.dbsize())
    return r

# redis 操作 string
def redis_string_demo():
    r = redis_create_by_pool()
    ## set(name, value, ex=None, px=None, nx=False, xx=False)
    ## set 参数是互斥的,注意不要同时指定
    r.set('name', 'Jack')
    print(r.get('name'))

    r.set('age', 32, ex=3)
    print(r.get('age'))
    time.sleep(5)
    print(r.get('age'))    

    r.setnx('name', 'Lilei')
    print(r.get('name'))

    # r.mset(k1='kacy', k2='Tina'), 此种写法目前不支持,报错:TypeError: mset() got an unexpected keyword argument 'key1'
    r.mset({'k1':'V1','K2':'V2', 'k3':'V3'})
    print(r.mget('k1','k2','k3')) #mget 
    print(r.mget(['name','age','k1','k2','k3']))

    r.set('poem','I Love')
    print(r.get('poem')) #byte
    r.setrange('poem', 7, ' You') #修改字符串内容,从指定字符串索引开始向后替换
    print(r.get('poem').decode('utf-8'))

    r.set('poem_sushi','先天下之忧而优,后天下之乐而乐')
    print(r.getbit('poem_sushi', 0)) #获取值的二进制表示中的某位的值

    # r.setbin('poem_sushi', 45, 1)# 'Redis' object has no attribute 'setbin'


def main():
    # redis_create()
    # redis_create_by_pool()
    redis_string_demo()

if __name__ == '__main__':
    main()
    

四、python 三方库 pymongo

1.引入库

pymongo是三方库,使用之前使用,pip install pymongo进行安装。

具体的pymongo简单操作见另一篇博客:https://blog.csdn.net/qq_40494873/article/details/121802830?spm=1001.2014.3001.5501

2.代码示例

总结

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐