强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

dqlite 分布式 SQLite 教程 / 第 4 章:基本操作

第 4 章:基本操作

本章介绍 dqlite 的基本操作,包括创建数据库、执行 SQL 语句、连接管理和错误处理。所有示例同时提供 C 和 Go 两种语言版本。


4.1 操作概览

dqlite 的基本操作遵循以下流程:

┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│ 创建节点  │────▶│ 启动节点  │────▶│ 建立连接  │────▶│ 执行 SQL │
└──────────┘     └──────────┘     └──────────┘     └──────────┘
                                                        │
                                                  ┌─────┴─────┐
                                                  ▼           ▼
                                              ┌──────┐   ┌──────┐
                                              │ DDL  │   │ DML  │
                                              │(建表) │   │(增删改│
                                              └──────┘   │ 查)  │
                                                         └──────┘
阶段C APIGo API
创建节点dqlite_node_create()dqlite.New()
启动节点dqlite_node_start()node.Open()
建立连接dqlite_client_connect()sql.Open("dqlite", ...)
执行 SQLsqlite3_exec() via clientdb.Exec() / db.Query()

4.2 创建并启动节点

4.2.1 C 语言版本

/* basic_node.c - 创建并启动 dqlite 节点 */
#include <dqlite.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>

static volatile int running = 1;

void handle_signal(int sig) {
    (void)sig;
    running = 0;
}

int main(int argc, char *argv[]) {
    dqlite_node *node;
    int rc;

    /* 解析参数 */
    uint64_t id = 1;
    const char *address = "127.0.0.1:9001";
    const char *data_dir = "/tmp/dqlite-basic-data";

    if (argc >= 3) {
        id = (uint64_t)atoi(argv[1]);
        address = argv[2];
    }
    if (argc >= 4) {
        data_dir = argv[3];
    }

    /* 创建节点 */
    rc = dqlite_node_create(id, data_dir, address, &node);
    if (rc != 0) {
        fprintf(stderr, "Error: Failed to create node (rc=%d)\n", rc);
        return EXIT_FAILURE;
    }

    /* 启动节点 */
    rc = dqlite_node_start(node);
    if (rc != 0) {
        fprintf(stderr, "Error: Failed to start node: %s\n",
                dqlite_node_errmsg(node));
        dqlite_node_destroy(node);
        return EXIT_FAILURE;
    }

    printf("dqlite node %lu started on %s (data: %s)\n",
           id, address, data_dir);

    /* 注册信号处理 */
    signal(SIGINT, handle_signal);
    signal(SIGTERM, handle_signal);

    /* 主循环 */
    while (running) {
        sleep(1);
    }

    /* 清理 */
    printf("\nShutting down node...\n");
    dqlite_node_stop(node);
    dqlite_node_destroy(node);
    printf("Node stopped.\n");

    return EXIT_SUCCESS;
}

编译和运行:

gcc -Wall -o basic_node basic_node.c $(pkg-config --cflags --libs dqlite) -lpthread
./basic_node 1 "127.0.0.1:9001" /tmp/dqlite-node1

4.2.2 Go 语言版本

// basic_node.go - 创建并启动 dqlite 节点
package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    dqlite "github.com/canonical/go-dqlite/v2"
)

func main() {
    id := uint64(1)
    address := "127.0.0.1:9001"
    dir := "/tmp/dqlite-go-data"

    if len(os.Args) >= 3 {
        fmt.Sscanf(os.Args[1], "%d", &id)
        address = os.Args[2]
    }
    if len(os.Args) >= 4 {
        dir = os.Args[3]
    }

    // 确保数据目录存在
    if err := os.MkdirAll(dir, 0755); err != nil {
        log.Fatalf("Failed to create data dir: %v", err)
    }

    // 创建节点
    node, err := dqlite.New(id, address, dir, nil)
    if err != nil {
        log.Fatalf("Failed to create node: %v", err)
    }
    defer node.Close()

    fmt.Printf("dqlite node %d started on %s (data: %s)\n", id, address, dir)

    // 等待信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    fmt.Println("\nShutting down node...")
}

4.3 创建数据库

在 dqlite 中,数据库不需要显式创建。当你第一次 Open 一个数据库名时,它会自动创建。

4.3.1 数据库命名

规则说明
名称长度建议 < 255 字符
后缀无强制要求,建议使用 .db
路径不支持路径分隔符,数据库在节点数据目录内
并发创建同名数据库只有一个,多次 Open 返回同一实例

4.3.2 Go 版本:使用 database/sql

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "time"

    dqlite "github.com/canonical/go-dqlite/v2"
    "github.com/canonical/go-dqlite/v2/driver"
)

func main() {
    dir, _ := os.MkdirTemp("", "dqlite-basic")
    defer os.RemoveAll(dir)

    // 启动节点
    node, err := dqlite.New(1, "127.0.0.1:9001", dir, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer node.Close()

    // 创建驱动
    store := driver.NewInmemNodeStore()
    store.Set(context.Background(), []driver.NodeInfo{
        {ID: 1, Address: "127.0.0.1:9001"},
    })

    drv, err := driver.New(store)
    if err != nil {
        log.Fatal(err)
    }
    defer drv.Close()

    db := sql.OpenDB(drv)
    defer db.Close()

    // 等待连接
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        log.Fatal(err)
    }

    fmt.Println("Database 'app.db' created (auto-created on first access)")
    _ = db // 使用 db 进行后续操作
}

4.3.3 C 版本:通过客户端连接

/* 使用 dqlite 客户端 API 连接数据库 */
#include <dqlite.h>
#include <stdio.h>

/* 注意:完整的客户端 API 使用需要包含 client.h,
 * 这里展示概念。实际使用参见第 5 章 */

/*
 * dqlite 的数据库创建是隐式的:
 * 1. 启动节点
 * 2. 通过客户端协议连接
 * 3. 发送 Open 请求指定数据库名
 * 4. 如果数据库不存在则自动创建
 */

int main() {
    printf("dqlite databases are created implicitly on first Open.\n");
    printf("See Chapter 5 for complete C API client examples.\n");
    return 0;
}

4.4 SQL 操作

4.4.1 创建表(DDL)

// 创建表
_, err := db.Exec(`
    CREATE TABLE IF NOT EXISTS users (
        id         INTEGER PRIMARY KEY AUTOINCREMENT,
        username   TEXT    NOT NULL UNIQUE,
        email      TEXT    NOT NULL,
        age        INTEGER DEFAULT 0,
        created_at TEXT    DEFAULT (datetime('now'))
    )
`)
if err != nil {
    log.Fatalf("Create table failed: %v", err)
}

// 创建索引
_, err = db.Exec(`
    CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)
`)
if err != nil {
    log.Fatalf("Create index failed: %v", err)
}

// 创建多个表
schema := `
    CREATE TABLE IF NOT EXISTS posts (
        id         INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id    INTEGER NOT NULL REFERENCES users(id),
        title      TEXT    NOT NULL,
        content    TEXT,
        created_at TEXT    DEFAULT (datetime('now'))
    );
    CREATE INDEX IF NOT EXISTS idx_posts_user ON posts(user_id);
`
_, err = db.Exec(schema)
if err != nil {
    log.Fatalf("Create posts table failed: %v", err)
}

4.4.2 插入数据

// 单条插入(参数绑定)
result, err := db.Exec(
    "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
    "zhangsan", "[email protected]", 28,
)
if err != nil {
    log.Fatalf("Insert failed: %v", err)
}
id, _ := result.LastInsertId()
rows, _ := result.RowsAffected()
fmt.Printf("Inserted: id=%d, affected=%d\n", id, rows)

// 批量插入(事务内)
tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
defer tx.Rollback() // 如果 Commit 成功,Rollback 是空操作

users := []struct {
    Username string
    Email    string
    Age      int
}{
    {"lisi", "[email protected]", 25},
    {"wangwu", "[email protected]", 32},
    {"zhaoliu", "[email protected]", 19},
}

stmt, err := tx.Prepare("INSERT INTO users (username, email, age) VALUES (?, ?, ?)")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()

for _, u := range users {
    _, err := stmt.Exec(u.Username, u.Email, u.Age)
    if err != nil {
        log.Fatalf("Batch insert failed: %v", err)
    }
}

if err := tx.Commit(); err != nil {
    log.Fatalf("Commit failed: %v", err)
}
fmt.Println("Batch insert committed successfully")

4.4.3 查询数据

// 查询所有用户
rows, err := db.Query("SELECT id, username, email, age FROM users ORDER BY id")
if err != nil {
    log.Fatal(err)
}
defer rows.Close()

fmt.Println("=== All Users ===")
fmt.Printf("%-5s %-12s %-25s %-5s\n", "ID", "Username", "Email", "Age")
fmt.Println(strings.Repeat("-", 50))

for rows.Next() {
    var id, age int
    var username, email string
    if err := rows.Scan(&id, &username, &email, &age); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%-5d %-12s %-25s %-5d\n", id, username, email, age)
}
if err := rows.Err(); err != nil {
    log.Fatal(err)
}

// 单行查询
var name string
err = db.QueryRow("SELECT username FROM users WHERE id = ?", 1).Scan(&name)
switch {
case err == sql.ErrNoRows:
    fmt.Println("User not found")
case err != nil:
    log.Fatal(err)
default:
    fmt.Printf("User 1: %s\n", name)
}

// 聚合查询
var count int
err = db.QueryRow("SELECT COUNT(*) FROM users WHERE age > ?", 20).Scan(&count)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Users with age > 20: %d\n", count)

4.4.4 更新和删除

// 更新
result, err := db.Exec(
    "UPDATE users SET email = ? WHERE username = ?",
    "[email protected]", "zhangsan",
)
if err != nil {
    log.Fatalf("Update failed: %v", err)
}
affected, _ := result.RowsAffected()
fmt.Printf("Updated %d row(s)\n", affected)

// 删除
result, err = db.Exec("DELETE FROM users WHERE age < ?", 20)
if err != nil {
    log.Fatalf("Delete failed: %v", err)
}
affected, _ = result.RowsAffected()
fmt.Printf("Deleted %d row(s)\n", affected)

4.5 连接管理

4.5.1 Go 连接池配置

import "database/sql"

db := sql.OpenDB(drv)

// 设置连接池参数
db.SetMaxOpenConns(10)       // 最大打开连接数
db.SetMaxIdleConns(5)        // 最大空闲连接数
db.SetConnMaxLifetime(0)     // 连接最大生存时间(0 = 不限)
db.SetConnMaxIdleTime(5 * time.Minute) // 空闲连接最大存活时间

// 验证连接
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
    log.Fatalf("Connection failed: %v", err)
}

4.5.2 连接参数建议

场景MaxOpenConnsMaxIdleConns说明
低并发嵌入式1-31最小资源占用
中等并发5-103-5一般应用
高并发20-5010-20需注意 SQLite 并发限制

注意: SQLite 本身是单写入者模型。即使配置了大量连接,写操作仍然会串行执行。过多的连接反而会增加竞争和上下文切换开销。

4.5.3 连接错误处理

import (
    "database/sql"
    "errors"
    "strings"
)

func isRetryable(err error) bool {
    if err == nil {
        return false
    }
    // SQLite BUSY 错误通常是可重试的
    if strings.Contains(err.Error(), "database is locked") {
        return true
    }
    if strings.Contains(err.Error(), "SQLITE_BUSY") {
        return true
    }
    return false
}

func execWithRetry(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
    var result sql.Result
    var err error

    for i := 0; i < 3; i++ {
        result, err = db.Exec(query, args...)
        if err == nil {
            return result, nil
        }
        if !isRetryable(err) {
            return nil, err
        }
        time.Sleep(time.Duration(10*(1<<i)) * time.Millisecond) // 指数退避
    }
    return nil, fmt.Errorf("failed after 3 retries: %w", err)
}

4.5.4 连接关闭

// 应用退出时正确关闭数据库连接
func main() {
    db, err := sql.OpenDB(drv)
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        if err := db.Close(); err != nil {
            log.Printf("Error closing database: %v", err)
        }
    }()

    // ... 业务逻辑
}

4.6 事务管理

4.6.1 基本事务

// 基本事务
tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}

_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", 100, 1)
if err != nil {
    tx.Rollback()
    log.Fatal(err)
}

_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", 100, 2)
if err != nil {
    tx.Rollback()
    log.Fatal(err)
}

if err := tx.Commit(); err != nil {
    log.Fatal(err)
}
fmt.Println("Transfer completed")

4.6.2 使用 defer 确保回滚

func transferMoney(db *sql.DB, fromID, toID int, amount float64) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback() // Commit 成功后 Rollback 是空操作

    // 检查余额
    var balance float64
    err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ?", fromID).Scan(&balance)
    if err != nil {
        return fmt.Errorf("query balance: %w", err)
    }
    if balance < amount {
        return fmt.Errorf("insufficient balance: %.2f < %.2f", balance, amount)
    }

    // 扣减
    _, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromID)
    if err != nil {
        return fmt.Errorf("debit: %w", err)
    }

    // 增加
    _, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toID)
    if err != nil {
        return fmt.Errorf("credit: %w", err)
    }

    return tx.Commit()
}

4.6.3 只读事务

// 使用只读事务获取一致性快照
tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
    log.Fatal(err)
}
defer tx.Rollback()

// 在同一个事务中多次读取保证一致性
var totalAccounts int
var totalBalance float64

err = tx.QueryRow("SELECT COUNT(*) FROM accounts").Scan(&totalAccounts)
if err != nil {
    log.Fatal(err)
}

err = tx.QueryRow("SELECT COALESCE(SUM(balance), 0) FROM accounts").Scan(&totalBalance)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Accounts: %d, Total balance: %.2f\n", totalAccounts, totalBalance)

4.7 错误处理

4.7.1 常见错误码

SQLite 错误码含义处理方式
SQLITE_BUSY (5)数据库被锁定等待并重试
SQLITE_CONSTRAINT (19)约束违反(UNIQUE、FOREIGN KEY)检查数据
SQLITE_NOTFOUND (12)未找到记录检查查询条件
SQLITE_CORRUPT (11)数据库文件损坏从备份恢复
SQLITE_FULL (13)磁盘空间不足清理磁盘空间
SQLITE_MISUSE (21)API 使用不当检查代码逻辑

4.7.2 Go 错误处理模式

import (
    "database/sql"
    "errors"
    "fmt"
)

type DqliteError struct {
    Code    int
    Message string
}

func (e *DqliteError) Error() string {
    return fmt.Sprintf("dqlite error %d: %s", e.Code, e.Message)
}

func handleDBError(err error) error {
    if err == nil {
        return nil
    }

    // sql.ErrNoRows
    if errors.Is(err, sql.ErrNoRows) {
        return fmt.Errorf("record not found: %w", err)
    }

    // 约束违反
    if strings.Contains(err.Error(), "UNIQUE constraint failed") {
        return fmt.Errorf("duplicate record: %w", err)
    }

    if strings.Contains(err.Error(), "FOREIGN KEY constraint failed") {
        return fmt.Errorf("referenced record not found: %w", err)
    }

    // 数据库锁定
    if strings.Contains(err.Error(), "database is locked") {
        return fmt.Errorf("database busy, retry later: %w", err)
    }

    return err
}

4.7.3 C 错误处理模式

#include <dqlite.h>
#include <stdio.h>
#include <stdlib.h>

#define CHECK_RC(node, rc, msg)                                    \
    do {                                                           \
        if ((rc) != 0) {                                           \
            fprintf(stderr, "Error [%s]: %s (rc=%d)\n",            \
                    (msg), dqlite_node_errmsg(node), (rc));        \
            goto cleanup;                                          \
        }                                                          \
    } while (0)

int main() {
    dqlite_node *node = NULL;
    int rc;

    rc = dqlite_node_create(1, "/tmp/dqlite-err-test", "127.0.0.1:9001", &node);
    CHECK_RC(node, rc, "create node");

    rc = dqlite_node_start(node);
    CHECK_RC(node, rc, "start node");

    printf("Node started successfully\n");

    /* 业务逻辑 ... */

cleanup:
    if (node != NULL) {
        dqlite_node_stop(node);
        dqlite_node_destroy(node);
    }
    return rc;
}

4.8 实战示例:用户管理系统

以下是一个完整的用户管理 CRUD 示例:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "time"

    dqlite "github.com/canonical/go-dqlite/v2"
    "github.com/canonical/go-dqlite/v2/driver"
)

// User 数据模型
type User struct {
    ID        int64
    Username  string
    Email     string
    Age       int
    CreatedAt string
}

// UserStore 用户存储
type UserStore struct {
    db *sql.DB
}

func NewUserStore(db *sql.DB) (*UserStore, error) {
    // 初始化表
    _, err := db.Exec(`
        CREATE TABLE IF NOT EXISTS users (
            id         INTEGER PRIMARY KEY AUTOINCREMENT,
            username   TEXT NOT NULL UNIQUE,
            email      TEXT NOT NULL,
            age        INTEGER DEFAULT 0,
            created_at TEXT DEFAULT (datetime('now'))
        )
    `)
    if err != nil {
        return nil, err
    }
    return &UserStore{db: db}, nil
}

func (s *UserStore) Create(user *User) error {
    result, err := s.db.Exec(
        "INSERT INTO users (username, email, age) VALUES (?, ?, ?)",
        user.Username, user.Email, user.Age,
    )
    if err != nil {
        return err
    }
    user.ID, _ = result.LastInsertId()
    return nil
}

func (s *UserStore) GetByID(id int64) (*User, error) {
    u := &User{}
    err := s.db.QueryRow(
        "SELECT id, username, email, age, created_at FROM users WHERE id = ?", id,
    ).Scan(&u.ID, &u.Username, &u.Email, &u.Age, &u.CreatedAt)
    if err != nil {
        return nil, err
    }
    return u, nil
}

func (s *UserStore) List() ([]User, error) {
    rows, err := s.db.Query("SELECT id, username, email, age, created_at FROM users ORDER BY id")
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var users []User
    for rows.Next() {
        var u User
        if err := rows.Scan(&u.ID, &u.Username, &u.Email, &u.Age, &u.CreatedAt); err != nil {
            return nil, err
        }
        users = append(users, u)
    }
    return users, rows.Err()
}

func (s *UserStore) UpdateEmail(id int64, email string) error {
    result, err := s.db.Exec("UPDATE users SET email = ? WHERE id = ?", email, id)
    if err != nil {
        return err
    }
    affected, _ := result.RowsAffected()
    if affected == 0 {
        return sql.ErrNoRows
    }
    return nil
}

func (s *UserStore) Delete(id int64) error {
    result, err := s.db.Exec("DELETE FROM users WHERE id = ?", id)
    if err != nil {
        return err
    }
    affected, _ := result.RowsAffected()
    if affected == 0 {
        return sql.ErrNoRows
    }
    return nil
}

func main() {
    dir, _ := os.MkdirTemp("", "dqlite-crud")
    defer os.RemoveAll(dir)

    // 启动节点
    node, err := dqlite.New(1, "127.0.0.1:9001", dir, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer node.Close()

    // 创建驱动
    store := driver.NewInmemNodeStore()
    store.Set(context.Background(), []driver.NodeInfo{
        {ID: 1, Address: "127.0.0.1:9001"},
    })
    drv, err := driver.New(store)
    if err != nil {
        log.Fatal(err)
    }
    defer drv.Close()

    db := sql.OpenDB(drv)
    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        log.Fatal(err)
    }

    // 初始化存储
    store2, err := NewUserStore(db)
    if err != nil {
        log.Fatal(err)
    }

    // Create
    user := &User{Username: "zhangsan", Email: "[email protected]", Age: 28}
    if err := store2.Create(user); err != nil {
        log.Fatalf("Create failed: %v", err)
    }
    fmt.Printf("Created: %+v\n", user)

    // Read
    fetched, err := store2.GetByID(user.ID)
    if err != nil {
        log.Fatalf("Get failed: %v", err)
    }
    fmt.Printf("Fetched: %+v\n", fetched)

    // Update
    if err := store2.UpdateEmail(user.ID, "[email protected]"); err != nil {
        log.Fatalf("Update failed: %v", err)
    }
    fmt.Println("Email updated")

    // List
    users, err := store2.List()
    if err != nil {
        log.Fatalf("List failed: %v", err)
    }
    fmt.Printf("All users: %d\n", len(users))
    for _, u := range users {
        fmt.Printf("  [%d] %s <%s> age=%d\n", u.ID, u.Username, u.Email, u.Age)
    }

    // Delete
    if err := store2.Delete(user.ID); err != nil {
        log.Fatalf("Delete failed: %v", err)
    }
    fmt.Println("User deleted")
}

4.9 SQL 兼容性

dqlite 使用 SQLite 作为底层存储,因此继承了 SQLite 的 SQL 语法。

4.9.1 支持的 SQL 特性

特性支持情况说明
CREATE TABLE✅ 完全支持含约束、默认值
ALTER TABLE✅ 支持RENAME、ADD COLUMN
CREATE INDEX✅ 支持B-tree 索引
INSERT / UPDATE / DELETE✅ 完全支持含参数绑定
SELECT✅ 完全支持JOIN、子查询、CTE
事务 (BEGIN/COMMIT/ROLLBACK)✅ 支持通过 Raft 复制
PRAGMA⚠️ 部分支持有些 PRAGMA 被 dqlite 拦截
ATTACH DATABASE❌ 不支持dqlite 管理数据库生命周期
EXPLAIN✅ 支持查询计划分析

4.9.2 不支持或受限的特性

特性说明
ATTACH DATABASE不能在 dqlite 中附加其他数据库
VACUUM建议在快照时自动处理
自定义函数需要在编译时注册
扩展(FTS、R-Tree等)取决于编译选项

本章小结

要点说明
数据库创建隐式创建,首次 Open 时自动生成
SQL 操作标准 SQLite SQL 语法,参数绑定推荐用 ? 占位符
事务管理使用 defer tx.Rollback() 模式确保安全
连接池Go 的 database/sql 内置连接池
错误处理注意 SQLITE_BUSY 的重试机制
兼容性继承 SQLite 的 SQL 特性,部分 PRAGMA 受限

下一章

第 5 章:C API 与 Go 绑定详解 — 深入了解 dqlite 的 C API、Go 绑定、高级事务处理和错误处理机制。