强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Python 编程教程 / 17 - 数据库

第 17 章:数据库

掌握 SQLite、SQLAlchemy ORM、数据库迁移和异步数据库操作。


17.1 SQLite(标准库)

17.1.1 基本操作

import sqlite3

# 连接数据库(文件不存在会自动创建)
conn = sqlite3.connect("app.db")
conn.row_factory = sqlite3.Row  # 返回字典式行对象

# 创建表
conn.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
conn.commit()

# 插入
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "[email protected]"))
conn.commit()

# 查询
rows = conn.execute("SELECT * FROM users").fetchall()
for row in rows:
    print(dict(row))

# 参数化查询(防 SQL 注入)
user = conn.execute("SELECT * FROM users WHERE email = ?", ("[email protected]",)).fetchone()
print(user["name"])

# 关闭
conn.close()

17.1.2 上下文管理器

import sqlite3

with sqlite3.connect("app.db") as conn:
    conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "[email protected]"))
    # 自动提交(如果无异常)或回滚(如果有异常)

17.2 SQLAlchemy ORM

17.2.1 安装与配置

$ pip install sqlalchemy

17.2.2 声明式模型

from datetime import datetime
from sqlalchemy import create_engine, String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
    created_at: Mapped[datetime] = mapped_column(default=func.now())

    # 关系
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

    def __repr__(self) -> str:
        return f"User(id={self.id}, name={self.name!r})"

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")

17.2.3 CRUD 操作

# 创建引擎
engine = create_engine("sqlite:///app.db", echo=True)

# 创建表
Base.metadata.create_all(engine)

# 使用 Session
with Session(engine) as session:
    # Create
    user = User(name="Alice", email="[email protected]")
    session.add(user)
    session.commit()

    # Read
    user = session.query(User).filter_by(name="Alice").first()
    users = session.query(User).filter(User.name.like("A%")).all()

    # Update
    user.name = "Alice Updated"
    session.commit()

    # Delete
    session.delete(user)
    session.commit()

17.2.4 查询构建

from sqlalchemy import select, and_, or_

with Session(engine) as session:
    # 基本查询
    stmt = select(User).where(User.name == "Alice")
    users = session.scalars(stmt).all()

    # 条件组合
    stmt = select(User).where(
        and_(
            User.name.like("A%"),
            or_(User.email.like("%@gmail.com"), User.email.like("%@outlook.com")),
        )
    )

    # 排序和分页
    stmt = select(User).order_by(User.created_at.desc()).offset(10).limit(10)

    # 聚合
    stmt = select(func.count(User.id))
    count = session.scalar(stmt)

    # 关联查询
    stmt = select(User, Post).join(Post, User.id == Post.user_id)
    results = session.execute(stmt).all()

17.3 连接池

from sqlalchemy import create_engine

# 配置连接池
engine = create_engine(
    "postgresql://user:password@localhost:5432/mydb",
    pool_size=10,           # 连接池大小
    max_overflow=20,        # 最大溢出连接
    pool_timeout=30,        # 获取连接超时
    pool_recycle=3600,      # 连接回收时间
    pool_pre_ping=True,     # 使用前检查连接
)

17.4 数据库迁移(Alembic)

# 安装
$ pip install alembic

# 初始化
$ alembic init alembic

# 创建迁移
$ alembic revision --autogenerate -m "add users table"

# 执行迁移
$ alembic upgrade head

# 回滚
$ alembic downgrade -1

# 查看历史
$ alembic history

alembic.ini 关键配置:

[alembic]
script_location = alembic
sqlalchemy.url = sqlite:///app.db

17.5 异步 ORM(SQLAlchemy 2.0+)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import asyncio

async_engine = create_async_engine("sqlite+aiosqlite:///app.db")
async_session = async_sessionmaker(async_engine, class_=AsyncSession)

async def main():
    async with async_session() as session:
        user = User(name="Alice", email="[email protected]")
        session.add(user)
        await session.commit()

        # 查询
        result = await session.execute(select(User).where(User.name == "Alice"))
        user = result.scalar_one()
        print(user)

asyncio.run(main())

17.6 数据库选型

数据库类型适用场景Python 驱动
SQLite嵌入式本地开发、小型应用sqlite3 (标准库)
PostgreSQL关系型生产环境首选psycopg2, asyncpg
MySQL关系型Web 应用pymysql, aiomysql
MongoDB文档型灵活 schemapymongo, motor
Redis键值型缓存、会话redis

17.7 注意事项

🔴 注意

  • 永远使用参数化查询,防止 SQL 注入
  • 使用连接池管理数据库连接
  • 生产环境使用 PostgreSQL 而非 SQLite
  • 数据库迁移脚本要纳入版本控制

💡 提示

  • 使用 SQLAlchemy 2.0+ 的新式声明模型
  • 使用 Alembic 管理数据库迁移
  • 使用 async_session 支持异步操作
  • echo=True 仅在开发时启用,生产环境关闭

📌 业务场景

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

class UserRepository:
    def __init__(self, engine):
        self.engine = engine

    def get_by_id(self, user_id: int) -> User | None:
        with Session(self.engine) as session:
            return session.get(User, user_id)

    def get_by_email(self, email: str) -> User | None:
        with Session(self.engine) as session:
            return session.scalars(
                select(User).where(User.email == email)
            ).first()

    def create(self, name: str, email: str) -> User:
        with Session(self.engine) as session:
            user = User(name=name, email=email)
            session.add(user)
            session.commit()
            session.refresh(user)
            return user

17.8 扩展阅读