dqlite 分布式 SQLite 教程 / 第 4 章:基本操作
第 4 章:基本操作
本章介绍 dqlite 的基本操作,包括创建数据库、执行 SQL 语句、连接管理和错误处理。所有示例同时提供 C 和 Go 两种语言版本。
4.1 操作概览
dqlite 的基本操作遵循以下流程:
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 创建节点 │────▶│ 启动节点 │────▶│ 建立连接 │────▶│ 执行 SQL │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│
┌─────┴─────┐
▼ ▼
┌──────┐ ┌──────┐
│ DDL │ │ DML │
│(建表) │ │(增删改│
└──────┘ │ 查) │
└──────┘
| 阶段 | C API | Go API |
|---|---|---|
| 创建节点 | dqlite_node_create() | dqlite.New() |
| 启动节点 | dqlite_node_start() | node.Open() |
| 建立连接 | dqlite_client_connect() | sql.Open("dqlite", ...) |
| 执行 SQL | sqlite3_exec() via client | db.Exec() / db.Query() |
4.2 创建并启动节点
4.2.1 C 语言版本
/* basic_node.c - 创建并启动 dqlite 节点 */
#include <dqlite.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
static volatile int running = 1;
void handle_signal(int sig) {
(void)sig;
running = 0;
}
int main(int argc, char *argv[]) {
dqlite_node *node;
int rc;
/* 解析参数 */
uint64_t id = 1;
const char *address = "127.0.0.1:9001";
const char *data_dir = "/tmp/dqlite-basic-data";
if (argc >= 3) {
id = (uint64_t)atoi(argv[1]);
address = argv[2];
}
if (argc >= 4) {
data_dir = argv[3];
}
/* 创建节点 */
rc = dqlite_node_create(id, data_dir, address, &node);
if (rc != 0) {
fprintf(stderr, "Error: Failed to create node (rc=%d)\n", rc);
return EXIT_FAILURE;
}
/* 启动节点 */
rc = dqlite_node_start(node);
if (rc != 0) {
fprintf(stderr, "Error: Failed to start node: %s\n",
dqlite_node_errmsg(node));
dqlite_node_destroy(node);
return EXIT_FAILURE;
}
printf("dqlite node %lu started on %s (data: %s)\n",
id, address, data_dir);
/* 注册信号处理 */
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
/* 主循环 */
while (running) {
sleep(1);
}
/* 清理 */
printf("\nShutting down node...\n");
dqlite_node_stop(node);
dqlite_node_destroy(node);
printf("Node stopped.\n");
return EXIT_SUCCESS;
}
编译和运行:
gcc -Wall -o basic_node basic_node.c $(pkg-config --cflags --libs dqlite) -lpthread
./basic_node 1 "127.0.0.1:9001" /tmp/dqlite-node1
4.2.2 Go 语言版本
// basic_node.go - 创建并启动 dqlite 节点
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
dqlite "github.com/canonical/go-dqlite/v2"
)
func main() {
id := uint64(1)
address := "127.0.0.1:9001"
dir := "/tmp/dqlite-go-data"
if len(os.Args) >= 3 {
fmt.Sscanf(os.Args[1], "%d", &id)
address = os.Args[2]
}
if len(os.Args) >= 4 {
dir = os.Args[3]
}
// 确保数据目录存在
if err := os.MkdirAll(dir, 0755); err != nil {
log.Fatalf("Failed to create data dir: %v", err)
}
// 创建节点
node, err := dqlite.New(id, address, dir, nil)
if err != nil {
log.Fatalf("Failed to create node: %v", err)
}
defer node.Close()
fmt.Printf("dqlite node %d started on %s (data: %s)\n", id, address, dir)
// 等待信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
fmt.Println("\nShutting down node...")
}
4.3 创建数据库
在 dqlite 中,数据库不需要显式创建。当你第一次 Open 一个数据库名时,它会自动创建。
4.3.1 数据库命名
| 规则 | 说明 |
|---|---|
| 名称长度 | 建议 < 255 字符 |
| 后缀 | 无强制要求,建议使用 .db |
| 路径 | 不支持路径分隔符,数据库在节点数据目录内 |
| 并发创建 | 同名数据库只有一个,多次 Open 返回同一实例 |
4.3.2 Go 版本:使用 database/sql
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"time"
dqlite "github.com/canonical/go-dqlite/v2"
"github.com/canonical/go-dqlite/v2/driver"
)
func main() {
dir, _ := os.MkdirTemp("", "dqlite-basic")
defer os.RemoveAll(dir)
// 启动节点
node, err := dqlite.New(1, "127.0.0.1:9001", dir, nil)
if err != nil {
log.Fatal(err)
}
defer node.Close()
// 创建驱动
store := driver.NewInmemNodeStore()
store.Set(context.Background(), []driver.NodeInfo{
{ID: 1, Address: "127.0.0.1:9001"},
})
drv, err := driver.New(store)
if err != nil {
log.Fatal(err)
}
defer drv.Close()
db := sql.OpenDB(drv)
defer db.Close()
// 等待连接
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
log.Fatal(err)
}
fmt.Println("Database 'app.db' created (auto-created on first access)")
_ = db // 使用 db 进行后续操作
}
4.3.3 C 版本:通过客户端连接
/* 使用 dqlite 客户端 API 连接数据库 */
#include <dqlite.h>
#include <stdio.h>
/* 注意:完整的客户端 API 使用需要包含 client.h,
* 这里展示概念。实际使用参见第 5 章 */
/*
* dqlite 的数据库创建是隐式的:
* 1. 启动节点
* 2. 通过客户端协议连接
* 3. 发送 Open 请求指定数据库名
* 4. 如果数据库不存在则自动创建
*/
int main() {
printf("dqlite databases are created implicitly on first Open.\n");
printf("See Chapter 5 for complete C API client examples.\n");
return 0;
}
4.4 SQL 操作
4.4.1 创建表(DDL)
// 创建表
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
age INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
)
`)
if err != nil {
log.Fatalf("Create table failed: %v", err)
}
// 创建索引
_, err = db.Exec(`
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)
`)
if err != nil {
log.Fatalf("Create index failed: %v", err)
}
// 创建多个表
schema := `
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id),
title TEXT NOT NULL,
content TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_posts_user ON posts(user_id);
`
_, err = db.Exec(schema)
if err != nil {
log.Fatalf("Create posts table failed: %v", err)
}
4.4.2 插入数据
// 单条插入(参数绑定)
result, err := db.Exec(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
"zhangsan", "[email protected]", 28,
)
if err != nil {
log.Fatalf("Insert failed: %v", err)
}
id, _ := result.LastInsertId()
rows, _ := result.RowsAffected()
fmt.Printf("Inserted: id=%d, affected=%d\n", id, rows)
// 批量插入(事务内)
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
defer tx.Rollback() // 如果 Commit 成功,Rollback 是空操作
users := []struct {
Username string
Email string
Age int
}{
{"lisi", "[email protected]", 25},
{"wangwu", "[email protected]", 32},
{"zhaoliu", "[email protected]", 19},
}
stmt, err := tx.Prepare("INSERT INTO users (username, email, age) VALUES (?, ?, ?)")
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
for _, u := range users {
_, err := stmt.Exec(u.Username, u.Email, u.Age)
if err != nil {
log.Fatalf("Batch insert failed: %v", err)
}
}
if err := tx.Commit(); err != nil {
log.Fatalf("Commit failed: %v", err)
}
fmt.Println("Batch insert committed successfully")
4.4.3 查询数据
// 查询所有用户
rows, err := db.Query("SELECT id, username, email, age FROM users ORDER BY id")
if err != nil {
log.Fatal(err)
}
defer rows.Close()
fmt.Println("=== All Users ===")
fmt.Printf("%-5s %-12s %-25s %-5s\n", "ID", "Username", "Email", "Age")
fmt.Println(strings.Repeat("-", 50))
for rows.Next() {
var id, age int
var username, email string
if err := rows.Scan(&id, &username, &email, &age); err != nil {
log.Fatal(err)
}
fmt.Printf("%-5d %-12s %-25s %-5d\n", id, username, email, age)
}
if err := rows.Err(); err != nil {
log.Fatal(err)
}
// 单行查询
var name string
err = db.QueryRow("SELECT username FROM users WHERE id = ?", 1).Scan(&name)
switch {
case err == sql.ErrNoRows:
fmt.Println("User not found")
case err != nil:
log.Fatal(err)
default:
fmt.Printf("User 1: %s\n", name)
}
// 聚合查询
var count int
err = db.QueryRow("SELECT COUNT(*) FROM users WHERE age > ?", 20).Scan(&count)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Users with age > 20: %d\n", count)
4.4.4 更新和删除
// 更新
result, err := db.Exec(
"UPDATE users SET email = ? WHERE username = ?",
"[email protected]", "zhangsan",
)
if err != nil {
log.Fatalf("Update failed: %v", err)
}
affected, _ := result.RowsAffected()
fmt.Printf("Updated %d row(s)\n", affected)
// 删除
result, err = db.Exec("DELETE FROM users WHERE age < ?", 20)
if err != nil {
log.Fatalf("Delete failed: %v", err)
}
affected, _ = result.RowsAffected()
fmt.Printf("Deleted %d row(s)\n", affected)
4.5 连接管理
4.5.1 Go 连接池配置
import "database/sql"
db := sql.OpenDB(drv)
// 设置连接池参数
db.SetMaxOpenConns(10) // 最大打开连接数
db.SetMaxIdleConns(5) // 最大空闲连接数
db.SetConnMaxLifetime(0) // 连接最大生存时间(0 = 不限)
db.SetConnMaxIdleTime(5 * time.Minute) // 空闲连接最大存活时间
// 验证连接
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
log.Fatalf("Connection failed: %v", err)
}
4.5.2 连接参数建议
| 场景 | MaxOpenConns | MaxIdleConns | 说明 |
|---|---|---|---|
| 低并发嵌入式 | 1-3 | 1 | 最小资源占用 |
| 中等并发 | 5-10 | 3-5 | 一般应用 |
| 高并发 | 20-50 | 10-20 | 需注意 SQLite 并发限制 |
注意: SQLite 本身是单写入者模型。即使配置了大量连接,写操作仍然会串行执行。过多的连接反而会增加竞争和上下文切换开销。
4.5.3 连接错误处理
import (
"database/sql"
"errors"
"strings"
)
func isRetryable(err error) bool {
if err == nil {
return false
}
// SQLite BUSY 错误通常是可重试的
if strings.Contains(err.Error(), "database is locked") {
return true
}
if strings.Contains(err.Error(), "SQLITE_BUSY") {
return true
}
return false
}
func execWithRetry(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
var result sql.Result
var err error
for i := 0; i < 3; i++ {
result, err = db.Exec(query, args...)
if err == nil {
return result, nil
}
if !isRetryable(err) {
return nil, err
}
time.Sleep(time.Duration(10*(1<<i)) * time.Millisecond) // 指数退避
}
return nil, fmt.Errorf("failed after 3 retries: %w", err)
}
4.5.4 连接关闭
// 应用退出时正确关闭数据库连接
func main() {
db, err := sql.OpenDB(drv)
if err != nil {
log.Fatal(err)
}
defer func() {
if err := db.Close(); err != nil {
log.Printf("Error closing database: %v", err)
}
}()
// ... 业务逻辑
}
4.6 事务管理
4.6.1 基本事务
// 基本事务
tx, err := db.Begin()
if err != nil {
log.Fatal(err)
}
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", 100, 1)
if err != nil {
tx.Rollback()
log.Fatal(err)
}
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", 100, 2)
if err != nil {
tx.Rollback()
log.Fatal(err)
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
}
fmt.Println("Transfer completed")
4.6.2 使用 defer 确保回滚
func transferMoney(db *sql.DB, fromID, toID int, amount float64) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback() // Commit 成功后 Rollback 是空操作
// 检查余额
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", fromID).Scan(&balance)
if err != nil {
return fmt.Errorf("query balance: %w", err)
}
if balance < amount {
return fmt.Errorf("insufficient balance: %.2f < %.2f", balance, amount)
}
// 扣减
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromID)
if err != nil {
return fmt.Errorf("debit: %w", err)
}
// 增加
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toID)
if err != nil {
return fmt.Errorf("credit: %w", err)
}
return tx.Commit()
}
4.6.3 只读事务
// 使用只读事务获取一致性快照
tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
log.Fatal(err)
}
defer tx.Rollback()
// 在同一个事务中多次读取保证一致性
var totalAccounts int
var totalBalance float64
err = tx.QueryRow("SELECT COUNT(*) FROM accounts").Scan(&totalAccounts)
if err != nil {
log.Fatal(err)
}
err = tx.QueryRow("SELECT COALESCE(SUM(balance), 0) FROM accounts").Scan(&totalBalance)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Accounts: %d, Total balance: %.2f\n", totalAccounts, totalBalance)
4.7 错误处理
4.7.1 常见错误码
| SQLite 错误码 | 含义 | 处理方式 |
|---|---|---|
SQLITE_BUSY (5) | 数据库被锁定 | 等待并重试 |
SQLITE_CONSTRAINT (19) | 约束违反(UNIQUE、FOREIGN KEY) | 检查数据 |
SQLITE_NOTFOUND (12) | 未找到记录 | 检查查询条件 |
SQLITE_CORRUPT (11) | 数据库文件损坏 | 从备份恢复 |
SQLITE_FULL (13) | 磁盘空间不足 | 清理磁盘空间 |
SQLITE_MISUSE (21) | API 使用不当 | 检查代码逻辑 |
4.7.2 Go 错误处理模式
import (
"database/sql"
"errors"
"fmt"
)
type DqliteError struct {
Code int
Message string
}
func (e *DqliteError) Error() string {
return fmt.Sprintf("dqlite error %d: %s", e.Code, e.Message)
}
func handleDBError(err error) error {
if err == nil {
return nil
}
// sql.ErrNoRows
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("record not found: %w", err)
}
// 约束违反
if strings.Contains(err.Error(), "UNIQUE constraint failed") {
return fmt.Errorf("duplicate record: %w", err)
}
if strings.Contains(err.Error(), "FOREIGN KEY constraint failed") {
return fmt.Errorf("referenced record not found: %w", err)
}
// 数据库锁定
if strings.Contains(err.Error(), "database is locked") {
return fmt.Errorf("database busy, retry later: %w", err)
}
return err
}
4.7.3 C 错误处理模式
#include <dqlite.h>
#include <stdio.h>
#include <stdlib.h>
#define CHECK_RC(node, rc, msg) \
do { \
if ((rc) != 0) { \
fprintf(stderr, "Error [%s]: %s (rc=%d)\n", \
(msg), dqlite_node_errmsg(node), (rc)); \
goto cleanup; \
} \
} while (0)
int main() {
dqlite_node *node = NULL;
int rc;
rc = dqlite_node_create(1, "/tmp/dqlite-err-test", "127.0.0.1:9001", &node);
CHECK_RC(node, rc, "create node");
rc = dqlite_node_start(node);
CHECK_RC(node, rc, "start node");
printf("Node started successfully\n");
/* 业务逻辑 ... */
cleanup:
if (node != NULL) {
dqlite_node_stop(node);
dqlite_node_destroy(node);
}
return rc;
}
4.8 实战示例:用户管理系统
以下是一个完整的用户管理 CRUD 示例:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"time"
dqlite "github.com/canonical/go-dqlite/v2"
"github.com/canonical/go-dqlite/v2/driver"
)
// User 数据模型
type User struct {
ID int64
Username string
Email string
Age int
CreatedAt string
}
// UserStore 用户存储
type UserStore struct {
db *sql.DB
}
func NewUserStore(db *sql.DB) (*UserStore, error) {
// 初始化表
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
age INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
)
`)
if err != nil {
return nil, err
}
return &UserStore{db: db}, nil
}
func (s *UserStore) Create(user *User) error {
result, err := s.db.Exec(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
user.Username, user.Email, user.Age,
)
if err != nil {
return err
}
user.ID, _ = result.LastInsertId()
return nil
}
func (s *UserStore) GetByID(id int64) (*User, error) {
u := &User{}
err := s.db.QueryRow(
"SELECT id, username, email, age, created_at FROM users WHERE id = ?", id,
).Scan(&u.ID, &u.Username, &u.Email, &u.Age, &u.CreatedAt)
if err != nil {
return nil, err
}
return u, nil
}
func (s *UserStore) List() ([]User, error) {
rows, err := s.db.Query("SELECT id, username, email, age, created_at FROM users ORDER BY id")
if err != nil {
return nil, err
}
defer rows.Close()
var users []User
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Username, &u.Email, &u.Age, &u.CreatedAt); err != nil {
return nil, err
}
users = append(users, u)
}
return users, rows.Err()
}
func (s *UserStore) UpdateEmail(id int64, email string) error {
result, err := s.db.Exec("UPDATE users SET email = ? WHERE id = ?", email, id)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return sql.ErrNoRows
}
return nil
}
func (s *UserStore) Delete(id int64) error {
result, err := s.db.Exec("DELETE FROM users WHERE id = ?", id)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return sql.ErrNoRows
}
return nil
}
func main() {
dir, _ := os.MkdirTemp("", "dqlite-crud")
defer os.RemoveAll(dir)
// 启动节点
node, err := dqlite.New(1, "127.0.0.1:9001", dir, nil)
if err != nil {
log.Fatal(err)
}
defer node.Close()
// 创建驱动
store := driver.NewInmemNodeStore()
store.Set(context.Background(), []driver.NodeInfo{
{ID: 1, Address: "127.0.0.1:9001"},
})
drv, err := driver.New(store)
if err != nil {
log.Fatal(err)
}
defer drv.Close()
db := sql.OpenDB(drv)
defer db.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
log.Fatal(err)
}
// 初始化存储
store2, err := NewUserStore(db)
if err != nil {
log.Fatal(err)
}
// Create
user := &User{Username: "zhangsan", Email: "[email protected]", Age: 28}
if err := store2.Create(user); err != nil {
log.Fatalf("Create failed: %v", err)
}
fmt.Printf("Created: %+v\n", user)
// Read
fetched, err := store2.GetByID(user.ID)
if err != nil {
log.Fatalf("Get failed: %v", err)
}
fmt.Printf("Fetched: %+v\n", fetched)
// Update
if err := store2.UpdateEmail(user.ID, "[email protected]"); err != nil {
log.Fatalf("Update failed: %v", err)
}
fmt.Println("Email updated")
// List
users, err := store2.List()
if err != nil {
log.Fatalf("List failed: %v", err)
}
fmt.Printf("All users: %d\n", len(users))
for _, u := range users {
fmt.Printf(" [%d] %s <%s> age=%d\n", u.ID, u.Username, u.Email, u.Age)
}
// Delete
if err := store2.Delete(user.ID); err != nil {
log.Fatalf("Delete failed: %v", err)
}
fmt.Println("User deleted")
}
4.9 SQL 兼容性
dqlite 使用 SQLite 作为底层存储,因此继承了 SQLite 的 SQL 语法。
4.9.1 支持的 SQL 特性
| 特性 | 支持情况 | 说明 |
|---|---|---|
| CREATE TABLE | ✅ 完全支持 | 含约束、默认值 |
| ALTER TABLE | ✅ 支持 | RENAME、ADD COLUMN |
| CREATE INDEX | ✅ 支持 | B-tree 索引 |
| INSERT / UPDATE / DELETE | ✅ 完全支持 | 含参数绑定 |
| SELECT | ✅ 完全支持 | JOIN、子查询、CTE |
| 事务 (BEGIN/COMMIT/ROLLBACK) | ✅ 支持 | 通过 Raft 复制 |
| PRAGMA | ⚠️ 部分支持 | 有些 PRAGMA 被 dqlite 拦截 |
| ATTACH DATABASE | ❌ 不支持 | dqlite 管理数据库生命周期 |
| EXPLAIN | ✅ 支持 | 查询计划分析 |
4.9.2 不支持或受限的特性
| 特性 | 说明 |
|---|---|
ATTACH DATABASE | 不能在 dqlite 中附加其他数据库 |
VACUUM | 建议在快照时自动处理 |
| 自定义函数 | 需要在编译时注册 |
| 扩展(FTS、R-Tree等) | 取决于编译选项 |
本章小结
| 要点 | 说明 |
|---|---|
| 数据库创建 | 隐式创建,首次 Open 时自动生成 |
| SQL 操作 | 标准 SQLite SQL 语法,参数绑定推荐用 ? 占位符 |
| 事务管理 | 使用 defer tx.Rollback() 模式确保安全 |
| 连接池 | Go 的 database/sql 内置连接池 |
| 错误处理 | 注意 SQLITE_BUSY 的重试机制 |
| 兼容性 | 继承 SQLite 的 SQL 特性,部分 PRAGMA 受限 |
下一章
→ 第 5 章:C API 与 Go 绑定详解 — 深入了解 dqlite 的 C API、Go 绑定、高级事务处理和错误处理机制。