摘要1:https://blog.csdn.net/weixin_33804582/article/details/92471702
摘要2:https://www.cnblogs.com/juandx/p/5442752.html
摘要3:https://www.cnblogs.com/wuheng-123/p/9719812.html

model存在外键做join连接

首先创建数据库,在这里一个user对应多个address,因此需要在address上增加user_id这个外键(一对多)。


from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import ForeignKey
from sqlalchemy.orm import backref
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base
 
# Shared declarative base class; the User and Address models below subclass it.
Base = declarative_base()
 
 
class User(Base):
    """ORM model for the ``users`` table (the "one" side of the one-to-many relation)."""
    __tablename__ = 'users'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    # Collection of this user's Address rows, ordered by Address.id; the
    # backref also adds a ``user`` attribute on Address pointing back here.
    addresses = relationship("Address", order_by="Address.id", backref="user")
 
class Address(Base):
    """ORM model for the ``addresses`` table; each row references one user via ``user_id``."""
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email_address = Column(String(32), nullable=False)
    # Foreign key to users.id: the "many" side of the one-to-many relation.
    user_id = Column(Integer, ForeignKey('users.id'))
 
    # Alternative: declare the relationship on this side instead of on User.
    #user = relationship("User", backref=backref('addresses', order_by=id))
 
# Connect to the local MySQL "test" database; echo=True logs every SQL
# statement the engine emits (this produces the INFO lines shown below).
engine = create_engine('mysql://root:root@localhost:3306/test', echo=True)
# Uncomment to create the tables on first run:
#Base.metadata.create_all(engine)

# Session bound to the engine; the query examples below use ``session``.
Session = sessionmaker(bind=engine)
session = Session()

1.如果不使用join的话,可以直接联表查询

>>> session.query(User.name, Address.email_address).filter(User.id==Address.user_id).filter(Address.email_address=='test@test.com').all()
2015-08-19 14:02:02,877 INFO sqlalchemy.engine.base.Engine SELECT users.name AS users_name, addresses.email_address AS addresses_email_address 
FROM users, addresses 
WHERE users.id = addresses.user_id AND addresses.email_address = %s
2015-08-19 14:02:02,878 INFO sqlalchemy.engine.base.Engine ('test@test.com',)
[('jack', 'test@test.com')]

2.使用sqlalchemy中提供的Query.join()函数

>>> session.query(User).join(Address).filter(Address.email_address=='test@test.com').first()
2015-08-19 14:06:56,624 INFO sqlalchemy.engine.base.Engine SELECT users.id AS users_id, users.name AS users_name 
FROM users INNER JOIN addresses ON users.id = addresses.user_id 
WHERE addresses.email_address = %s 
 LIMIT %s
2015-08-19 14:06:56,624 INFO sqlalchemy.engine.base.Engine ('test@test.com', 1)
<demo.User object at 0x7f9a74139a10>

model不存在外键做join连接

1.上面的用法的前提是model之间存在外键;如果没有外键,可以在join()中显式指定ON条件来做连接

from sqlalchemy import create_engine
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import ForeignKey
from sqlalchemy.orm import backref
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base
 
# Shared declarative base class; the User and Address models below subclass it.
Base = declarative_base()
 
class User(Base):
    """ORM model for the ``users`` table; declares no relationship to Address."""
    __tablename__ = 'users'
 
    id = Column(Integer, primary_key=True)
    name = Column(String(32)) 
 
class Address(Base):
    """ORM model for the ``addresses`` table with NO foreign key to ``users``.

    ``user_id`` holds a user's id by value only, so SQLAlchemy cannot infer
    how to join this table to ``users``; the ON condition has to be passed
    explicitly, e.g. ``join(Address, User.id == Address.user_id)``.
    """
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email_address = Column(String(32), nullable=False)
    # Plain integer column: deliberately declared without ForeignKey so that
    # this model matches the "no foreign key" scenario being demonstrated.
    user_id = Column(Integer)

    # NOTE: without a ForeignKey, a relationship() to User would need an
    # explicit primaryjoin/foreign_keys configuration, so none is declared.
 
# Connect to the local MySQL "test" database; echo=True logs the emitted SQL.
engine = create_engine('mysql://root:root@localhost:3306/test', echo=True)
# Uncomment to create the tables on first run:
#Base.metadata.create_all(engine)

# Session and base query, so the join example below is runnable as written.
Session = sessionmaker(bind=engine)
session = Session()
query = session.query(User)

# Example for the models above: the second argument spells out the ON
# condition explicitly, which is the form to use when no foreign key links
# the two tables.
query.join(Address, User.id==Address.user_id)


# Another SQL example: number of articles per user, highest count first.
# NOTE(review): Article, User.username and func are not defined in the
# snippets above -- this example presumably comes from a different model set.
result = (
    session.query(User.username, func.count(Article.id))
    .join(Article, User.id == Article.uid)
    .group_by(User.id)
    .order_by(func.count(Article.id).desc())
    .all()
)
print(result)  # [('ketang', 2), ('zhiliao', 1)]

'''
SELECT user.username AS user_username, count(article.id) AS count_1 
FROM user INNER JOIN article ON user.id = article.uid GROUP BY user.id ORDER BY count(article.id) DESC
'''

子查询示例

# 原生sql,子表查询
mysql> SELECT users.*, adr_count.address_count FROM users LEFT OUTER JOIN
    ->     (SELECT user_id, count(*) AS address_count
    ->         FROM addresses GROUP BY user_id) AS adr_count
    ->     ON users.id=adr_count.user_id;
+----+------+---------------+
| id | name | address_count |
+----+------+---------------+
|  1 | jack |             2 |
+----+------+---------------+
1 row in set (0.00 sec)

使用sqlalchemy子查询


# 生成子句,等同于(select user_id ... group_by user_id)
>>> sbq = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery()
 
# 联接子查询,注意需要通过子查询的 c 属性来引用其中的字段(如 sbq.c.address_count)
>>> session.query(User.name, sbq.c.address_count).outerjoin(sbq, User.id==sbq.c.user_id).all()
2015-08-19 14:42:53,425 INFO sqlalchemy.engine.base.Engine SELECT users.name AS users_name, anon_1.address_count AS anon_1_address_count
FROM users LEFT OUTER JOIN (SELECT addresses.user_id AS user_id, count(%s) AS address_count
FROM addresses GROUP BY addresses.user_id) AS anon_1 ON users.id = anon_1.user_id
2015-08-19 14:42:53,425 INFO sqlalchemy.engine.base.Engine ('*',)
[('jack', 2L)]
>>>
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐