【Python系列】SQLAlchemy 基本介绍

CSDN 2024-09-16 11:05:02 阅读 82

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。

img

推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航

檀越剑指大厂系列:全面总结 java 核心技术,jvm,并发编程 redis,kafka,Spring,微服务等常用开发工具系列:常用的开发工具,IDEA,Mac,Alfred,Git,typora 等数据库系列:详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等新空间代码工作室:提供各种软件服务,承接各种毕业设计,毕业论文等懒人运维系列:总结好用的命令,解放双手不香吗?能用一个命令完成绝不用两个操作数据结构与算法系列:总结数据结构和算法,不同类型针对性训练,提升编程思维,剑指大厂

非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

博客目录

一.基础介绍1. SQLAlchemy 的起源2. SQLAlchemy 的核心组件2.1 核心 SQL 工具包2.2 ORM 层

3. SQLAlchemy 的优势3.1 灵活性3.2 跨数据库支持3.3 强大的社区支持

二.实战步骤1.数据库配置2.model3.连接配置4.调用 SQL

一.基础介绍

SQLAlchemy 是一个 Python 的 SQL 工具包和对象关系映射(ORM)工具,它提供了一个高层的 ORM 以及底层的 SQL 表达式语言。SQLAlchemy 是开源的,并且可以在商业和非商业项目中免费使用。它支持多种数据库系统,包括 PostgreSQL、MySQL、SQLite 等。

在这里插入图片描述

1. SQLAlchemy 的起源

SQLAlchemy 最初由 Michael Bayer 在 2005 年创建,目的是提供一个全面的 SQL 工具包和 ORM 解决方案,以满足 Python 社区的需求。随着时间的推移,SQLAlchemy 不断发展和完善,成为了 Python 数据库编程领域中最受欢迎的库之一。

2. SQLAlchemy 的核心组件

2.1 核心 SQL 工具包

SQLAlchemy 的核心 SQL 工具包提供了构建 SQL 查询的功能,它允许开发者以 Pythonic 的方式编写 SQL 语句。这包括了对数据库表的创建、数据的增删改查等操作。

2.2 ORM 层

ORM(Object-Relational Mapping)层是 SQLAlchemy 的另一个重要组成部分,它允许开发者使用 Python 类和对象来表示数据库中的表和行。ORM 层抽象了数据库操作,使得开发者可以不必编写 SQL 语句,而是通过操作 Python 对象来间接地与数据库交互。

3. SQLAlchemy 的优势

3.1 灵活性

SQLAlchemy 提供了灵活的 SQL 构建工具,开发者可以自由地编写 SQL 语句,同时也可以利用 ORM 层提供的抽象来简化数据库操作。

3.2 跨数据库支持

SQLAlchemy 支持多种数据库系统,这意味着开发者可以使用相同的代码库来操作不同的数据库,而不需要为每种数据库编写特定的代码。

3.3 强大的社区支持

由于 SQLAlchemy 的流行,它拥有一个活跃的社区,开发者可以在社区中找到大量的资源和帮助,包括文档、教程和第三方库。

二.实战步骤

1.数据库配置

<code># 数据库

database:

TYPE: mysql

DATABASE_URL: mysql://root:xxx@xxxx:9306/test?serverTimezone=Asia/Shanghai

USERNAME: root

PASSWORD: xxx

HOST: xxxx

PORT: 9306

DBNAME: test

MAX_OVERFLOW: 60

POOL_TIMEOUT: 120

POOL_SIZE: 30

URL_PROPERTY: ?charset=utf8

ECHO: True

2.model

from datetime import datetime

import pytz

from sqlalchemy import String, Column, Text, DateTime, JSON

from sqlalchemy.ext.asyncio import AsyncAttrs

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, attributes

def get_beijing_now():

# 获取当前系统时区

return datetime.now(pytz.timezone('Asia/Shanghai'))

# 基类

class Base(AsyncAttrs, DeclarativeBase):

id: Mapped[int] = mapped_column(primary_key=True)

create_time = Column(DateTime, default=get_beijing_now, nullable=False)

update_time = Column(DateTime, default=get_beijing_now, onupdate=get_beijing_now, nullable=False)

def to_dict(self):

"""

转为字典输出

:return:

"""

return { -- -->c.name: getattr(self, c.name) for c in self.__table__.columns}

@repr_generator

class AlchemyEntitySchemas(Base):

__tablename__ = "entity_schemas"

name = Column(String(255), nullable=False, comment='名称')code>

3.连接配置

from sqlalchemy.pool import QueuePool

from sqlalchemy import create_engine

from sqlalchemy.orm import sessionmaker

from sqlalchemy.exc import SQLAlchemyError

from sqlalchemy.sql import text

from base.config import get_config_key

from urllib.parse import quote_plus as urlquote

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmaker

class Database:

def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):

try:

self.engine = create_engine(url, poolclass=QueuePool, pool_size=pool_size,

max_overflow=max_overflow, pool_timeout=pool_timeout,

echo=echo, pool_recycle=7200, pool_pre_ping=True, echo_pool=echo)

self.Session = sessionmaker(bind=self.engine, expire_on_commit=False, autocommit=False, autoflush=False)

print("Database connected successfully.")

except SQLAlchemyError as e:

print(f"Error connecting to the database: { -- -->e}")

def get_session(self):

return self.Session()

@staticmethod

def close_session(_session):

_session.close()

@staticmethod

def execute_query(query, _session):

try:

result = _session.execute(query)

return result.fetchall()

except SQLAlchemyError as e:

print(f"Error executing query: { e}")

return None

finally:

Database.close_session(_session)

class SyncDatabase:

async_engine: AsyncEngine = None

async_session = None

def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):

self.url = url

self.max_overflow = max_overflow

self.pool_timeout = pool_timeout

self.pool_size = pool_size

self.echo = echo

self.connect()

def connect(self):

"""创建数据库引擎和会话类"""

try:

self.async_engine = create_async_engine(self.url, echo=self.echo, pool_size=pool_size,

max_overflow=max_overflow, pool_timeout=pool_timeout,

pool_recycle=7200,

pool_pre_ping=True, echo_pool=self.echo)

self.async_session = async_sessionmaker(bind=self.async_engine, class_=AsyncSession, expire_on_commit=False,

autocommit=False, autoflush=False)

print("Database connected successfully.")

except SQLAlchemyError as e:

print(f"Error connecting to the database: { e}")

def get_db_url():

userName = get_config_key("database", "USERNAME")

password = get_config_key("database", "PASSWORD")

dbHost = get_config_key("database", "HOST")

dbPort = get_config_key("database", "PORT")

dbName = get_config_key("database", "DBNAME")

urlProperty = get_config_key("database", "URL_PROPERTY")

if dbName is None:

return f'mysql+pymysql://{ userName}:{ urlquote(password)}@{ dbHost}:{ dbPort}{ urlProperty}'

else:

return f'mysql+pymysql://{ userName}:{ urlquote(password)}@{ dbHost}:{ dbPort}/{ dbName}{ urlProperty}'

def get_sync_db_url():

userName = get_config_key("database", "USERNAME")

password = get_config_key("database", "PASSWORD")

dbHost = get_config_key("database", "HOST")

dbPort = get_config_key("database", "PORT")

dbName = get_config_key("database", "DBNAME")

urlProperty = get_config_key("database", "URL_PROPERTY")

if dbName is None:

return f'mysql+aiomysql://{ userName}:{ urlquote(password)}@{ dbHost}:{ dbPort}{ urlProperty}'

else:

return f'mysql+aiomysql://{ userName}:{ urlquote(password)}@{ dbHost}:{ dbPort}/{ dbName}{ urlProperty}'

url = get_db_url()

max_overflow = get_config_key("database", "MAX_OVERFLOW")

pool_timeout = get_config_key("database", "POOL_TIMEOUT")

pool_size = get_config_key("database", "POOL_SIZE")

echo = get_config_key("database", "ECHO")

# sqlalchemy实际操作对象,导入的时候应该导入这个对象

get_sqlalchemy_db = Database(url, pool_size=pool_size, pool_timeout=pool_timeout, max_overflow=max_overflow, echo=echo)

# 异步的

SYNC_DB_URI = get_sync_db_url()

_async_engine = create_async_engine(SYNC_DB_URI, echo=echo, pool_size=pool_size,

max_overflow=max_overflow, pool_timeout=pool_timeout, pool_recycle=7200,

pool_pre_ping=True, echo_pool=echo)

# 异步IO的 sqlalchemy实际操作对象,导入的时候应该导入这个对象

async_session_factory = async_sessionmaker(bind=_async_engine, class_=AsyncSession, expire_on_commit=False,

autocommit=False, autoflush=False)

在这里插入图片描述

4.调用 SQL

<code>@staticmethod

async def find_by_name(name: str):

"""

根据名称查询

"""

db = get_sqlalchemy_db

try:

with Session(db.engine) as session:

stmt = select(AlchemySchemas)

if name:

stmt = stmt.where(AlchemySchemas.name == name)

schemas_infos = session.scalars(stmt).all()

return [schemas_info.to_dict() for schemas_info in schemas_infos] if schemas_infos else None

except SQLAlchemyError as e:

print(f"An error occurred: { -- -->e}")

return None

finally:

db.close_session(session)

觉得有用的话点个赞 👍🏻 呗。

❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄

💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍

🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

img



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。