PHP 完全指南 / 第 17 章 — PDO 数据库
第 17 章 — PDO:预处理语句、事务与连接池
17.1 PDO 概述
PDO(PHP Data Objects)是 PHP 的统一数据库抽象层,支持 MySQL、PostgreSQL、SQLite、SQL Server 等多种数据库。
| 特性 | PDO | mysqli |
|---|---|---|
| 数据库支持 | 12 种 | 仅 MySQL |
| 命名参数 | ✅ | ❌ |
| 面向对象 | ✅ | ✅ |
| 预处理语句 | ✅ | ✅ |
| 事务支持 | ✅ | ✅ |
| 默认错误模式 | 异常(PHP 8.0 起) | 异常(PHP 8.1 起,此前为返回码/警告) |
17.2 连接数据库
<?php
declare(strict_types=1);
// --- MySQL ---
// Positional form of PDO::__construct(dsn, username, password, options).
$pdo = new PDO(
    'mysql:host=localhost;port=3306;dbname=myapp;charset=utf8mb4',
    'root',
    'secret',
    [
        // Report errors by throwing exceptions.
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        // Rows are returned as associative arrays unless a call overrides it.
        PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        // Disable client-side emulation of prepared statements.
        PDO::ATTR_EMULATE_PREPARES => false,
        // Statement executed right after the connection is opened.
        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
    ],
);

// --- PostgreSQL ---
$pdo = new PDO('pgsql:host=localhost;port=5432;dbname=myapp');

// --- SQLite: database file located next to this script ---
$pdo = new PDO(sprintf('sqlite:%s/database.sqlite', __DIR__));

// --- SQLite in memory (for tests) ---
$pdo = new PDO('sqlite::memory:');
连接工厂
<?php
/**
 * Builds a PDO connection from a configuration array and caches it.
 *
 * The first successful call creates the connection; every later call returns
 * that same instance and ignores the $config it is given.
 */
class DatabaseFactory
{
    /** Default TCP port per network driver, used when 'port' is omitted. */
    private const DEFAULT_PORTS = ['mysql' => 3306, 'pgsql' => 5432];

    /** Connection shared by all callers once it has been created. */
    private static ?PDO $instance = null;

    /**
     * Returns the cached connection, creating it from $config on first use.
     *
     * @param array{driver?: string, host?: string, port?: int|string, database?: string, path?: string, username?: string, password?: string} $config
     *        'driver' is one of mysql, pgsql, sqlite. mysql/pgsql need
     *        'database' ('host' defaults to localhost, 'port' to the driver
     *        default); sqlite needs 'path'.
     *
     * @throws InvalidArgumentException When the driver is unsupported or a required key is missing.
     * @throws PDOException             When the connection itself fails.
     */
    public static function connect(array $config): PDO
    {
        if (self::$instance !== null) {
            return self::$instance;
        }

        $driver = $config['driver'] ?? null;

        $dsn = match ($driver) {
            'mysql' => sprintf(
                'mysql:host=%s;port=%d;dbname=%s;charset=utf8mb4',
                $config['host'] ?? 'localhost',
                $config['port'] ?? self::DEFAULT_PORTS['mysql'],
                self::requireKey($config, 'database'),
            ),
            'pgsql' => sprintf(
                'pgsql:host=%s;port=%d;dbname=%s',
                $config['host'] ?? 'localhost',
                $config['port'] ?? self::DEFAULT_PORTS['pgsql'],
                self::requireKey($config, 'database'),
            ),
            'sqlite' => sprintf('sqlite:%s', self::requireKey($config, 'path')),
            default => throw new InvalidArgumentException(sprintf(
                'Unsupported driver: %s',
                is_scalar($driver) ? $driver : get_debug_type($driver),
            )),
        };

        self::$instance = new PDO($dsn, $config['username'] ?? '', $config['password'] ?? '', [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
            PDO::ATTR_EMULATE_PREPARES => false,
        ]);

        return self::$instance;
    }

    /**
     * Returns $config[$key] as a string, or fails loudly when it is absent or
     * empty, so a typo in the configuration cannot produce a malformed DSN.
     *
     * @throws InvalidArgumentException
     */
    private static function requireKey(array $config, string $key): string
    {
        $value = $config[$key] ?? null;
        if (!is_scalar($value) || (string) $value === '') {
            throw new InvalidArgumentException("Missing required config key: {$key}");
        }

        return (string) $value;
    }
}
17.3 查询数据
<?php
// Simple query: fetchAll() drains the statement into an array of rows.
$stmt = $pdo->query('SELECT * FROM users LIMIT 10');
$users = $stmt->fetchAll();

// fetch — one row at a time. A fresh query is needed here: the statement
// above was already drained by fetchAll(), so fetch() on it returns false.
$stmt = $pdo->query('SELECT * FROM users LIMIT 1');
$user = $stmt->fetch();

// fetchColumn — a single column of the next row (the first column by default)
$count = $pdo->query('SELECT COUNT(*) FROM users')->fetchColumn();

// FETCH_OBJ — every row as a stdClass object
$users = $pdo->query('SELECT * FROM users')->fetchAll(PDO::FETCH_OBJ);

// FETCH_KEY_PAIR — first column becomes the key, second column the value.
// The result set must contain exactly two columns, so name them explicitly.
$users = $pdo->query('SELECT id, name FROM users')->fetchAll(PDO::FETCH_KEY_PAIR);

// FETCH_UNIQUE — rows indexed by the value of the first column
$users = $pdo->query('SELECT id, name FROM users')->fetchAll(PDO::FETCH_UNIQUE);

// Iterating the statement directly handles one row at a time instead of
// building the whole result array in PHP first (recommended for large results).
$stmt = $pdo->query('SELECT * FROM users');
foreach ($stmt as $row) {
    echo $row['name'];
}
17.4 预处理语句(Prepared Statements)
预处理语句是防止 SQL 注入的核心手段。
<?php
// Named placeholders
$stmt = $pdo->prepare('SELECT * FROM users WHERE email = :email AND status = :status');
$stmt->execute(['email' => 'alice@example.com', 'status' => 'active']);
$user = $stmt->fetch();

// Positional placeholders
$stmt = $pdo->prepare('SELECT * FROM users WHERE id = ? AND deleted_at IS NULL');
$stmt->execute([42]);
$user = $stmt->fetch();

// bindValue with an explicit parameter type for each placeholder
$stmt = $pdo->prepare('INSERT INTO users (name, age, email) VALUES (?, ?, ?)');
$stmt->bindValue(1, 'Alice', PDO::PARAM_STR);
$stmt->bindValue(2, 30, PDO::PARAM_INT);
$stmt->bindValue(3, 'alice@example.com', PDO::PARAM_STR);
$stmt->execute();

// bindParam binds the variable by reference; used here to pass a stream as a LOB
$stmt = $pdo->prepare('INSERT INTO documents (content) VALUES (?)');
$fp = fopen('/tmp/large-file.txt', 'r');
if ($fp === false) {
    // fopen() signals failure with false; binding that would store garbage.
    throw new RuntimeException('Unable to open /tmp/large-file.txt for reading');
}
try {
    $stmt->bindParam(1, $fp, PDO::PARAM_LOB);
    $stmt->execute();
} finally {
    // Close the handle even when execute() throws.
    fclose($fp);
}
17.5 插入、更新与删除
<?php
// Insert
$stmt = $pdo->prepare(
    'INSERT INTO users (name, email, age) VALUES (:name, :email, :age)'
);
$stmt->execute([
    'name' => 'Alice',
    'email' => 'alice@example.com',
    'age' => 30,
]);

// ID generated by the insert above
$lastId = $pdo->lastInsertId();

// Batch insert: prepare once, execute once per row. The loop runs inside a
// transaction so a failure half-way through does not leave a partial batch.
$users = [
    ['Bob', 'bob@example.com', 25],
    ['Eve', 'eve@example.com', 28],
    ['Dave', 'dave@example.com', 35],
];
$stmt = $pdo->prepare('INSERT INTO users (name, email, age) VALUES (?, ?, ?)');
$pdo->beginTransaction();
try {
    foreach ($users as $user) {
        $stmt->execute($user);
    }
    $pdo->commit();
} catch (\Throwable $e) {
    if ($pdo->inTransaction()) {
        $pdo->rollBack();
    }
    throw $e;
}

// Update. execute() only reports success as a bool; the number of affected
// rows comes from rowCount().
$stmt = $pdo->prepare('UPDATE users SET name = :name WHERE id = :id');
$stmt->execute(['name' => 'Bob Updated', 'id' => 1]);
$affected = $stmt->rowCount();
echo $affected; // number of affected rows

// Delete
$stmt = $pdo->prepare('DELETE FROM users WHERE id = :id');
$stmt->execute(['id' => 1]);
17.6 事务(Transactions)
<?php
declare(strict_types=1);
/**
 * Moves $amount from one account to another inside a single transaction and
 * records the transfer in the transactions table.
 *
 * Nothing is changed unless all three statements succeed: the debit (which
 * only matches when the source balance covers the amount), the credit, and
 * the ledger insert.
 *
 * @param PDO   $pdo    Connection used for the whole transfer.
 * @param int   $fromId Account to debit.
 * @param int   $toId   Account to credit.
 * @param float $amount Amount to move; must be finite and greater than zero.
 *
 * @return bool true when the transfer was committed, false when it was
 *              rejected or rolled back (the reason is written to error_log).
 */
function transferMoney(PDO $pdo, int $fromId, int $toId, float $amount): bool
{
    // A negative amount would satisfy the balance check and move money in
    // the opposite direction, so reject it before touching the database.
    if (!is_finite($amount) || $amount <= 0.0) {
        error_log('Transfer failed: Invalid amount');
        return false;
    }

    $pdo->beginTransaction();
    try {
        // Debit. Each placeholder name is used once: with native
        // (non-emulated) prepares a named marker cannot be repeated.
        $debit = $pdo->prepare(
            'UPDATE accounts SET balance = balance - :amount WHERE id = :id AND balance >= :min_balance'
        );
        $debit->execute(['amount' => $amount, 'id' => $fromId, 'min_balance' => $amount]);
        if ($debit->rowCount() === 0) {
            // No row matched: balance too low (or no such source account).
            throw new RuntimeException('Insufficient balance');
        }

        // Credit. Without this check money debited above would vanish when
        // the target account does not exist.
        $credit = $pdo->prepare(
            'UPDATE accounts SET balance = balance + :amount WHERE id = :id'
        );
        $credit->execute(['amount' => $amount, 'id' => $toId]);
        if ($credit->rowCount() === 0) {
            throw new RuntimeException('Target account not found');
        }

        // Ledger entry. CURRENT_TIMESTAMP is standard SQL, unlike NOW().
        $ledger = $pdo->prepare(
            'INSERT INTO transactions (from_id, to_id, amount, created_at) '
            . 'VALUES (:from, :to, :amount, CURRENT_TIMESTAMP)'
        );
        $ledger->execute([
            'from' => $fromId,
            'to' => $toId,
            'amount' => $amount,
        ]);

        $pdo->commit();
        return true;
    } catch (\Throwable $e) {
        // rollBack() throws when no transaction is active (e.g. the
        // connection dropped); guard so the original error is not masked.
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        error_log("Transfer failed: {$e->getMessage()}");
        return false;
    }
}
嵌套事务(Savepoints)
<?php
/**
 * Creates an order, attempts a payment behind a savepoint, and marks the new
 * order as completed — all within one transaction.
 *
 * If the payment insert fails, only the work done after the savepoint is
 * undone and the order is still completed. Any other failure rolls back the
 * whole transaction and is rethrown.
 *
 * @throws \Throwable Whatever caused the outer transaction to fail.
 */
function complexOperation(PDO $pdo): void
{
    $pdo->beginTransaction();
    try {
        // String literals use single quotes: in standard SQL double quotes
        // denote identifiers, not strings.
        $pdo->exec("INSERT INTO orders (status) VALUES ('pending')");
        // Remember which order was created so that only it is updated below.
        $orderId = $pdo->lastInsertId();

        // Savepoint: a failed payment can be undone without losing the order.
        $pdo->exec('SAVEPOINT before_payment');
        try {
            $pdo->exec("INSERT INTO payments (status) VALUES ('processing')");
        } catch (\PDOException $e) {
            $pdo->exec('ROLLBACK TO SAVEPOINT before_payment');
            // Keep going, but leave a trace instead of swallowing the error.
            error_log("Payment step rolled back: {$e->getMessage()}");
        }

        // Scope the update to the new order; without the WHERE clause every
        // row in orders would be marked completed.
        $complete = $pdo->prepare("UPDATE orders SET status = 'completed' WHERE id = :id");
        $complete->execute(['id' => $orderId]);

        $pdo->commit();
    } catch (\Throwable $e) {
        // rollBack() throws when no transaction is active; guard so the
        // original error is the one that propagates.
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        throw $e;
    }
}
17.7 连接池
PHP-FPM 采用多进程模型,各进程之间无法共享数据库连接,请求结束后资源即被释放,因此无法实现真正意义上的连接池。但可以使用以下替代方案:
17.7.1 持久连接
<?php
// Request a persistent connection through the ATTR_PERSISTENT option.
$pdo = new PDO(
    dsn: 'mysql:host=localhost;dbname=myapp',
    username: 'root',
    password: 'secret',
    options: [PDO::ATTR_PERSISTENT => true],
);
17.7.2 Swoole 连接池
<?php
use Swoole\Coroutine\Channel;
/**
 * Bounded pool of PDO connections backed by a Swoole coroutine channel.
 *
 * At most $maxSize connections created by this pool exist at a time. When
 * all of them are checked out, get() waits for one to be returned instead of
 * opening yet another connection.
 *
 * NOTE(review): connections are not health-checked when handed out, and a
 * connection that is never passed back to put() permanently occupies a slot.
 */
class ConnectionPool
{
    private Channel $pool;

    /** Connections created by this pool that are idle or checked out. */
    private int $created = 0;

    /**
     * @param string $dsn         DSN passed to every new PDO.
     * @param string $user        Database user.
     * @param string $pass        Database password.
     * @param int    $maxSize     Upper bound on connections; must be >= 1.
     * @param float  $waitTimeout Seconds get() waits for a free connection
     *                            once the pool is exhausted.
     *
     * @throws InvalidArgumentException When $maxSize is below 1.
     */
    public function __construct(
        private readonly string $dsn,
        private readonly string $user,
        private readonly string $pass,
        private readonly int $maxSize = 10,
        private readonly float $waitTimeout = 3.0,
    ) {
        if ($maxSize < 1) {
            throw new InvalidArgumentException('maxSize must be at least 1');
        }
        $this->pool = new Channel($maxSize);
    }

    /**
     * Hands out an idle connection, opens a new one while under the limit,
     * or waits up to $waitTimeout seconds for one to be returned.
     *
     * @throws RuntimeException When no connection becomes available in time.
     * @throws PDOException     When opening a new connection fails.
     */
    public function get(): PDO
    {
        if ($this->pool->isEmpty() && $this->created < $this->maxSize) {
            // Reserve the slot before connecting so that other callers see
            // the updated count even if connecting takes a while.
            $this->created++;
            try {
                return new PDO($this->dsn, $this->user, $this->pass);
            } catch (\Throwable $e) {
                $this->created--;
                throw $e;
            }
        }

        // pop() yields false on timeout (or a closed channel); returning
        // that would violate the PDO return type, so surface it explicitly.
        $connection = $this->pool->pop($this->waitTimeout);
        if (!$connection instanceof PDO) {
            throw new RuntimeException('Timed out waiting for a free database connection');
        }

        return $connection;
    }

    /**
     * Returns a connection to the pool. When the pool is already full the
     * connection is dropped and its slot is released.
     */
    public function put(PDO $connection): void
    {
        if ($this->pool->length() < $this->maxSize) {
            $this->pool->push($connection);
            return;
        }

        $this->created = max(0, $this->created - 1);
    }
}
17.8 数据库迁移
<?php
declare(strict_types=1);
/**
 * Minimal SQL-file migration runner.
 *
 * Every *.sql file in the migration directory is executed once, in filename
 * order, and its name is recorded in the `migrations` table.
 */
class Migration
{
    private PDO $pdo;
    private string $migrationDir;

    /**
     * Stores the dependencies and makes sure the bookkeeping table exists.
     *
     * @param PDO    $pdo          Connection the migrations run on.
     * @param string $migrationDir Directory that contains the *.sql files.
     */
    public function __construct(PDO $pdo, string $migrationDir)
    {
        $this->pdo = $pdo;
        $this->migrationDir = $migrationDir;
        $this->createMigrationsTable();
    }

    /**
     * Applies every migration file that has not been recorded yet.
     *
     * Prints one "Applied:" or "Failed:" line per file and stops at the first
     * failure, rethrowing its cause.
     *
     * @throws RuntimeException When the directory or a file cannot be read.
     * @throws \Throwable       Whatever made a migration fail.
     */
    public function migrate(): void
    {
        $applied = $this->getApplied();

        $files = glob($this->migrationDir . '/*.sql');
        if ($files === false) {
            throw new RuntimeException("Unable to scan migration directory: {$this->migrationDir}");
        }
        sort($files);

        foreach ($files as $file) {
            $name = basename($file);
            if (in_array($name, $applied, true)) {
                continue;
            }

            $sql = file_get_contents($file);
            if ($sql === false) {
                throw new RuntimeException("Unable to read migration file: {$file}");
            }

            $this->pdo->beginTransaction();
            try {
                $this->pdo->exec($sql);
                $this->pdo->prepare('INSERT INTO migrations (name) VALUES (?)')
                    ->execute([$name]);
                // Some databases (MySQL in particular) commit implicitly when
                // DDL runs, which ends the transaction; commit() would then
                // throw because no transaction is active.
                if ($this->pdo->inTransaction()) {
                    $this->pdo->commit();
                }
                echo "Applied: {$name}\n";
            } catch (\Throwable $e) {
                // Same guard as above, so the real failure is the one reported.
                if ($this->pdo->inTransaction()) {
                    $this->pdo->rollBack();
                }
                echo "Failed: {$name}: {$e->getMessage()}\n";
                throw $e;
            }
        }
    }

    /**
     * Names of the migrations that have already been applied.
     *
     * @return list<string>
     */
    private function getApplied(): array
    {
        return $this->pdo->query('SELECT name FROM migrations')
            ->fetchAll(PDO::FETCH_COLUMN);
    }

    /**
     * Creates the bookkeeping table if it is missing, using the
     * auto-increment and timestamp syntax of the connected driver. Drivers
     * other than sqlite and pgsql get the MySQL definition.
     */
    private function createMigrationsTable(): void
    {
        $driver = $this->pdo->getAttribute(PDO::ATTR_DRIVER_NAME);

        $idColumn = match ($driver) {
            'sqlite' => 'id INTEGER PRIMARY KEY AUTOINCREMENT',
            'pgsql' => 'id SERIAL PRIMARY KEY',
            default => 'id INT AUTO_INCREMENT PRIMARY KEY',
        };
        $timestampType = $driver === 'pgsql' ? 'TIMESTAMP' : 'DATETIME';

        $this->pdo->exec("
            CREATE TABLE IF NOT EXISTS migrations (
                {$idColumn},
                name VARCHAR(255) NOT NULL,
                executed_at {$timestampType} NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
        ");
    }
}
17.9 业务场景:分页查询
<?php
/**
 * Returns one page of rows from $table, newest id first, plus paging metadata.
 *
 * @param PDO    $pdo     Connection to query.
 * @param string $table   Table name, optionally schema-qualified. Identifiers
 *                        cannot be bound as parameters, so the name is
 *                        validated before it is placed into the SQL.
 * @param int    $page    1-based page number; values below 1 are treated as 1.
 * @param int    $perPage Rows per page; values below 1 are treated as 1.
 *
 * @return array{data: array, current_page: int, per_page: int, total: int, last_page: int}
 *
 * @throws InvalidArgumentException When $table is not a plain identifier.
 */
function paginate(PDO $pdo, string $table, int $page = 1, int $perPage = 15): array
{
    // Only letters, digits and underscores (with an optional schema prefix)
    // are accepted, which rules out SQL injection through the table name.
    if (preg_match('/\A[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?\z/', $table) !== 1) {
        throw new InvalidArgumentException("Invalid table name: {$table}");
    }

    // Clamp so the OFFSET can never be negative and the division below can
    // never be by zero.
    $page = max(1, $page);
    $perPage = max(1, $perPage);
    $offset = ($page - 1) * $perPage;

    $total = (int) $pdo->query("SELECT COUNT(*) FROM {$table}")->fetchColumn();

    $stmt = $pdo->prepare("SELECT * FROM {$table} ORDER BY id DESC LIMIT :limit OFFSET :offset");
    // LIMIT/OFFSET must be bound as integers; bound as strings they would be
    // quoted and rejected by some drivers.
    $stmt->bindValue(':limit', $perPage, PDO::PARAM_INT);
    $stmt->bindValue(':offset', $offset, PDO::PARAM_INT);
    $stmt->execute();

    return [
        'data' => $stmt->fetchAll(),
        'current_page' => $page,
        'per_page' => $perPage,
        'total' => $total,
        'last_page' => (int) ceil($total / $perPage),
    ];
}
17.10 扩展阅读
上一章:第 16 章 — 错误处理 下一章:第 18 章 — 文件系统