Python 学生管理系统+mysql+Flask
Python 学生管理系统+mysql+Flask。利用web实现对学生信息进行简单地增删查改操作。
Python 学生管理系统
前言
记录国庆假期作业。
作业要求:
现需要设计一个学生管理系统,首先,需要通过键盘对学生个人信息进行录入,并将学生的个人信息存储到mysql数据库中,同时系统还需要实现查询所有入库的学生信息、根据姓名查询某个学生的信息、修改某个学生的信息(其中学号唯一创建后不得修改)以及删除某个学生的信息。其中学生的个人信息包含:学号(长度最长为20且为数字字符)、姓名(只得由中英文字符组成不得包含数字及其他字符)、性别(只能是男女)、年龄(正整数)、班级、专业。
要求:(1)要有系统菜单栏选项(2)对输入数据需要进行要有合理的提示及限定不符合逻辑的异常处理情况。(3)学生信息需要使用面向对象的编程去实现,合理设计学生对象的属性及行为(方法)。(4)管理系统中的每一个功能的操作都要与mysql数据库里的数据表同步。
在做这个作业时,我也刚好在这不久前学了些vue2,所以就用这个作业来记录一下 一个简单的前后端分离版的学生管理系统,并巩固一下vue2。
主要用到的技术有:
Python+mysql(对pymysql进行封装的)+Flask(会点)+vue2+axios+Element-ui+pyecharts(数据可视化只用了一处)。
效果展示:
项目目录
目录很简单,就四个主文件夹,main.py程序入口。
config
用于配置本地mysql数据库的连接信息。
(注意:必须要配置好本地mysql数据库的连接信息后,才能使用且格式如下!!!)
py
这个里面就是我们后端开发的python代码了,至于里面的结构层次我是学着Java SpringBoot里面来做的。
controller:控制层(web层),用来与前端传过来的参数进行操作交互的;
entity:实体类,里面放了一个student学生类;
mapper:数据层(dao层),对数据库进行数据持久化操作,这里就是要实现对学生的信息进行一些增删查改的操作来保存到mysql数据库中;
service:至于业务层,这个作业的功能太简单了,根本都用不了,而且用python的写法也用不了写这一层(感觉),所以这里就没有写;
result:R类统一后端返回给前端的数据格式;
utils:存放工具类的地方,这里面我放了一个自定义的异常类工具,和自己封装好了的mysql工具类(就是对简单地sql增删查改语句进行封装,模仿着mybatis-plus来做的,当然我这个菜鸟封装的功能完全根本比不了mybatis-plus实现的功能,只是学着写着玩,哈哈哈!)
mapper.student_mapper.py
这里面我已经写好了创建tb_student表的语句,所以你们要运行这个项目的话,只需要配置好本地mysql数据库的连接信息即可。
from py.utils.mysql_util import ConnectMysql, MySql
table_name = "tb_student"
sql = f'''
create table {table_name}(
id int NOT NULL AUTO_INCREMENT unique,
sno char(20) NOT NULL unique,
sname varchar(50),
ssex char(2),
sage int ,
sclass varchar(50),
sdept varchar (50),
PRIMARY KEY(id,sno))DEFAULT CHARSET=utf8;
'''
ConnectMysql.create_table(sql)
# 操作tb_student表,返回一个studentMapper对象用来操作tb_student表
studentMapper = MySql(operate_tablename=table_name)
static
用来存放前端静态资源的地方。这个里面有很多的css、js等与这个作业无关,至于为什么会有这么多无关的文件喃,那当然是我直接对一个后端模板(H+)进行修改后得到的,并且我又看不懂前端的内容,所以就没有删除这些无用的文件。
templates
放html页面的地方。
主要代码块
student_controller.py
这里面对部分数据(学生信息)进行了规则校验。
import re
from py.mapper.student_mapper import studentMapper
from py.entity.student_entity import Student
from py.utils.my_exception_util import MyException
from py.result import R
from flask import render_template, Blueprint, request
bp = Blueprint('student_controller', __name__, url_prefix='/')
# 默认访问(首页)
@bp.route('/studentSystem')
def student_system():
return render_template('student_management.html')
# 添加学生
@bp.route('/studentSystem/addStudent',methods=["POST"])
def add_student():
form_data = request.get_json()
try:
if len(form_data) != 6:
raise MyException("数据不能为空!")
if not re.match("^[0-9]{1,20}$", form_data['sno']):
raise MyException("学号长度最长为20且为数字字符!")
if studentMapper.select_by_sql(f"select sno from tb_student where sno = '{form_data['sno']}'"):
raise MyException("该学号已被使用!")
if not form_data['sname'].isalpha():
raise MyException("姓名只能包含中英文,不能包含数字和其他字符。")
if not form_data['ssex'] in ('男', '女'):
raise MyException("性别只能是男或女")
try:
int(form_data['sage'])
except:
raise MyException("年龄只能是正整数")
s = Student(**form_data)
studentMapper.insert((s.get_sno(),s.get_sname(),s.get_ssex(),s.get_sage(),s.get_sclass(),s.get_sdept()))
return R.success(message="添加成功")
except Exception as e:
return R.error(message=f"添加失败\n{str(e)}")
# 查询所有学生
@bp.route('/studentSystem/getAllStudent')
def get_all_student():
try:
return R.success(studentMapper.MapEntity(studentMapper.select_all()))
except Exception as e:
return R.error(message=f"查询失败\n{str(e)}")
# 条件查询,根据学生姓名
@bp.route('/studentSystem/conditionQuery/<snameQuery>')
def get_one_student_by_sname(snameQuery):
try:
sql = f"select * from tb_student where sname like '{snameQuery}%'"
return R.success(studentMapper.MapEntity(studentMapper.select_by_sql(sql)))
except Exception as e:
return R.error(message=f"查询失败\n{str(e)}")
@bp.route('/studentSystem/updateStudent/<id>')
def update_student_by_id(id):
try:
return R.success(studentMapper.MapEntity(studentMapper.select_by_id(id)))
except Exception as e:
return R.error(message=f"编辑失败\n{str(e)}")
# 修改学生通过id
@bp.route('/studentSystem/updateStudent',methods=["POST"])
def update_student_by_id2():
try:
form_data = request.get_json()
if not form_data['sname'].isalpha():
raise MyException("姓名只能包含中英文,不能包含数字和其他字符。")
if not form_data['ssex'] in ('男', '女'):
raise MyException("性别只能是男或女")
try:
int(form_data['sage'])
except:
raise MyException("年龄只能是正整数")
studentMapper.update_by_id(form_data.pop('id'), form_data)
return R.success(message="修改成功")
except Exception as e:
print(e)
return R.error(message=f"修改失败\n{str(e)}")
# 删除学生通过id
@bp.route('/studentSystem/deleteStudent/<id>')
def delete_student_by_id(id):
try:
studentMapper.delete_by_id(id)
return R.success(message="删除成功")
except Exception as e:
print(e)
return R.error(message=f"删除失败\n{str(e)}")
mysql_util.py
import pymysql
# 从配置文件中读取连接mysql的信息
def connect_mysql():
mysql_config = {}
try:
with open("./config/mysql_config.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
r = line.strip().split("=")
mysql_config[r[0]] = r[1]
if r[0].lower() == "port":
mysql_config[r[0].lower()] = int(r[1])
return mysql_config
except Exception as e:
raise Exception(f"配置有文件异常!!!\n{e}")
# 必须先连接数据库
class ConnectMysql:
CONNECT_MYSQL_DB_OBJ = None
@staticmethod
def connect_mysql(my_sqldb_config_param: dict):
print("connect_mysql...")
assert isinstance(my_sqldb_config_param, dict), "请以字典类型的格式传入!"
try:
ConnectMysql.CONNECT_MYSQL_DB_OBJ = pymysql.connect(**my_sqldb_config_param) # 连接数据库,配置参数
print(f"{my_sqldb_config_param['database']}————数据库连接成功")
except Exception as e:
raise Exception(f"数据库连接失败!!!\n请检查数据库配置参数是否正确或检查本地数据库是否已启动!\n{e}")
@staticmethod
def create_table(sql):
try:
cursor = ConnectMysql.CONNECT_MYSQL_DB_OBJ.cursor()
cursor.execute(sql) # 执行sql语句
ConnectMysql.CONNECT_MYSQL_DB_OBJ.commit() # 执行sql语句后,进行提交
cursor.close()
print("表创建成功")
except Exception as e:
ConnectMysql.CONNECT_MYSQL_DB_OBJ.rollback() # 执行sql语句失败,进行回滚
print("表创建失败:",e)
ConnectMysql.connect_mysql(connect_mysql())
# 操作某一个表类
class MySql:
def __init__(self, operate_tablename:str):
self._operate_tablename = operate_tablename
try:
self._conn = ConnectMysql.CONNECT_MYSQL_DB_OBJ
self._cursor = self._conn.cursor() # 创建一个游标,用来执行查询
self._get_field() # 获取此表中的字段名
except Exception as e:
raise Exception(f"数据库连接失败!!!\n请检查表名、配置参数是否正确或检查本地数据库是否已启动!\n{e}")
# 获取_conn对象
@property
def get_connect(self):
return self._conn
# 获取_cursor对象
@property
def get_cursor(self):
return self._cursor
# 获取__desc对象
@property
def get_description(self):
# print(f"{self._operate_tablename}表中的字段属性:",self._desc)
return self._desc
# 获取正在操作的表名
@property
def operate_tablename(self):
return f"正在操作 {self._operate_tablename}表!!!"
# 修改要操作的表
@operate_tablename.setter
def operate_tablename(self,operate_tablename):
assert operate_tablename !="", "请输入要操作的表名!"
print(f"{self._operate_tablename} 表已被更换!")
self._operate_tablename = operate_tablename
self._get_field()
# 获取此表中的字段名
def _get_field(self):
self._cursor.execute(f"select * from {self._operate_tablename}")
self._desc = self._cursor.description
self._field_ = []
for field in self._desc:
self._field_.append(field[0])
# 执行sql语句
def _sql(self,sql,msg=""):
try:
self._cursor.execute(sql) # 执行sql语句
self._conn.commit() # 执行sql语句后,进行提交
if msg:print(f"数据{msg}成功!")
return True
except Exception as e:
if msg:print(f"\033[31m数据{msg}失败!!!\n{e} \033[0m")
self._conn.rollback() # 执行sql语句失败,进行回滚
return False
# 插入数据
def insert(self, *value):
if not isinstance(value[0],tuple): raise Exception("要求传入的参数类型为tuple元组!!!")
if len(value) == 1: value=value[0]
else:value = str(value)[1:-1]
sql = f"insert into {self._operate_tablename}({','.join(self._field_[1:])}) values {value}"
if not self._sql(sql,f"{value}插入"):
print("\n\033[31m:请检查每一条记录字段是否正确!!!\033[0m\n")
# 插入:自定义sql语句插入数据
def insert_by_sql(self, sql):
self._sql(sql,"插入")
# 删除:通过id删除该条数据
def delete_by_id(self,id_:int):
sql = f"delete from {self._operate_tablename} where id = {id_}"
if self._sql(sql):print(f"id={id_}记录,删除成功!")
else:print(f"\n\033[31m:id = {id_}记录,删除失败!!!\033[0m\n")
# 删除:自定义sql语句删除数据
def delete_by_sql(self, sql):
self._sql(sql,"删除")
# 修改:通过id修改数据
def update_by_id(self, id_:int, set_field:dict):
assert isinstance(set_field,dict),"请以字典类型的格式传入!"
tempset_field = []
for i in set_field:
tempset_field.append(f"{i}='{set_field[i]}'")
set_field = ",".join(tempset_field)
sql = f"update {self._operate_tablename} set {set_field} where id = {id_}"
if self._sql(sql):print(f"id={id_}记录,{set_field}修改成功!")
else:print(f"\n\033[31m:id = {id_}记录,{set_field}修改失败!!!\033[0m\n")
# 修改:自定义sql语句修改数据
def update_by_sql(self, sql):
self._sql(sql,"修改")
# 查询:通过id查询一条数据
def select_by_id(self,id_:int,field="*"):
if field != "*": field = ','.join(field)
sql = f"select {field} from {self._operate_tablename} where id={id_}"
self._cursor.execute(sql)
return self._cursor.fetchone()
# 查询:指定查询多少条数数据,可根据简单条件查询(where 字段=”“)
def select_many(self,num:int,query_builder=None,field="*"):
if field != "*": field = ','.join(field)
sql = f"select {field} from {self._operate_tablename}"
if query_builder:
if isinstance(query_builder,dict) and len(query_builder) == 1:
query_builder = list(query_builder.items())[0]
sql = f"select {field} from {self._operate_tablename} where {query_builder[0]}='{query_builder[1]}'"
else: raise Exception("要求输入的条件为dict(字典)类型并且只能有一对键值(:len(dict)=1)!!!")
self._cursor.execute(sql)
return self._cursor.fetchmany(num)
# 查询:所有数据
def select_all(self, field="*"):
if field != "*": field = ','.join(field)
sql = f"select {field} from {self._operate_tablename}"
self._cursor.execute(sql)
return self._cursor.fetchall()
# 查询:自定义sql语句查询数据
def select_by_sql(self, sql):
try:
self._cursor.execute(sql)
return self._cursor.fetchall()
except Exception as e:
print(f"\033[31m:数据查询失败!!!\n{e} \033[0m")
# 实体类映射,返回的记录带有该表中的字段(以字典的形式)
def MapEntity(self, db_return_result: tuple):
assert isinstance(db_return_result, tuple), "请以tuple元组类型的格式传入!"
try:
if not isinstance(db_return_result[0],tuple):
return dict(zip(self._field_, db_return_result))
return [dict(zip(self._field_, i)) for i in db_return_result]
except Exception as e:
print("可将从实例化数据库对象调用的方法直接传入这里!!!")
raise Exception(f"可将从实例化数据库对象调用的方法直接传入这里!!!\n {e}")
# 当对象被销毁时,游标关闭
def __del__(self):
print(f"{self._operate_tablename} >>> _cursor close...")
self._cursor.close()
资源下载
更多推荐
所有评论(0)