Python 之SQLAlchemy使用详细说明
测试有道 2024-10-07 13:07:01 阅读 92
目录
1、SQLAlchemy
1.1、ORM概述
1.2、SQLAlchemy概述
1.3、SQLAlchemy的组成部分
1.4、SQLAlchemy的使用
1.4.1、安装
1.4.2、创建数据库连接
1.4.3、执行原生SQL语句
1.4.4、映射已存在的表
1.4.5、创建表
1.4.5.1、创建表的两种方式
1、使用 Table 类直接创建表
2、使用声明式方式创建模型类
1.4.5.2、约束
1、创建约束示列
2、外键约束
3、删除/更新行为
1.4.5.3、多表关系
2、一对一关系
3、一对多/多对一关系
4、多对多关系
1.4.5.4、scoped_session实现线程安全
1.4.5.5、新增数据
1.4.5.6、修改数据
1.4.5.7、删除数据
1.4.5.8、查询数据
1、测试数据准备
2、基础查询
2.1、查询多个字段(查询指定字段)
2.2、去除重复记录
2.3、调试小技巧
3、条件查询
3.1、常用的比较运算符
3.2、常见的逻辑运算符
3.3、综合示例
4、聚合函数
4.1、常见的聚合函数
4.2、综合示列
5、分组查询
5.1、综合示例
6、排序查询
6.1、排序方式
7、 分页查询
7.1、综合示例
1.4.5.9、多表查询
1、多表查询概述
1.1、测试数据准备
1.2、概述
1.3、多表查询的分类
2、内连接
3、外连接
4、自连接查询
5、联合查询
6、子查询
6.1、概述
6.2、标量子查询
6.3、列子查询
6.4、行子查询
6.5、表子查询
1、SQLAlchemy
1.1、ORM概述
定义:ORM(Object-Relational Mapping)模型,即对象关系映射,是一种程序设计技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。在面向对象的编程语言中,如Java、Python等,数据通常被组织成复杂的对象关系。简单来说就是ORM模型把面向对象编程与操作数据库之间建立了映射。设置开发者操作数据库无需维护和编写SQL语句,而是基于面对对象的方式操作数据库。
映射关系:数据库中的表>编程语言中的类,表中的字段>类中的属性,表之间的关系>类之间的关系。
使用ORM模型的优势在于:
提高开发效率:开发者可以使用面向对象的方式来操作数据库,无需编写大量的SQL语句。易于维护:由于ORM模型提供了清晰的映射规则,使得代码更加清晰、易于理解和维护。跨数据库平台:ORM框架通常支持多种数据库平台,使得应用程序可以轻松地在不同的数据库之间迁移。
当然ORM也存在一下劣势:
性能开销:ORM框架在将对象的操作转换为SQL语句并执行这些语句时,可能会产生一定的性能开销。复杂查询的复杂性:在处理复杂的查询时,ORM的语法可能会变得复杂且难以维护。
1.2、SQLAlchemy概述
在Python语言中实现ORM系统的就是SQLAlchemy,它具备以下特点:
对象关系映射(ORM):SQLAlchemy 允许开发者将数据库表映射为 Python 类,将表的行映射为 Python 对象,从而简化了数据库操作。开发者可以像操作 Python 对象一样来操作数据库记录,无需编写大量的 SQL 语句。动态 SQL 生成:SQLAlchemy 提供了表达式语言,允许开发者在运行时动态地构建 SQL 语句。这使得开发者能够灵活地根据应用程序的需求来生成和执行 SQL 语句。支持多种数据库:SQLAlchemy 支持多种关系型数据库,如 MySQL、PostgreSQL、SQLite、Oracle 等。开发者可以轻松地切换数据库后端,而无需修改太多代码。连接池和事务管理:SQLAlchemy 提供了连接池和事务管理的功能,以确保数据库连接的稳定性和事务的原子性。这有助于开发者编写高效、可靠的数据库应用程序。关系映射:SQLAlchemy 支持各种关系数据库中的关系类型,如一对一、一对多、多对多等,并提供了相应的 API 来处理这些关系。
然而,ORM模型也存在一些缺点:
性能开销:ORM框架在将对象的操作转换为SQL语句并执行这些语句时,可能会产生一定的性能开销。复杂查询的复杂性:在处理复杂的查询时,ORM的语法可能会变得复杂且难以维护。
注意:在处理复杂的SQL查询时,由于ORM框架效率低下,所以这个时候可以编写SQL语句执行原生SQL语句。
1.3、SQLAlchemy的组成部分
1、核心架构(Core):
引擎(Engine):负责与数据库的通信,管理连接池和事务。它是SQLAlchemy与数据库交互的入口点。连接(Connection):代表与数据库的单个连接会话。它是执行SQL语句的直接通道。会话(Session):在ORM中使用,代表与数据库的持久化会话。它用于管理对象的持久化,包括添加、修改、删除和查询对象。元数据(Metadata):用于定义和存储关于数据库结构的信息,如表和列的定义。表(Table):表示数据库中的一个表。列(Column):表示表中的一个列。类型(Types):用于定义列的数据类型。表达式构造器(Expression Language):用于构建SQL表达式,如select(), insert(), update(), delete()等。
2、ORM架构:
声明基类(Declarative Base):用于定义ORM模型。模型(Model):表示数据库中的一个表,由Python类定义。属性(Attributes):表示模型的属性,与数据库表的列相对应。关系(Relationships):表示模型之间的关系,如一对多、多对多等。
4、数据库连接池(Connection Pooling):管理数据库连接的池化,确保高效的数据库连接复用。
5、Dialect:选择连接数据库的DB API种类,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作。
6、架构和类型(Schema/Types):定义数据库的架构和数据类型。
7、SQL表达式语言(SQL Expression Language):
选择(SELECT):使用select()构造器来构建查询语句。该语言允许用户以非常灵活和直观的方式构造SQL语句,而无需直接编写SQL字符串。
1.4、SQLAlchemy的使用
SQLAlchemy官方文档:Dialects — SQLAlchemy 2.0 Documentation
1.4.1、安装
pip install sqlalchemy
1.4.2、创建数据库连接
注意:sqlalchemy没有提供直接连接数据库的操作,所以需要借助第三方库来连接数据库,操作数据库。以 MySQL 为例,sqlalchemy就是借助pymsql库来实现对数据的连接和操作。
连接不同/相同的数据库借助不同的第三方库如下:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
创建连接:
<code>from sqlalchemy import create_engine
from urllib import parse
user = "root" # 用户名
password = "" # 密码
pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
host = "172.22.70.174" # 数据库主机地址
# 第一步: 创建engine
engine = create_engine(
url=f"mysql+pymysql://{user}:{pwd}@{host}:3306/test?charset=utf8",
max_overflow=10, # 超过连接池大小外最多创建的连接
pool_size=10, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 对线程池中的线程进行一次连接的回收的时间,如果是3600,表示1个小时后对连接进行回收
)
1.4.3、执行原生SQL语句
需求:查询表t_student的全部数据,执行的SQL语句是:select * from t_student
from sqlalchemy import create_engine
from urllib import parse
import threading
user = "root" # 用户名
password = "" # 密码
pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
host = "172.22.70.174" # 数据库主机地址
# 第一步: 创建engine
engine = create_engine(
url=f"mysql+pymysql://{user}:{pwd}@{host}:3306/test?charset=utf8",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=3, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 对线程池中的线程进行一次连接的回收的时间,如果是3600,表示1个小时后对连接进行回收
)
# 第二步:使用
def test_execute():
# conn = engine.connect() # 创建一个新的连接
conn = engine.raw_connection() # 从连接池中取一个连接
cursor = conn.cursor() # 创建游标
sql = "select * from t_student" # 定义执行的SQL语句
cursor.execute(sql) # 执行SQL语句
print(cursor.fetchall()) # 获取执行的结果并打印置控制台
# 测试配置是否生效
if __name__ == '__main__':
for i in range(20):
t = threading.Thread(target=test_execute)
t.start()
1.4.4、映射已存在的表
说明:使用ORM映射已存在的表时,只能映射其对应的字段,对于每个字段的约束最好和原表保持一致,映射已存在的表时不能新增字段,新增外键约束,新增索引操作。如果需要进行这些操作可以数据库中执行相关sql语句或者使用SQLAlchemy的迁移工具(如Alembic)
注意:
在某些情况下,即使SQLAlchemy模型中的约束与原始数据库表不完全一致,应用程序可能仍然能够正常运行。但是,这可能会导致数据完整性问题或难以预测的行为。因此,最佳实践是尽可能保持SQLAlchemy模型中的约束与原始数据库表一致。
示例:
需求:创建Student表模型映射数据库中的t_student表
import datetime
from sqlalchemy.orm import declarative_base , sessionmaker
from sqlalchemyBaseUse import engine # 这里的engine就是上面创建连接中创建的engine
from sqlalchemy import Column, Integer, String, Text, DateTime, Index
# 声明ORM基类
Base = declarative_base()
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True)
stuno = Column(String(10), comment="学号")code>
name = Column(String(10), comment="姓名")code>
gender = Column(String(1), comment="性别")code>
age = Column(Integer, comment="年龄")code>
idcard = Column(String(18), comment="身份证")code>
entrydate = Column(DateTime, default=datetime.datetime.now, comment="入学时间")code>
addr = Column(String(50), comment="家庭地址")code>
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
def drop_db():
# 删除继承base类的表,注意:除了删除表的映射关系,数据库中的表和数据都会被删减,生产中谨慎操作
Base.metadata.drop_all(engine)
if __name__ == '__main__':
init_db()
1.4.5、创建表
1.4.5.1、创建表的两种方式
1、使用 Table 类直接创建表
编程风格:这是一种更接近于 SQL 风格的方法,因为它直接定义了表的列和约束,没有额外的类定义。用法:你需要显式地创建 Table 对象,并指定列和约束。这种方法在不需要 ORM 功能,只需要直接操作数据库表的场景中特别有用。灵活性:由于这种方法没有与 ORM 类绑定,因此它更加灵活,可以更容易地用于更复杂的数据库操作或与其他数据库工具集成。
示例:
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table, CheckConstraint
# 假设已经有了一个引擎和元数据
engine = create_engine('sqlite:///:memory:')
metadata = MetaData()
# 创建一个表,并添加一个检查约束
my_table = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('age', Integer),
CheckConstraint("age >= 0 AND age <= 150", name='age_check') code>
)
# 创建表(包括检查约束)
metadata.create_all(engine)
2、使用声明式方式创建模型类
编程风格:这是一种面向对象的方法,通过定义类来定义表结构。这种方式更加符合 Python 的编程习惯,并且与 ORM 紧密集成。用法:你需要继承一个由 declarative_base() 创建的基类,并在子类中定义字段和关系。字段通常使用 Column 类定义,而关系则使用 relationship() 函数定义。SQLAlchemy 的 ORM 层会将这些类定义转换为实际的数据库表。灵活性:虽然这种方法不如直接使用 Table 类那么灵活,但它提供了更高级别的抽象和更强大的 ORM 功能。通过 ORM,你可以使用 Python 对象来操作数据库记录,而无需编写繁琐的 SQL 语句。
示例:
在 SQLAlchemy 的声明式(Declarative)映射中,__table_args__ 是一个类变量,用于指定表级别的参数和选项,这些参数和选项在创建表时会被应用到数据库表上。__table_args__ 通常是一个元组,其中包含多个 SQLAlchemy 提供的表级构造器,如 UniqueConstraint、ForeignKeyConstraint、Index、CheckConstraint 等。
from sqlalchemy import Column, Integer, String, CheckConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class MyModel(Base):
__tablename__ = 'my_table'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
__table_args__ = (
CheckConstraint("age >= 0 AND age <= 150", name='age_check'), code>
)
1.4.5.2、约束
概念:约束是作用于表中字段上的规则,用于限制存储在表中的数据。
目的:保证数据库中数据的正确、有效性和完整性。
分类:
约束
| 描述
| 关键字
|
非空约束
| 限制该字段的数据不能为null
| NOT NULL
|
唯一约束
| 保证该字段的所有数据都是唯一、不重复的
| UNIQUE
|
主键约束
| 主键是一行数据的唯一标识,要求非空且唯一
| PRIMARY KEY
|
默认约束
| 保存数据时,如果未指定该字段的值,则采用默认值
| DEFAULT
|
检查约束(8.0.16版本之后)
| 保证字段值满足某一个条件
| CHECK
|
外键约束
| 用来让两张表的数据之间建立连接,保证数据的一致性和完整性
| FOREIGN KEY
|
注意:约束是作用于表中字段上的,可以在创建表/修改表的时候添加约束。
1、创建约束示列
需求如下:
要求创建一张名为t_user的数据表,各个字段的规则如下:
字段名
| 字段含义
| 字段类型
| 约束条件
| 约束关键字
|
id
| ID唯一标识
| int
| 主键,并且自动增长
| PRIMARY KEY,AUTO_INCREMENT
|
name
| 姓名
| varchar(10)
| 不为空,并且唯一
| NOT NULL , UNIQUE
|
age
| 年龄
| int
| 大于0,并且小于等于150
| CHECK
|
status
| 状态
| char(1)
| 如果没有指定该值,默认为1
| DEFAULT
|
gender
| 性别
| char(1)
| 无
|
from sqlalchemy.orm import declarative_base , sessionmaker
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, CheckConstraint
# 声明ORM基类
Base = declarative_base()
class UserModel(Base):
__tablename__ = "t_user"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=True, unique=True, comment="姓名")code>
age = Column(Integer, comment="年龄")code>
status = Column(String(1), default=1, comment="状态")code>
gender = Column(String(1), comment="性别")code>
# 添加age的检查约束
# 注意__table_args__是元祖数据类型,如果只有一个数据的时候,注意后面的逗号不能少
__table_args__ = (
CheckConstraint("age >0 AND age <= 150", name="age_check"),code>
)
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
2、外键约束
作用:外键用来让两张表的数据之间建立连接,从而保证数据的一致性和完整性。
mysql 语法:
1、添加外键
CREATE TABLE 表名(
字段名 数据类型,
...
[CONSTRAINT] [外键名称] FOREIGN KEY (外键字段名) REFERENCES 主表 (主表列名)
);
或
ALTER TABLE 表名 ADD CONSTRAINT 外键名称 FOREIGN KEY (外键字段名)REFERENCES 主表 (主表列名) ;
2、删除外键
ALTER TABLE 表名 DROP FOREIGN KEY 外键名称;
在 SQLAlchemy 中,创建外键约束通常不需要直接使用 __table_args__,因为 SQLAlchemy 提供了 ForeignKey 和 relationship 这两个工具来定义关系和外键约束。
示例:
需求:创建一张部门表和员工表,建立外键约束关系一个员工对应一个部门。
部门表 t_departments
字段名
| 字段含义
| 字段类型
| 约束条件
| 约束关键字
|
id
| ID唯一标识
| int
| 主键,并且自动增长
| PRIMARY KEY,AUTO_INCREMENT
|
name
| 部门名称
| varchar(50)
| 非空约束
| NOT NULL
|
员工表 t_employees
字段名
| 字段含义
| 字段类型
| 约束条件
| 约束关键字
|
id
| ID唯一标识
| int
| 主键,并且自动增长
| PRIMARY KEY,AUTO_INCREMENT
|
name
| 姓名
| varchar(50)
| 非空约束
| NOT NULL
|
age
| 年龄
| int
| ||
dept_id
| 部门id
| int
| 外键约束
| FOREIGN KEY
|
1、创建ORM表模型如下:
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index, CheckConstraint
# 声明ORM基类
Base = declarative_base()
# 创建部门表
class Department(Base):
__tablename__ = 't_departments'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="部门名称")code>
# 定义与 Employee 的一对多关系
employees = relationship("Employee", back_populates="department")code>
# 创建员工表
class Employee(Base):
__tablename__ = 't_employees'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")code>
age = Column(Integer, comment="年龄")code>
department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)code>
# 定义与 Department 的多对一关系
department = relationship("Department", back_populates="employees")code>
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
2、往数据库中插入测试数据如下:
# 部门表测试数据
INSERT INTO t_departments (id, name) VALUES (1, '研发部'), (2, '市场部'),(3, '财务部');
# 员工表测试数据
INSERT INTO t_employees (id, name, age, department_id)
VALUES (1, '张三', 22, 1), (2, '李四', 33, 1), (3, 'TOM', 30, 2), (4, '小艺', 25, 2), (5, '小李', 36, 3);
3、展示通过关联关系查询,如:查询员工姓名为张三对应的部门信息
def query():
Session = sessionmaker(bind=engine)
session = Session()
# 查询姓名为张三的员工
employee = session.query(Employee).filter_by(name="张三").first()code>
# 如果找到了员工,则获取其部门信息
if employee:
department = employee.department
print(f"员工姓名: {employee.name}, 部门名称: {department.name}")
else:
print("没有找到姓名为'张三'的员工")
if __name__ == '__main__':
# init_db()
query()
4、展示通过关联关系查询,如:查询研发部所有员工信息
<code>def query2():
Session = sessionmaker(bind=engine)
session = Session()
# 直接通过部门对象获取其员工
department = session.query(Department).filter_by(name='研发部').first()code>
if department:
for employee in department.employees: # 使用了Department 模型中有一个名为 'employees' 的反向关联
print(f"员工姓名: {employee.name}, 员工ID: {employee.id}")
if __name__ == '__main__':
# init_db()
query2()
还是上面4的示例,如果没有使用反向关联,在正常情况下我们应该如何查询呢?
思考:如果正常在数据库中查询时,利用的外键关系进行查询,可以使用内连接查询,对应的SQL语句如下:
select e.*, d.name from t_departments d inner join t_employees e on e.department_id=d.id where d.name='研发部';
那么把SQL语句对应到ORM中就是(使用子查询):
1、先在t_departments表中把研发部对应的部门id查询出来2、然后在t_employees表中把上面查询出来的部门id对应的员工信息查询出来 实现代码如下:
<code>def query3():
Session = sessionmaker(bind=engine)
session = Session()
# 1、先查询研发部信息
department = session.query(Department).filter_by(name='研发部').first()code>
# 如果没有找到研发部,则退出查询
if not department:
print("没有找到研发部")
session.close()
exit()
# 2、根据研发部的部门id查询所有员工
employees = session.query(Employee).filter_by(department_id=department.id).all()
# 打印研发部所有员工的信息
for employee in employees:
print(f"员工姓名: {employee.name}, 员工ID: {employee.id}")
if __name__ == '__main__':
# init_db()
query3()
综上:使用反向关联可以直接查询所需要的数据,效率更高,代码更加简洁。
5、relationship的使用说明
可以看到上面两张表中分别创建了一个反向关联如下:
t_departments: employees = relationship("Employee", back_populates="department")
t_employees :department = relationship("Department", back_populates="employees")
参数说明:
relationship的第一个参数:对应关联的模型类的类名(字符串形式),如在t_departments表中建立一个与t_employees的关联,而t_employees表对应的数据模型类为:Employee,所以第一个参数就是Employeerelationship的第二个参数:是一个可选参数,反向引用另一个数据模型类里的属性,引用的属性必须在关联的数据模型类中显示存在。如: employees = relationship("Employee", back_populates="department") 这意味着在 Employee 类中,必须要有一个名为 department 的属性,该属性可以自动访问到关联的 Department 对象。
3、删除/更新行为
添加了外键之后,再删除父表数据时产生的约束行为,就称为删除/更新行为。具体的删除/更新行为有以下几种:
行为
| 说明
|
NO ACTION
| 当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有则不允许删除/更新。 (与 RESTRICT 一致) 默认行为
|
RESTRICT
| 当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有则不允许删除/更新。 (与 NO ACTION 一致) 默认行为
|
CASCADE
| 当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有,则同时删除/更新外键在子表中的记录。
|
SET NULL
| 当在父表中删除对应记录时,首先检查该记录是否有对应外键,如果有则设置子表中该外键值为null(这就要求该外键允许取null)。
|
SET DEFAULT
| 父表有变更时,子表将外键列设置成一个默认的值 (Innodb不支持)
|
对应SQL语法:
ALTER TABLE 表名 ADD CONSTRAINT 外键名称 FOREIGN KEY (外键字段) REFERENCES 主表名 (主表字段名) ON UPDATE CASCADE ON DELETE CASCADE;
示例:
在上面的t_employees表中我们创建了一个外键,并指定了外键的删除行为为CASCADE,也就是如果父表t_departments发生了删除,那么对应子表t_employees中外键关联的数据也会被删除。
<code>department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)code>
注意事项:
在本例中,如果ondelete="SET NULL"或onupdate="SET NULL" 会出现建立外键失败的错误,因为在t_employees表中的外键字段department_id设置了非空约束(nullable=False),这就和删除更新行为“SET NULL”互相冲突,所以会出现创建外键失败。
1.4.5.3、多表关系
1、relationship 的作用与使用
relationship 是 SQLAlchemy 库中的一个重要功能,它用于在模型(或称为“表”)之间建立关联关系。以下是对 relationship 的详细说明,包括其作用和使用方式:
作用:
定义模型之间的关系:relationship 允许你定义模型之间的一对一、一对多、多对一和多对多关系。这种关系映射了数据库中表之间的关系,使得你可以以面向对象的方式处理数据。自动关联查询:通过 relationship 定义的关联关系,SQLAlchemy 可以在查询时自动进行关联查询,从而简化了开发过程。反向引用:使用 backref 参数,你可以在一个关系上定义反向引用,这使得从一个对象可以轻松地访问与之相关的另一个对象。
使用方式:
1、引入必要的模块:
<code>
from sqlalchemy.orm import relationship
2、定义模型:
在模型中,使用 Column 定义字段,使用 ForeignKey 定义外键,外键也可以不用定义使用 relationship 定义与其他模型的关系。3、建立关联关系:
一对一关系:通过在从表模型中增加字段和 relationship 对象,并使用 uselist=False 参数指定。一对多关系:在“多”的表中设置外键,并使用 relationship 函数指向“一”的表。多对多关系:创建一个包含两个外键的“中间表”,并在两个需要做多对多关系的模型中分别使用 relationship 函数,并通过 secondary 参数指定中间表。4、参数设置:
backref:定义反向引用,使得从另一个模型可以方便地访问当前模型。uselist:对于一对一关系,设置为 False 表示关联的对象不是列表,而是单个对象。cascade:定义级联操作,如 "all, delete-orphan" 表示当父对象被删除时,所有相关的子对象也将被删除。lazy:是用于控制关系加载方式的。这个参数决定了当从数据库中查询一个对象时,相关的对象是如何被加载的。lazy 参数可以接受几个不同的值,每个值都对应着不同的加载策略:select (默认值): 使用单独的 SELECT 语句来加载相关对象。这是默认的加载策略,因为它简单且直观。但是,如果你知道你会经常访问相关的对象,并且想要减少数据库的查询次数,那么使用其他的加载策略可能会更有效。order_by:指定关联对象的排序方式。joined: 使用 JOIN 来加载主对象和相关对象。这通常会导致更大的查询,但可以减少查询的数量,特别是当你需要访问相关的对象时。dynamic: 这会返回一个可以发出额外查询的查询对象。这意味着你可以根据需要来动态地加载相关的对象,而不是立即加载它们。这允许你执行更复杂的查询或操作相关的对象集合。noload: 不加载相关的对象。这可以用于在你知道不需要相关的对象时节省数据库查询。immediate: 这会立即加载关系,而不是在首次访问相关对象时。然而,这种策略在 SQLAlchemy 的当前版本中并不常用,因为它可能会导致意外的行为。subquery: 使用子查询来加载相关对象。这种策略在某些情况下可能比默认的 select 策略更有效,因为它可以在一个查询中加载多个相关的对象。5、查询:
通过 relationship 定义的关联关系,你可以使用 SQLAlchemy 的查询 API 进行复杂的关联查询。
2、一对一关系
实现:在任意一方添加外键,关联另一方的主键,并设置外键为唯一的(UNIQUE),一般用作单表的拆分【注:这里的实现是指在MySQL中的实现方式,但是在ORM中实现思路是一样的】
示列:用户 与 用户详情的关系
关系:一对一关系,用于单表拆分,将一张表的基础字段放在一张表中,其他详情字段放在另一张表中,用来提升操作效率。
1、方式一:使用back_populates参数实现反向关联
# 声明ORM基类
Base = declarative_base()
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")code>
# 定义一个关系来访问UserDetail对象
detail = relationship("UserDetail", uselist=False, back_populates="user")code>
class UserDetail(Base):
__tablename__ = 't_user_details'
id = Column(Integer, primary_key=True, autoincrement=True)
address = Column(String(100), comment="地址")code>
phone = Column(String(20), comment="电话")code>
# 外键列,引用t_user表的id
user_id = Column(Integer, ForeignKey('t_user.id'), nullable=False, unique=True)
# 定义一个关系来回引用User对象
user = relationship("User", back_populates="detail")code>
2、方式二:使用backref参数实现反向关联
说明:backref 参数是 relationship() 函数的一个非常有用的功能,它允许我们自动创建反向关系,而无需在另一个模型类中显式定义它。使用 backref 可以简化代码并减少冗余。
在这个例子中,User 类中的 detail 关系使用了 backref="user" 参数。这会在 UserDetail 类的实例上自动创建一个名为 user 的属性,该属性是一个指向与之关联的 User 对象的引用。因此,我们不再需要在 UserDetail 类中显式定义 user 关系。
现在,如果你有一个 UserDetail 对象 ud,你可以通过 ud.user 访问与之关联的 User 对象。同样地,如果你有一个 User 对象 u,你可以通过 u.detail 访问与之关联的 UserDetail 对象(假设它存在)。
<code>class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")code>
# 使用 backref 自动创建反向关系
detail = relationship("UserDetail", uselist=False, backref="user")code>
class UserDetail(Base):
__tablename__ = 't_user_details'
id = Column(Integer, primary_key=True, autoincrement=True)
address = Column(String(100), comment="地址")code>
phone = Column(String(20), comment="电话")code>
# 外键列,引用t_user表的id
user_id = Column(Integer, ForeignKey('t_user.id'), nullable=False, unique=True)
# 注意:这里我们不再需要显式定义 user 关系,因为 backref 已经为我们做了
# 定义一个关系来回引用User对象
# user = relationship("User", back_populates="detail")code>
3、backref与back_populates的区别
backref
backref 是一个方便的工具,用于在 relationship() 中自动创建反向的 relationship()。它通常用于一对多或多对一关系,其中反向关系是一个简单的属性。当你使用 backref 时,SQLAlchemy 会为你自动创建一个反向的 relationship(),并附加到目标模型类上。你不需要在目标模型类中显式定义这个关系。
back_populates
back_populates 是一个更底层的参数,用于在定义双向关系时手动指定反向的 relationship() 属性的名称。当你需要在两个模型类中都明确控制关系的各个方面时,或者当你正在处理更复杂的关系(如多对多)时,back_populates 会很有用。使用 back_populates 时,你需要在两个模型类中都显式定义 relationship(),并使用 back_populates 参数来指示哪个属性是反向关系。
3、一对多/多对一关系
实现:在多的一方建立外键,指向一的一方的主键
示例:部门表和员工表关系
关系:一个员工对应一个部门,一个部门对应多个员工
class Department(Base):
__tablename__ = 't_departments'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="部门名称")code>
# 定义与 Employee 的一对多关系
employees = relationship("Employee", back_populates="department")code>
class Employee(Base):
__tablename__ = 't_employees'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")code>
age = Column(Integer, comment="年龄")code>
department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)code>
# 定义与 Department 的多对一关系
department = relationship("Department", back_populates="employees")code>
4、多对多关系
实现:建立第三张中间表,中间表至少包含两个外键,分别关联两方的主键
示列:学生 与 课程的关系
关系:一个学生可以选修多门课程,一门课程也可让多个学生选择
学生表 t_student
字段名
| 字段含义
| 字段类型
| 约束条件
| 约束关键字
|
id
| ID唯一标识
| int
| 主键,并且自动增长
| PRIMARY KEY,AUTO_INCREMENT
|
name
| 姓名
| varchar(10)
| 非空约束
| NOT NULL
|
stuno
| 学号
| varchar(10)
| 非空约束
| NOT NULL
|
课程表 t_course
字段名
| 字段含义
| 字段类型
| 约束条件
| 约束关键字
|
id
| ID唯一标识
| int
| 主键,并且自动增长
| PRIMARY KEY,AUTO_INCREMENT
|
name
| 课程名称
| varchar(10)
| 非空约束
| NOT NULL
|
学生课程关系表 t_student_course
字段名
| 字段含义
| 字段类型
| 约束条件
| 约束关键字
|
id
| ID唯一标识
| int
| 主键,并且自动增长
| PRIMARY KEY,AUTO_INCREMENT
|
studentid
| 学生ID
| int
| 非空约束,外键约束
| NOT NULL, FOREIGN KEY
|
courseid
| 课程ID
| int
| 非空约束,外键约束
| NOT NULL, FOREIGN KEY
|
方式一:关联表不直接映射到ORM类,使用back_populates参数实现反向关联
在多对多关联中,通常不需要直接对关联表进行ORM映射(注意:在ORM映射中实体表不一定需要创建外键约束),因为relationship()函数已经足够处理这种关系。
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, Table
# 声明ORM基类
Base = declarative_base()
# 关联表(不直接映射到ORM类)
student_course = Table('t_student_course', Base.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column('studentid', Integer, ForeignKey('t_student.id'), nullable=False),
Column('courseid', Integer, ForeignKey('t_course.id'), nullable=False),
)
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="姓名")code>
stuno = Column(String(10), nullable=False, comment="学号")code>
# 使用relationship()定义多对多关系
courses = relationship("Course",
# 指定中间关联表
secondary="t_student_course",code>
back_populates="students",code>
lazy='dynamic') # 使用lazy='dynamic'可以返回查询对象code>
class Course(Base):
__tablename__ = 't_course'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="课程名称")code>
# 使用relationship()定义反向多对多关系
students = relationship("Student",
# 指定中间关联表
secondary="t_student_course",code>
back_populates="courses")code>
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
方式二:关联表映射到ORM类 使用backref参数实现反向关联
因为backref会自动创建反向关联,所以只需要在除了关联表外的其中一个表里面建立反向关联即可,需要借助参数secondary。
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="姓名")code>
stuno = Column(String(10), nullable=False, comment="学号")code>
# 与StudentCourse定义多对多关系
courses = relationship("Course",
secondary="t_student_course", # 关联表的名称code>
backref="students") # 为Course模型创建反向引用code>
class Course(Base):
__tablename__ = 't_course'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="课程名称")code>
class StudentCourse(Base):
__tablename__ = 't_student_course'
id = Column(Integer, primary_key=True, autoincrement=True)
studentid = Column(Integer, ForeignKey('t_student.id'), nullable=False)
courseid = Column(Integer, ForeignKey('t_course.id'), nullable=False)
__table_args__ = (
# 确保student_id和course_id的组合是唯一的
UniqueConstraint('studentid', 'courseid', name='_student_course_uc'),code>
)
示例:查询姓名为张三选择的所有课程名称
1、插入测试数据
insert into t_student values (null, '张三', '2000100101'),(null, '李四',
'2000100102'),(null, '小五', '2000100103'),(null, '小七', '2000100104');
insert into t_course values (null, 'Java'), (null, 'PHP'), (null , 'MySQL') , (null, 'Hadoop');
insert into t_student_course values (null,1,1),(null,1,2),(null,1,3),(null,2,2), (null,2,3),(null,3,4);
2、代码实现
def query():
Session = sessionmaker(bind=engine)
session = Session()
stu = session.query(Student).filter_by(name="张三").first()code>
if stu:
for course in stu.courses:
print(course.name)
if __name__ == '__main__':
query()
1.4.5.4、scoped_session实现线程安全
在SQLAlchemy中,scoped_session是一个工厂,它产生线程局部(thread-local)的Session对象。也就是在一个线程中,多次调用scoped_session工厂将返回同一个Session实例,而在另一个线程中,你会得到一个不同的实例。这有助于实现线程安全的数据库会话管理,因为每个线程都有自己的会话,从而避免了并发问题。
使用scoped_session来实现线程安全的步骤:
首先,你需要一个Session类,这通常是通过sessionmaker创建的。然后,你使用scoped_session来包装这个Session类。其次你可以通过scoped_session工厂来获取会话,并在需要时使用它。最后在不需要时,关闭它。【注意:当线程结束时,scoped_session会自动关闭并清理其内部的Session实例。 所以,通常不需要在代码中显式地关闭或清理会话。】
<code>from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session,declarative_base
# 假设你有一个Base类和你的模型定义...
Base = declarative_base()
# 创建一个引擎
engine = create_engine('sqlite:///example.db')
# 创建一个Session类
Session = sessionmaker(bind=engine)
# 使用scoped_session来包装Session类
# 这将确保每次在同一个线程中调用scoped_session()时,都会返回相同的Session实例
scoped_session = scoped_session(Session)
# 在你的代码中...
def some_function():
# 获取一个会话
session = scoped_session()
# 使用会话进行查询、添加、更新或删除操作...
# 例如: result = session.query(MyModel).filter_by(some_column='value').first() code>
# 提交事务(如果需要)
session.commit()
# 关闭会话(通常不需要,因为scoped_session会在线程结束时自动关闭会话)
# 但如果你想在函数结束时立即关闭它,可以调用remove()方法
# scoped_session.remove()
1.4.5.5、新增数据
需求:在表t_student中新增数据
数据模型:
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="姓名")code>
stuno = Column(String(10), nullable=False, comment="学号")code>
# 使用relationship()定义多对多关系
courses = relationship("Course",
secondary="t_student_course",code>
back_populates="students",code>
lazy='dynamic') # 使用lazy='dynamic'可以返回查询对象code>
新增数据示例:
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from many_to_many_relationship import Student
# 创建ORM基类
Base = declarative_base()
# 创建会话类
Session = sessionmaker(bind=engine)
# 使用scoped_session创建线程安全会话
session = scoped_session(Session)
# # 新增一条数据
# # 创建一个新的Student实例
# stu1 = Student(name="Tom", stuno="2000100105")code>
# # 将新实例添加到会话中
# session.add(stu1)
# # 提交会话,将数据保存到数据库
# session.commit()
# 批量添加数据
session.add_all([
Student(name="Jack", stuno="2000100106"),code>
Student(name="小爱", stuno="2000100107"),code>
Student(name="大胖", stuno="2000100108"),code>
])
session.commit()
1.4.5.6、修改数据
修改数据的流程:先查询出需要修改的数据,然后修改数据,最后提交修改。
需求:修改表t_student的数据
1、修改id=5的数据,修改为name="Nicky", stuno="2000100109"2、修改name="大胖"的数据,修改为name="小胖"3、修改id>6的stuno,每个stuno前面都加123
注意:如果stuno的长度是10,需要增加一下长度,不然新增超过长度,会出现添加失败的错误。修改字段数据类型的SQL语句如下(需要执行SQL语句):
ALTER TABLE t_student MODIFY stuno varchar(30);
<code>from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from many_to_many_relationship import Student
# 创建ORM基类
Base = declarative_base()
# 创建会话类
Session = sessionmaker(bind=engine)
# 使用scoped_session创建线程安全会话
session = scoped_session(Session)
# 1、修改id=5的数据,修改为name="Nicky", stuno="2000100109"code>
# # 查询出id=5的数据
# stu = session.get(Student, 5)
# # 修改数据
# if stu:
# stu.name = "Nicky"
# stu.stuno = "2000100109"
# # 提交修改的数据
# session.commit()
# # 2、修改name="大胖"的数据,修改为name="小胖"code>
# session.query(Student).filter_by(name="大胖").update({"name":"小胖"})code>
# # 提交修改的数据
# session.commit()
# 3、修改id>6的stuno,每个stuno前面都加123
session.query(Student).filter(Student.id > 6).update({"stuno": "123" + Student.stuno})
session.commit()
1.4.5.7、删除数据
删除数据流程:先查询出数据,然后再删除数据
需求:删除表t_student的数据
1、删除id=5的数据2、删除name="小胖"的数据
<code>from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from many_to_many_relationship import Student
# 创建ORM基类
Base = declarative_base()
# 创建会话类
Session = sessionmaker(bind=engine)
# 使用scoped_session创建线程安全会话
session = scoped_session(Session)
# 1、删除id=5的数据
# stu_info = session.get(Student, 5)
# if stu_info:
# session.delete(stu_info)
# session.commit()
# 2、删除name="小胖"的数据code>
session.query(Student).filter_by(name="小胖").delete()code>
session.commit()
1.4.5.8、查询数据
1、测试数据准备
创建一张t_student2表,表的数据模型如下:注意:如果数据库的编码类型不是utf8mb4,表生成后,需要执行以下SQL语句修改表的编码为utf8mb4,不然插入中文会出现报错。
ALTER TABLE t_student2 CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
from sqlalchemy.orm import declarative_base , sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, DateTime
# 声明ORM基类
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = scoped_session(Session)
class Student(Base):
__tablename__ = 't_student2'
id = Column(Integer, primary_key=True)
stuno = Column(String(10), comment="学号")code>
name = Column(String(10), comment="姓名")code>
gender = Column(String(1), comment="性别")code>
age = Column(Integer, comment="年龄")code>
idcard = Column(String(18), comment="身份证")code>
entrydate = Column(DateTime, comment="入学时间")code>
addr = Column(String(50), comment="家庭地址")code>
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
def basic_query():
pass
if __name__ == '__main__':
init_db()
往表中插入测试数据,如下:
# 向表中插入数据
insert into t_student2 values('1','1','小洋洋','女',18,'12345678901234957','2023-02-03',"北京"),
('2','2','小芳','女',18,'123456789012345789','2023-02-03',"北京"),
('3','3','小枫','男',22,'123456789012345123','2023-01-03',"上海"),
('4','4','小敏','女',20,'123456789012345345','2022-01-03',"北京"),
('5','5','小李','男',20,'12345678901234534X','2022-01-03',"上海"),
('6','6','王小敏','女',16,'123456789012345345','2022-01-03',"成都"),
('7','7','大刘','男',25,'123456789012345102','2022-01-03',"深圳"),
('8','8','林逸','男',17,'12345678901234534X','2022-01-03',"北京"),
('9','9','莫小迪','女',21,'123456789012345302','2022-01-03',"成都"),
('10','10','林仙仙','女',16,'123456789012345330','2022-01-03',"深圳"),
('11','11','叶小辰','男',18,'123456789012345352','2022-01-03',"成都"),
('12','12','韩跑跑','男',24,'12345678901234554X','2022-01-03',"北京");
2、基础查询
2.1、查询多个字段(查询指定字段)
# 指定字段查询
stus = session.query(Student.name, Student.age).all()
for name, age in stus:
print(name, age)
# 全表查询
stus = session.query(Student).all()
for stu in stus:
print(stu.name)
2.2、去除重复记录
# 查询单个字段的不重复值
# 查询User表中所有不重复的name字段值
unique_names = session.query(distinct(User.name)).all()
# 遍历结果
for name in unique_names:
print(name)
# 查询多个字段的不重复组合
# 查询User表中所有不重复的name和age组合
unique_name_age_combinations = session.query(distinct(User.name, User.age)).all()
# 遍历结果
for name, age in unique_name_age_combinations:
print(name, age)
2.3、调试小技巧
在编写完查询代码后,可以先打印出代码对应转化的SQL语句,检查SQL语句是否正确,然后再执行。
示例:
# 指定字段查询
# 此时语句结尾不加.all(),输出的就是SQL语句
stus = session.query(Student.name, Student.age)
print(stus)
3、条件查询
3.1、常用的比较运算符
比较运算符
| 作用
|
>
| 大于
|
>=
| 大于等于
|
小于
| |
小于等于
| |
=
| 等于
|
<> 或 !=
| 不等于
|
IN(...)
| 在in之后的列表中的值,多选一
|
LIKE 占位符
| 模糊匹配(_匹配单个字符, %匹配任意个字符)
|
IS NULL
| 是NULL
|
3.2、常见的逻辑运算符
逻辑运算符
| 作用
|
AND
| 并且 (多个条件同时成立)
|
OR
| 或者 (多个条件任意一个成立)
|
NOT
| 非 , 不是
|
3.3、综合示例
<code>def conditional_query():
# 1、查询年龄大于20的学生
stus = session.query(Student).filter(Student.age > 20).all()
# 2、查询年龄不等于20的学生
stus2 = session.query(Student).filter(Student.age != 20).all()
# 或者使用<>, 注意python3.X版本不支持使用<>,官方推荐使用!=作为不等于运算符。
# stus2_1 = session.query(Student).filter(Student.age <> 20).all()
# 3、查询年龄为18或20或25的学生信息
stus3 = session.query(Student).filter(Student.age.in_([18, 20, 25])).all()
# 4、查询年龄不为18或20或25的学生信息
stus4 = session.query(Student).filter(~Student.age.in_([18, 20, 25])).all()
# 5、查询家庭住址为空的学生信息
stus5 = session.query(Student).filter(Student.addr == None).all()
# 或者使用is_()方法
stus5_1 = session.query(Student).filter(Student.addr.is_(None)).all()
# 6、查询家庭住址不为空的学生信息
stus6 = session.query(Student).filter(Student.addr != None).all()
# 或者使用isnot_()方法
stus6_1 = session.query(Student).filter(Student.addr.isnot(None)).all()
# 7、查询姓林,名字是两个字的学生信息
stus7 = session.query(Student).filter(Student.name.like("林_")).all()
# 8、查询身份证号最后一位是X的学生信息
stus8 = session.query(Student).filter(Student.idcard.like("%X")).all()
# 9、查询年龄在18岁(包含)到25岁(包含)之间的学生信息
stus9 = session.query(Student).filter(Student.age >=18, Student.age <= 25).all()
# 或者使用and_()方法
stus9_1 = session.query(Student).filter(and_(Student.age >= 18, Student.age <= 25)).all()
# 10、查询年龄为18或20或25的学生信息
stus10 = session.query(Student).filter(or_(Student.age==18, Student.age==20, Student.age==25)).all()
4、聚合函数
说明:将一列数据作为一个整体,进行纵向计算 。
语法:
SELECT 聚合函数(字段列表) FROM 表名 ;
注意 : NULL值是不参与所有聚合函数运算的。
4.1、常见的聚合函数
函数
| 作用
|
count
| 统计数量
|
max
| 最大值
|
min
| 最小值
|
avg
| 平均值
|
sum
| 求和
|
说明:SQLAlchemy 提供了一组内置的函数,这些函数可以在 func 命名空间中直接使用,类似于 SQL 中的聚合函数,如 COUNT(), SUM(), AVG(), MAX(), MIN() 等。
4.2、综合示列
def aggregate_query():
# 1、统计学生总人数
count = session.query(func.count(Student.id)).scalar()
# 2、统计学生的平均年龄
avg = session.query(func.avg(Student.age)).scalar()
# 3、统计学生的最大年龄
max = session.query(func.max(Student.age)).scalar()
# 4、统计学生的最小年龄
min = session.query(func.min(Student.age)).scalar()
# 统计男生的总年龄
count_man = session.query(func.sum(Student.age)).filter_by(gender="男").scalar()code>
5、分组查询
1、where与having区别
执行时机不同:where是分组之前进行过滤,不满足where条件,不参与分组;而having是分组之后对结果进行过滤。判断条件不同:where不能对聚合函数进行判断,而having可以。
注意事项:
分组之后,查询的字段一般为聚合函数和分组字段,查询其他字段无任何意义。执行顺序: where > 聚合函数 > having 。支持多字段分组, 具体语法为 : group by columnA,columnB
在 SQLAlchemy 中,使用 group_by() 方法来执行分组查询。分组查询通常与聚合函数(如 COUNT(), SUM(), AVG(), MAX(), MIN() 等)一起使用,以便对每个分组进行计算。
5.1、综合示例
def group_by_query():
# 1、根据性别分组,统计男学生和女学生的数量
# select sex, count(*) from t_student2 group by sex;
results = session.query(Student.gender, func.count(Student.id).label("stu_nums")).group_by(Student.gender).all()
# for gender, stu_nums in results:
# print(gender, stu_nums)
# 2、根据性别分组 , 统计男学生和女学生的平均年龄
# select sex,avg(age) from t_student2 group by sex;
results2 = session.query(Student.gender, func.avg(Student.age).label("avg_age")).group_by(Student.gender).all()
# for gender, avg_age in results2:
# print(gender, avg_age)
# 3、查询年龄小于25的学生 , 并根据家庭地址分组 , 获取学生数量大于等于3的家庭地址
# select addr,count(*) addr_num from t_student2 where age<25 group by addr having addr_num>=3;
# 方式一: 通过having实现
results3 = session.query(Student.addr, func.count(Student.id)).filter(Student.age < 25).group_by(Student.addr)\
.having(func.count(Student.id) >= 3).all()
# 方式二: 通过子查询实现,在子句中使用 filter()和比较运算符来实现它。
# 3.1、 先查询年龄小于 25 的学生,并根据家庭地址分组,获取学生数量大于等于 3 的家庭地址
subquery = session.query(Student.addr, func.count(Student.id).label("stu_nums")).filter(Student.age < 25).\
group_by(Student.addr).subquery()
# 3.2、使用子查询和 filter 子句来过滤出学生数量大于等于 3 的家庭地址
results3_1 = session.query(subquery.c.addr, subquery.c.stu_nums).filter(subquery.c.stu_nums >= 3).all()
# for addr, stu_nums in results3:
# print(addr, stu_nums)
# 统计不同家庭地址男女生的数量
# select addr,gender,count(*) from t_student2 group by addr,gender;
results4 = session.query(Student.addr, Student.gender, func.count(Student.id).label("stu_nums")).group_by(Student.addr, Student.gender).all()
print(results4)
6、排序查询
6.1、排序方式
ASC : 升序(默认值)DESC: 降序
注意:
如果是升序, 可以不指定排序方式ASC ;如果是多字段排序,当第一个字段值相同时,才会根据第二个字段进行排序 ;
def order_by_query():
# 1、根据年龄对学生进行升序排序
# select *from t_student order by age asc;
res = session.query(Student).order_by(Student.age.asc()).all()
# 或者
# select *from t_student order by age;
res_1 = session.query(Student).order_by(Student.age).all()
# for res in res:
# print(res.age)
# 2、根据年龄对学生进行升序排序 , 年龄相同 , 再按照入学时间进行降序排序
# select *from t_student order by age,entrydate desc;
res2 = session.query(Student).order_by(Student.age, Student.entrydate.desc()).all()
# for res in res2:
# print(res.age,res.entrydate)
7、 分页查询
注意:
起始索引从0开始,起始索引 = (查询页码 - 1)* 每页显示记录数。分页查询是数据库的方言,不同的数据库有不同的实现,MySQL中是LIMIT。如果查询的是第一页数据,起始索引可以省略,直接简写为 limit 10。
说明:在 SQLAlchemy 中,分页查询通常通过使用 offset() 和 limit() 方法来实现。这两个方法分别用于设置索引偏移量和限制查询结果的条数。
7.1、综合示例
def limit_query():
# 1、查询第1页学生数据, 每页展示10条记录
# select *from t_student limit 0,10;
res = session.query(Student).offset(0).limit(10).all()
# 或者
# select *from t_student limit 10;
res_1 = session.query(Student).limit(10).all()
# for stu in res:
# print(stu.id)
# 2、查询第2页学生数据, 每页展示10条记录
# 说明:起始索引=(页码-1)*页展示记录数
# select *from t_student limit 10,10;
res2 = session.query(Student).offset(10).limit(10).all()
# for stu in res2:
# print(stu.id)
1.4.5.9、多表查询
1、多表查询概述
1.1、测试数据准备
说明:先建立t_emp员工表和t_dept部门表两张表,并插入对应数据
1、创建对应的ORM模型如下:
from sqlalchemy.orm import declarative_base , sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, DateTime, distinct, or_, and_, func, ForeignKey
# 声明ORM基类
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = scoped_session(Session)
# 部门表
class Departments(Base):
__tablename__ = 't_dept'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="部门名称")code>
# 员工表
class Employes(Base):
__tablename__ = "t_emp"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")code>
age = Column(Integer, comment="年龄")code>
job = Column(String(20), comment="职位")code>
salary = Column(Integer, comment="薪资")code>
entrydate = Column(DateTime, comment="入职时间")code>
managerid = Column(Integer, comment="直属领导ID")code>
dept_id = Column(Integer, ForeignKey("t_dept.id"))
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
注意:如果创建的表的编码不是utf8mb4,需要修改为utf8mb4,不然插入中文的数据会出现编码错误,执行以下SQL语句:
ALTER TABLE t_dept CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE t_emp CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
2、插入测试数据
INSERT INTO t_dept (name) VALUES ('研发部'), ('市场部'),('财务部'), ('销售部'), ('总经办'), ('人事部');
INSERT INTO t_emp (id, name, age, job,salary, entrydate, managerid, dept_id) VALUES
(1, '大刘', 28, '总裁',40000, '2000-01-01', null,5),
(2, '夏析', 20, '项目经理',20000, '2005-12-05', 1,1),
(3, '李兴', 33, '开发', 8000,'2000-11-03', 2,1),
(4, '张敏', 30, '开发',11000, '2002-02-05', 2,1),
(5, '林夕', 43, '开发',10500, '2004-09-07', 3,1),
(6, '小美', 19, '程序员鼓励师',6600, '2004-10-12', 2,1),
(7, '林逸', 60, '财务总监',8500, '2002-09-12', 1,3),
(8, '李媛', 19, '会计',48000, '2006-06-02', 7,3),
(9, '林妙妙', 23, '出纳',5250, '2009-05-13', 7,3),
(10, '赵芳', 20, '市场部总监',12500, '2004-10-12', 1,2),
(11, '张三', 56, '职员',3750, '2006-10-03', 10,2),
(12, '李四', 19, '职员',3750, '2007-05-09', 10,2),
(13, '王二', 19, '职员',5500, '2009-02-12', 10,2),
(14, '周鑫', 88, '销售总监',14000, '2004-10-12', 1,4),
(15, '刘达', 38, '销售',4600, '2004-10-12', 14,4),
(16, '老钱', 40, '销售',4600, '2004-10-12', 14,4),
(17, '小六', 42, null,2000, '2011-10-12', 1,null);
1.2、概述
说明:多表查询就是指从多张表中查询数据。
操作:要执行多表查询,就只需要使用逗号分隔多张表,如: select * from t_emp , t_dept;
具体的执行结果如下:
解释:可见查询结果中包含了大量的结果集,总共102条记录,而这其实就是员工表emp所有的记录 (17) 与 部门表dept所有记录(6) 的所有组合情况,这种现象称之为笛卡尔积。
笛卡尔积: 笛卡尔乘积是指在数学中,两个集合A集合 和 B集合的所有组合情况。
而在多表查询中,需要消除无效的笛卡尔积,只保留两张表关联部分的数据;使用SQL语句,消除多表查询的笛卡尔积:
select *from t_emp,t_dept where t_emp.dept_id = t_dept.id;
1.3、多表查询的分类
连接查询
内连接:相当于查询A、B交集部分数据外连接:
右外连接:查询右表所有数据,以及两张表交集部分数据左外连接:查询左表所有数据,以及两张表交集部分数据自连接:当前表与自身的连接查询,自连接必须使用表别名子查询
2、内连接
说明:内连接查询的是两张表交集部分的数据
内连接的语法分为两种:
隐式内连接显式内连接
语法:
1、隐式内连接
SELECT 字段列表 FROM 表1 , 表2 WHERE 条件 ... ;
2、显示内连接(inner join)
SELECT 字段列表 FROM 表1 [ INNER ] JOIN 表2 ON 连接条件 ... ;
注意:SQL语句中的内连接的inner关键可以省略
说明:在 SQLAlchemy 中,内连接(INNER JOIN)是默认的连接类型,当你使用 join() 方法而不指定 isouter=True 时,你就是在执行内连接。内连接只返回满足连接条件的行,即两个表中都存在匹配项的行。
示例:
1、查询每一个员工的姓名 , 及关联的部门的名称 (使用隐式内连接实现)
使用filter()函数实现,在filter()函数中声明关联关系
<code>res = session.query(Employes, Departments).filter(Employes.dept_id == Departments.id).all()
2、查询每一个员工的姓名 , 及关联的部门的名称 (使用显式内连接实现)
使用join()函数实现,当你想要连接两个模型时,应该首先指定主模型(在 session.query() 中列出的第一个模型),然后使用 join() 方法连接第二个模型,并指定连接条件。
res2 = session.query(Employes, Departments).join(Departments, Employes.dept_id == Departments.id).all()
3、查询每一个员工的姓名 , 及关联的部门的名称(使用relationship实现内连接查询)
首先在Departments类中新建relationship反向关联关系
employes = relationship("Employes", back_populates="departments")code>
在Employes类中新建relationship反向关联关系
departments = relationship("Departments", back_populates="employes")code>
重新执行ORM映射
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
代码实现如下:
res3 = session.query(Employes).join(Employes.departments)
3、外连接
外连接分为两种,分别是:
左外连接右外连接
1、左外连接
说明:左外连接相当于查询表1(左表)的所有数据,当然也包含表1和表2交集部分的数据。
语法:
SELECT 字段列表 FROM 表1 LEFT [ OUTER ] JOIN 表2 ON 条件 ... ;
2、右外连接
说明:右外连接相当于查询表2(右表)的所有数据,当然也包含表1和表2交集部分的数据。
语法:
SELECT 字段列表 FROM 表1 RIGHT [ OUTER ] JOIN 表2 ON 条件 ... ;
说明:SQLAlchemy 支持左外连接(LEFT OUTER JOIN)和右外连接(RIGHT OUTER JOIN)。不过,在 SQLAlchemy 的 ORM 层,通常更常见的是使用左外连接,因为 SQLAlchemy 更多地是按照关系型数据库的标准来设计的,而标准 SQL 中右外连接并不如左外连接那样常见。
但是,你可以使用 SQLAlchemy 的 Core 表达式语言来执行右外连接。以下是如何在 SQLAlchemy 中使用左外连接和右外连接的示例:
注意事项:左外连接和右外连接是可以相互替换,只需要调整在连接查询时SQL中,表结构的先后顺序就可以。在实际开发使用时,左外连接常用。
示例:
1、查询t_emp表的所有数据, 和对应的部门信息 (左外连接)
# 1、查询t_emp表的所有数据, 和对应的部门信息(左外连接)
# select e.*,d.name from t_emp e left outer join t_dept d on e.dept_id = d.id;
# 注意:outerjoin()函数中的连接条件可以省略,Employes.dept_id == Departments.id
res = session.query(Employes, Departments.name).outerjoin(Departments).all()
# print(res)
2、查询t_dept表的所有数据, 和对应的员工信息(右外连接)
# 查询t_dept表的所有数据, 和对应的员工信息(右外连接)
# select d.*,e.* from t_emp e right outer join t_dept d on d.id = e.dept_id;
# 使用join()函数实现,isouter=True使用左外连接,左外连接和右外连接可以相互转换,也就是表的位置不同
# join()函数中的第一个参数Employes表示左外连接关联的表是t_emp,主表是t_dept
res2 = session.query(Employes, Departments).join(Employes, isouter=True).all()
# print(res2)
4、自连接查询
说明:自连接查询,顾名思义,就是自己连接自己,也就是把一张表连接查询多次。
连接方式:对于自连接查询,可以是内连接查询,也可以是外连接查询。
注意事项:在自连接查询中,必须要为表起别名,不然不清楚所指定的条件、返回的字段,到底是哪一张表的字段。
语法:
SELECT 字段列表 FROM 表A 别名A JOIN 表A 别名B ON 条件 ... ;
说明:同样的在SQlAlchemy中实现自连接查询,也需要为表起别名的方式,起别名使用aliased()函数实现。
示列:
1、查询员工 及其 所属领导的名字
分析:可将t_emp表看作两个表,要求查询每个员工及其对应的领导姓名,可见这是一个交集关系,需要使用内连接。
# select a.name '员工姓名', b.name '领导姓名' from t_emp a join t_emp b on a.managerid = b.id;
# 或
# select a.name '员工姓名', b.name '领导姓名' from t_emp a, t_emp b where a.managerid = b.id;
# 为表Employes创建别名
user = aliased(Employes)
manager = aliased(Employes)
# 使用Inner join 的连接方式
res = session.query(user.name.label("employee_name"), manager.name.label("manager_name")).join(manager, user.managerid == manager.id).all()
print(res)
2、查询所有员工 t_emp a 及其领导的名字 t_emp b, 如果员工没有领导, 也需要查询出来 。
分析:如果员工没有领导, 也需要查询出来,由此可见需要使用外连接,一般使用左外连接
# 为表Employes创建别名
user = aliased(Employes)
manager = aliased(Employes)
# select a.name '员工姓名', b.name '领导姓名' from t_emp a left join t_emp b on a.managerid = b.id;
res2 = session.query(user.name.label("employee_name"), manager.name.label("manager_name")).join(manager, user.managerid == manager.id, isouter=True).all()
print(res2)
5、联合查询
关键字:union
说明:
对于union查询,就是把多次查询的结果合并起来,形成一个新的查询结果集。对于联合查询的多张表的列数必须保持一致,字段类型也需要保持一致。union all 会将全部的数据直接合并在一起。union 会对合并之后的数据去重。
语法:
SELECT 字段列表 FROM 表A ...
UNION [ ALL ]
SELECT 字段列表 FROM 表B ....;
说明:在 SQLAlchemy 中,可以使用 union() 或者 union_all()方法来执行联合查询。
示例:
1、将薪资低于 5000 的员工 , 和 年龄大于 45 岁的员工全部查询出来.
分析:当前对于上述需求,可以直接使用多条件查询,使用逻辑运算符 and 连接即可。也可以通过union/union all来联合查询.
SQL实现如下:
select * from t_emp where salary < 5000
union all
select * from t_emp where age > 45;
ORM实现如下:
query1 = session.query(Employes).filter(Employes.salary < 5000)
query2 = session.query(Employes).filter(Employes.age > 45)
union_query = query1.union(query2).all()
for un in union_query:
print(un.name)
6、子查询
6.1、概述
1、概念
SQL语句中嵌套SELECT语句,称为嵌套查询,又称子查询子查询外部的语句可以是INSERT / UPDATE / DELETE / SELECT 的任何一个。
如:
SELECT * FROM t1 WHERE column1 = ( SELECT column1 FROM t2 );
2、分类
分类依据:根据子查询结果
分为:
标量子查询(子查询结果为单个值)列子查询(子查询结果为一列)行子查询(子查询结果为一行)表子查询(子查询结果为多行多列)
分类依据2:根据子查询位置
分为:
WHERE之后FROM之后SELECT之后
6.2、标量子查询
说明:子查询返回的结果是单个值(数字、字符串、日期等)
常用操作符:= <> > >= <
示列:
1、查询“销售部”所有员工的信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出销售部的部门id
select id from t_dept where name = '销售部';
2、然后根据查出的部门id再查询销售部的所有员工信息
select e.* from t_emp e where dept_id = (select id from t_dept where name = '销售部');
或者使用关联查询
select e.* from t_emp e join t_dept d on e.dept_id = d.id where d.name='销售部';
ORM实现如下:
使用 .scalar_subquery() 方法明确地产生一个标量子查询注意:代码可能会出现警告但是可以正常执行,因为SQLAlchemy 建议在 in_() 方法中明确传递一个 select() 构造,而不是直接将子查询对象传递给 in_() 方法。
<code># 1.1、先查询出销售部的部门id
subquery = session.query(Departments.id).filter(Departments.name=="销售部").scalar_subquery()
# 可能会出现警告,可以将子查询转换成 select 语句
# subquery = select([Departments.id]).where(Departments.name == "销售部")
# 1.2、然后根据查出的部门id再查询销售部的所有员工信息
res = session.query(Employes).filter(Employes.dept_id.in_(subquery)).all()
print(res)
2、查询在员工“林逸”之后入职的所有员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出林逸的入职时间
select entrydate from t_emp where name = '林逸';
2、然后根据入职时间再查找出在林逸入职时间之后的员工信息
select e.* from t_emp e where entrydate > (select entrydate from t_emp where name = '林逸');
ORM实现如下:
subquery2 = session.query(Employes.entrydate).filter_by(name="林逸").scalar_subquery()code>
res2 = session.query(Employes).filter(Employes.entrydate > subquery2)
print(res2)
6.3、列子查询
说明:子查询返回的结果是一列(可以是多行)
常用操作符:IN 、NOT IN 、 ANY 、SOME 、 ALL
操作符
| 描述
|
IN
| 在指定的集合范围之内,多选一
|
NOT IN
| 不在指定的集合范围之内
|
ANY
| 子查询返回列表中,有任意一个满足即可
|
SOME
| 与ANY等同,使用SOME的地方都可以使用ANY
|
ALL
| 子查询返回列表的所有值都必须满足
|
示列:
1、查询 "销售部" 和 "市场部" 的所有员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先在部门表中查询出销售部和市场部的部门id
select id from t_dept where name = '销售部' or name = '市场部';
2、然后根据1查出的结果,使用关键字in查询出对应销售部和市场部的员工信息1
select * from t_emp where dept_id in (select id from t_dept where name = '销售部' or name = '市场部');
ORM实现如下:
subquery = session.query(Departments.id).filter(or_(Departments.name=="销售部", Departments.name=="市场部")).subquery()
res = session.query(Employes).filter(Employes.id.in_(subquery))
2、 查询比“财务部”所有人工资都高的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出财务部所有人员的工资信息
select salary from t_emp e join t_dept d on e.dept_id = d.id where d.name = '财务部';
2、然后根据1查出的结果,使用关键字all查询出比财务部所有人工资都高的员工信息
select * from t_emp where salary > all (select salary from t_emp e join t_dept d on e.dept_id = d.id where d.name = '财务部');
或
select * from t_emp where salary > all ( select salary from t_emp where dept_id =(select id from t_dept where name = '财务部') );
ORM实现如下:
使用 filter() 方法结合 all_() 函数来实现
subquery2 = session.query(Employes.salary).join(Departments).filter(Departments.name=="财务部").scalar_subquery()
res2 = session.query(Employes).filter(Employes.salary > all_(subquery2)).all()
3、 查询比“研发部”其中任意一人工资都高的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出研发部所有人员的工资信息
select salary from t_emp where dept_id = (select id from t_dept where name = '研发部');
2、然后根据1查出的结果,使用关键字any查询出比研发部任意一人工资都高的员工信息
select * from t_emp where salary > any (select salary from t_emp where dept_id = (select id from t_dept where name = '研发部'));
ORM实现如下:
使用 filter() 方法结合 any_() 函数来实现
subquery3 = session.query(Employes.salary).join(Departments).filter(Departments.name == "研发部").scalar_subquery()
res3 = session.query(Employes).filter(Employes.salary > any_(subquery3)).all()
6.4、行子查询
说明:子查询返回的结果是一行(可以是多列)
常用的操作符:= 、<> 、IN 、NOT IN
示列:
1、 查询与 "张敏" 的薪资及直属领导相同的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出张敏的薪资和直属领导的id
select salary,managerid from t_emp where name = '张敏';
2、然后根据1查出的结果,使用 = 查询与"张敏"的薪资及直属领导相同的员工信息
select * from t_emp where (salary,managerid) = (select salary,managerid from t_emp where name = '张敏');
ORM实现如下:
使用 filter() 方法结合 tuple_() 函数来实现
subquery = session.query(Employes.salary, Employes.managerid).filter_by(name="张敏").subquery()code>
res = session.query(Employes).filter(tuple_(Employes.salary,Employes.managerid).in_(subquery)).all()
6.5、表子查询
说明:子查询返回的结果是多行多列
常用的操作符:IN
示列:
1、查询与 "林夕" , "林妙妙" 的职位和薪资相同的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询林夕和林妙妙的职位与薪资信息
select job, salary from t_emp where name in ('林夕','林妙妙');
2、然后根据1查出的结果,使用 in 查询与"林夕","林妙妙"的职位和薪资相同的员工信息
select * from t_emp where (job,salary) in (select job, salary from t_emp where name in ('林夕','林妙妙'));
ORM实现如下:
使用 filter() 方法结合 tuple_() 函数来实现
subquery = session.query(Employes.job, Employes.salary).filter(Employes.name.in_(["林夕", "林妙妙"])).subquery()
res = session.query(Employes).filter(tuple_(Employes.job, Employes.salary).in_(subquery)).all()
2、查询入职日期是 "2002-09-12" 之后的员工信息 , 及其部门信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出入职日期是 "2002-09-12" 之后的员工信息
select * from t_emp where entrydate > "2002-09-12";
2、根据1查询出的表信息,在查询对应的部门信息
select e.*, d.* from (select * from t_emp where entrydate > "2002-09-12") e join t_dept d on e.dept_id = d.id;
ORM实现如下:
使用join()实现
subquery2 = session.query(Employes).filter(Employes.entrydate > "2002-09-12").subquery()
res2 = session.query(subquery2, Departments).join(Departments, subquery2.c.dept_id == Departments.id).all()
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。