这篇文章和之前讲的数据库插入数据,遇到重复主键则更新内容,功能是一样的。只是此处使用了python的sqlalchemy来实现。至于如何用原生态SQL实现,请参考以前的文章。

Review:数据库:插入、更新记录(insert into, ..., on duplicate key)

目前,我见到了两种基于sqlalchemy的写法。

一种是SQL风格的upsert。让sqlalchemy输出SQL,直接调用engine.execute(sql);

另一种是ORM风格的upsert。完全用sqlalchemy封装好的类和函数,如session、flush、commit来操作。

第一种,SQL风格的SqlAlchemy。

import os
from sqlalchemy import create_engine
from sqlalchemy.dialects.mysql import insert


def run():

    db_info = "mysql+pymysql://u_test:u_123@192.168.0.2:3306/db_test"
    file_path = "./model.py"
    if not os.path.exists("./model.py"):
        args = 'sqlacodegen --noviews --outfile {file_path} {db_info}'.format(db_info=db_info, file_path=file_path)
        os.system(args)
    try:
        pass
        import model
        test_engine = create_engine(db_info)
        data = {"key_1": 1, "key_2": 2222, "key_3": 3333}
        insert_stmt = insert(model.TestTable).values(**data)
        on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(**data)

        test_engine.execute(on_duplicate_key_stmt)
    except Exception as e:
        print(e)


if __name__ == '__main__':
    run()

---

更新于2021年7月29日13:23:22

理解狭隘了。不仅遇到重复主键可以upsert,唯一索引也可以upsert。

https://docs.sqlalchemy.org/en/13/dialects/mysql.html#insert-on-duplicate-key-update-upsert

第二种,ORM风格的SqlAlchemy。

model_class是orm的model类,不是实例对象。


def save_result_into_db(model_class, result_dict, key, session):
    """
    upsert 数据,重复键 为 key_dict

    :param model_class:
    :param result_dict: 需要更新的字段构成的dict,key跟字段名需要对应
    :param key: 查找记录用的key,字符串或list,且一定是result_dict的key
    :param session:
    :return:

    """

    if isinstance(key, str):
        key_list = key.replace(' ', '').split(',')
    elif isinstance(key, list):
        key_list = key
    else:
        raise ValueError('key 必须是单个字符串,或以“,”分割的字符串,或list,为查找的字段名')

    key_dict = dict()
    for k in key_list:
        if k in result_dict:
            key_dict[k] = result_dict[k]
        else:
            raise ValueError('key 必须是result中的键')

    obj = session.query(model_class).filter_by(**key_dict).first()

    # 如果没有找到,就创建一条记录,否则,就返回已经存在的记录
    if obj is None:

        obj = model_class()
        session.add(obj)  # 加入session后,提交之前,仍然可以改变其属性

    for k, v in result_dict.items():
        if pd.isna(v):
            v = None
        setattr(obj, k, v)

    session.flush()

另外,更加推荐的写法是:在外面new一个obj,成员属性恰好就对应表的字段,也不怕传一个dict进来,key拼写出错。


def upsert_obj_in_db_omk_md(obj, key, session):
    """
    upsert 数据,重复键 为 key_dict

    :param obj:
    :param key: 查找记录用的key,字符串或list,且一定是result_dict的key
    :param session:
    :return:

    """

    if isinstance(key, str):
        key_list = key.replace(' ', '').split(',')
    elif isinstance(key, list):
        key_list = key
    else:
        raise ValueError('key 必须是单个字符串,或以“,”分割的字符串,或list,为查找的字段名')

    key_dict = dict()
    for key in key_list:
        key_dict[key] = eval(f'obj.{key}')
    model_class = obj.__class__
    obj_in_db = session.query(model_class).filter_by(**key_dict).first()

    if obj_in_db is None:
        session.add(obj)  # 加入session后,提交之前,仍然可以改变其属性
    else:
        # 属性全部更新。加入session后,提交之前,仍然可以改变其属性
        for k, v in obj.__dict__.items():
            if k != '_sa_instance_state': # 也可以排除诸如初次创建时间戳的特殊属性,具体看业务 and k != 'CreatedTS':
                setattr(obj_in_db, k, v)

    session.flush()  # 把obj刷入db,未提交(涉及事务、obj的状态,如持久态、游离态等,跟Hibernate一样的…)

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐