强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

PHP 完全指南 / 第 17 章 — PDO 数据库

第 17 章 — PDO:预处理语句、事务与连接池

17.1 PDO 概述

PDO(PHP Data Objects)是 PHP 的统一数据库抽象层,支持 MySQL、PostgreSQL、SQLite、SQL Server 等多种数据库。

特性PDOmysqli
数据库支持12 种仅 MySQL
命名参数
面向对象
预处理语句
事务支持
错误模式异常返回码

17.2 连接数据库

<?php
declare(strict_types=1);

// MySQL
$pdo = new PDO(
    dsn: 'mysql:host=localhost;port=3306;dbname=myapp;charset=utf8mb4',
    username: 'root',
    password: 'secret',
    options: [
        PDO::ATTR_ERRMODE            => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        PDO::ATTR_EMULATE_PREPARES   => false,
        PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES utf8mb4",
    ]
);

// PostgreSQL
$pdo = new PDO('pgsql:host=localhost;port=5432;dbname=myapp');

// SQLite
$pdo = new PDO('sqlite:' . __DIR__ . '/database.sqlite');

// 内存 SQLite(测试用)
$pdo = new PDO('sqlite::memory:');

连接工厂

<?php
class DatabaseFactory
{
    private static ?PDO $instance = null;

    public static function connect(array $config): PDO
    {
        if (self::$instance !== null) {
            return self::$instance;
        }

        $dsn = match ($config['driver']) {
            'mysql'      => sprintf('mysql:host=%s;port=%d;dbname=%s;charset=utf8mb4', $config['host'], $config['port'], $config['database']),
            'pgsql'      => sprintf('pgsql:host=%s;port=%d;dbname=%s', $config['host'], $config['port'], $config['database']),
            'sqlite'     => sprintf('sqlite:%s', $config['path']),
            default      => throw new InvalidArgumentException("Unsupported driver: {$config['driver']}"),
        };

        self::$instance = new PDO($dsn, $config['username'] ?? '', $config['password'] ?? '', [
            PDO::ATTR_ERRMODE            => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
            PDO::ATTR_EMULATE_PREPARES   => false,
        ]);

        return self::$instance;
    }
}

17.3 查询数据

<?php
// 简单查询
$stmt = $pdo->query('SELECT * FROM users LIMIT 10');
$users = $stmt->fetchAll();

// fetch — 获取一行
$user = $stmt->fetch();

// fetchColumn — 获取单列
$count = $pdo->query('SELECT COUNT(*) FROM users')->fetchColumn();

// fetchAll + fetch mode
$users = $pdo->query('SELECT * FROM users')->fetchAll(PDO::FETCH_OBJ);
// 每行作为 stdClass 对象

$users = $pdo->query('SELECT * FROM users')->fetchAll(PDO::FETCH_KEY_PAIR);
// 以第一列为键,第二列为值

$users = $pdo->query('SELECT id, name FROM users')->fetchAll(PDO::FETCH_UNIQUE);
// 以第一列作为唯一键

// 遍历结果(推荐,内存友好)
$stmt = $pdo->query('SELECT * FROM users');
foreach ($stmt as $row) {
    echo $row['name'];
}

17.4 预处理语句(Prepared Statements)

预处理语句是防止 SQL 注入的核心手段。

<?php
// 命名参数
$stmt = $pdo->prepare('SELECT * FROM users WHERE email = :email AND status = :status');
$stmt->execute(['email' => '[email protected]', 'status' => 'active']);
$user = $stmt->fetch();

// 位置参数
$stmt = $pdo->prepare('SELECT * FROM users WHERE id = ? AND deleted_at IS NULL');
$stmt->execute([42]);
$user = $stmt->fetch();

// 绑定参数类型
$stmt = $pdo->prepare('INSERT INTO users (name, age, email) VALUES (?, ?, ?)');
$stmt->bindValue(1, 'Alice', PDO::PARAM_STR);
$stmt->bindValue(2, 30, PDO::PARAM_INT);
$stmt->bindValue(3, '[email protected]', PDO::PARAM_STR);
$stmt->execute();

// bindParam — 引用绑定(用于流/LOB)
$stmt = $pdo->prepare('INSERT INTO documents (content) VALUES (?)');
$fp = fopen('/tmp/large-file.txt', 'r');
$stmt->bindParam(1, $fp, PDO::PARAM_LOB);
$stmt->execute();
fclose($fp);

17.5 插入、更新与删除

<?php
// 插入
$stmt = $pdo->prepare(
    'INSERT INTO users (name, email, age) VALUES (:name, :email, :age)'
);
$stmt->execute([
    'name'  => 'Alice',
    'email' => '[email protected]',
    'age'   => 30,
]);

// 获取最后插入的 ID
$lastId = $pdo->lastInsertId();

// 批量插入
$users = [
    ['Bob', '[email protected]', 25],
    ['Eve', '[email protected]', 28],
    ['Dave', '[email protected]', 35],
];

$stmt = $pdo->prepare('INSERT INTO users (name, email, age) VALUES (?, ?, ?)');
foreach ($users as $user) {
    $stmt->execute($user);
}

// 更新
$stmt = $pdo->prepare('UPDATE users SET name = :name WHERE id = :id');
$affected = $stmt->execute(['name' => 'Bob Updated', 'id' => 1]);
echo $stmt->rowCount();  // 受影响的行数

// 删除
$stmt = $pdo->prepare('DELETE FROM users WHERE id = :id');
$stmt->execute(['id' => 1]);

17.6 事务(Transactions)

<?php
declare(strict_types=1);

function transferMoney(PDO $pdo, int $fromId, int $toId, float $amount): bool
{
    $pdo->beginTransaction();

    try {
        // 扣款
        $stmt = $pdo->prepare(
            'UPDATE accounts SET balance = balance - :amount WHERE id = :id AND balance >= :amount'
        );
        $stmt->execute(['amount' => $amount, 'id' => $fromId]);

        if ($stmt->rowCount() === 0) {
            throw new RuntimeException('Insufficient balance');
        }

        // 入账
        $stmt = $pdo->prepare(
            'UPDATE accounts SET balance = balance + :amount WHERE id = :id'
        );
        $stmt->execute(['amount' => $amount, 'id' => $toId]);

        // 记录流水
        $stmt = $pdo->prepare(
            'INSERT INTO transactions (from_id, to_id, amount, created_at)
             VALUES (:from, :to, :amount, NOW())'
        );
        $stmt->execute([
            'from'   => $fromId,
            'to'     => $toId,
            'amount' => $amount,
        ]);

        $pdo->commit();
        return true;

    } catch (\Throwable $e) {
        $pdo->rollBack();
        error_log("Transfer failed: {$e->getMessage()}");
        return false;
    }
}

嵌套事务(Savepoints)

<?php
function complexOperation(PDO $pdo): void
{
    $pdo->beginTransaction();

    try {
        $pdo->exec('INSERT INTO orders (status) VALUES ("pending")');

        // 保存点
        $pdo->exec('SAVEPOINT before_payment');

        try {
            $pdo->exec('INSERT INTO payments (status) VALUES ("processing")');
            // 支付成功
        } catch (\Throwable $e) {
            // 回滚到保存点
            $pdo->exec('ROLLBACK TO SAVEPOINT before_payment');
            // 继续处理
        }

        $pdo->exec('UPDATE orders SET status = "completed"');
        $pdo->commit();
    } catch (\Throwable $e) {
        $pdo->rollBack();
        throw $e;
    }
}

17.7 连接池

PHP-FPM 的进程模型不支持持久数据库连接的连接池。但可以使用以下方案:

17.7.1 持久连接

<?php
$pdo = new PDO(
    'mysql:host=localhost;dbname=myapp',
    'root',
    'secret',
    [
        PDO::ATTR_PERSISTENT => true,  // 持久连接
    ]
);

17.7.2 Swoole 连接池

<?php
use Swoole\Coroutine\Channel;

class ConnectionPool
{
    private Channel $pool;

    public function __construct(
        private readonly string $dsn,
        private readonly string $user,
        private readonly string $pass,
        private readonly int $maxSize = 10,
    ) {
        $this->pool = new Channel($maxSize);
    }

    public function get(): PDO
    {
        if ($this->pool->isEmpty()) {
            return new PDO($this->dsn, $this->user, $this->pass);
        }
        return $this->pool->pop(0.01);
    }

    public function put(PDO $connection): void
    {
        if ($this->pool->length() < $this->maxSize) {
            $this->pool->push($connection);
        }
    }
}

17.8 数据库迁移

<?php
declare(strict_types=1);

class Migration
{
    private PDO $pdo;
    private string $migrationDir;

    public function __construct(PDO $pdo, string $migrationDir)
    {
        $this->pdo = $pdo;
        $this->migrationDir = $migrationDir;

        $this->pdo->exec('
            CREATE TABLE IF NOT EXISTS migrations (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(255) NOT NULL,
                executed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
        ');
    }

    public function migrate(): void
    {
        $applied = $this->getApplied();
        $files = glob($this->migrationDir . '/*.sql');
        sort($files);

        foreach ($files as $file) {
            $name = basename($file);
            if (in_array($name, $applied)) continue;

            $sql = file_get_contents($file);
            $this->pdo->beginTransaction();
            try {
                $this->pdo->exec($sql);
                $this->pdo->prepare('INSERT INTO migrations (name) VALUES (?)')
                    ->execute([$name]);
                $this->pdo->commit();
                echo "Applied: {$name}\n";
            } catch (\Throwable $e) {
                $this->pdo->rollBack();
                echo "Failed: {$name}: {$e->getMessage()}\n";
                throw $e;
            }
        }
    }

    private function getApplied(): array
    {
        return $this->pdo->query('SELECT name FROM migrations')
            ->fetchAll(PDO::FETCH_COLUMN);
    }
}

17.9 业务场景:分页查询

<?php
function paginate(PDO $pdo, string $table, int $page = 1, int $perPage = 15): array
{
    $offset = ($page - 1) * $perPage;

    $total = $pdo->query("SELECT COUNT(*) FROM {$table}")->fetchColumn();

    $stmt = $pdo->prepare("SELECT * FROM {$table} ORDER BY id DESC LIMIT :limit OFFSET :offset");
    $stmt->bindValue(':limit', $perPage, PDO::PARAM_INT);
    $stmt->bindValue(':offset', $offset, PDO::PARAM_INT);
    $stmt->execute();

    return [
        'data'         => $stmt->fetchAll(),
        'current_page' => $page,
        'per_page'     => $perPage,
        'total'        => (int)$total,
        'last_page'    => (int)ceil($total / $perPage),
    ];
}

17.10 扩展阅读


上一章第 16 章 — 错误处理 下一章第 18 章 — 文件系统