强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

dqlite 分布式 SQLite 教程 / 第 5 章:C API 与 Go 绑定详解

第 5 章:C API 与 Go 绑定详解

本章深入讲解 dqlite 的 C 语言 API 和 Go 语言绑定,包括完整的客户端连接、SQL 执行、参数绑定、回调机制、事务处理和错误处理。


5.1 C API 概述

dqlite 的 C API 分为两个层次:

层次头文件说明
节点管理dqlite.h创建、启动、停止节点
客户端连接dqlite/client.h连接节点、执行 SQL、管理事务

5.1.1 核心数据结构

/* dqlite 核心类型 */
typedef struct dqlite_node dqlite_node;    /* 节点实例 */
typedef struct dqlite_client dqlite_client; /* 客户端连接 */

/* 节点状态 */
struct dqlite_node_info {
    uint64_t id;          /* 节点 ID */
    const char *address;  /* 节点地址 (host:port) */
    bool      role;       /* 角色 (voter/standby/spare) */
};

5.2 节点管理 API

5.2.1 完整节点生命周期

/* node_lifecycle.c */
#include <dqlite.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>

static volatile sig_atomic_t running = 1;

static void signal_handler(int sig) {
    (void)sig;
    running = 0;
}

/* 创建并配置节点 */
static dqlite_node *create_node(uint64_t id, const char *dir, const char *addr) {
    dqlite_node *node = NULL;
    int rc;

    /* 确保数据目录存在 */
    if (mkdir(dir, 0755) != 0 && errno != EEXIST) {
        fprintf(stderr, "Failed to create dir %s: %s\n", dir, strerror(errno));
        return NULL;
    }

    /* 创建节点 */
    rc = dqlite_node_create(id, dir, addr, &node);
    if (rc != 0) {
        fprintf(stderr, "dqlite_node_create failed: %d\n", rc);
        return NULL;
    }

    /* 设置绑定地址 */
    rc = dqlite_node_set_bind_address(node, addr);
    if (rc != 0) {
        fprintf(stderr, "set_bind_address failed: %s\n", dqlite_node_errmsg(node));
        dqlite_node_destroy(node);
        return NULL;
    }

    return node;
}

int main(void) {
    dqlite_node *node;
    int rc;

    /* 创建节点 */
    node = create_node(1, "/tmp/dqlite-api-data", "127.0.0.1:9001");
    if (node == NULL) {
        return EXIT_FAILURE;
    }

    /* 启动节点 */
    rc = dqlite_node_start(node);
    if (rc != 0) {
        fprintf(stderr, "dqlite_node_start failed: %s\n", dqlite_node_errmsg(node));
        dqlite_node_destroy(node);
        return EXIT_FAILURE;
    }

    printf("Node started. PID: %d\n", getpid());
    printf("Data directory: /tmp/dqlite-api-data\n");
    printf("Bind address: 127.0.0.1:9001\n");

    /* 注册信号 */
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    /* 事件循环 */
    while (running) {
        sleep(1);
        /* 节点内部使用 libuv 事件循环,自动处理网络和 Raft 通信 */
    }

    /* 优雅关闭 */
    printf("\nReceived shutdown signal...\n");

    rc = dqlite_node_stop(node);
    if (rc != 0) {
        fprintf(stderr, "dqlite_node_stop failed: %s\n", dqlite_node_errmsg(node));
    }

    dqlite_node_destroy(node);
    printf("Node stopped and cleaned up.\n");

    return EXIT_SUCCESS;
}

5.2.2 节点配置选项

函数说明默认值
dqlite_node_set_bind_address()设置网络监听地址必须设置
dqlite_node_set_network_latency_ms()设置网络延迟估计通常自动检测
dqlite_node_set_snapshot_threshold()快照触发的日志条目阈值1024
dqlite_node_set_snapshot_trailing()快照后保留的日志条目数2048
/* 配置节点参数 */
dqlite_node *node;
dqlite_node_create(1, "/tmp/data", "127.0.0.1:9001", &node);

/* 设置网络延迟(毫秒)- 影响选举超时计算 */
dqlite_node_set_network_latency_ms(node, 20);

/* 设置快照阈值 - 超过此数量的日志条目触发快照 */
dqlite_node_set_snapshot_threshold(node, 2048);

/* 设置快照后保留的日志条目数 */
dqlite_node_set_snapshot_trailing(node, 4096);

5.3 客户端连接 API

5.3.1 建立客户端连接

/* client_connect.c */
#include <dqlite.h>
#include <dqlite/client.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

/* 创建到 dqlite 节点的 TCP 连接 */
static int tcp_connect(const char *host, int port) {
    int sockfd;
    struct sockaddr_in addr;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    inet_pton(AF_INET, host, &addr.sin_addr);

    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("connect");
        close(sockfd);
        return -1;
    }

    return sockfd;
}

int main(void) {
    int fd;
    struct dqlite_client client;
    int rc;

    /* 1. 建立 TCP 连接 */
    fd = tcp_connect("127.0.0.1", 9001);
    if (fd < 0) {
        fprintf(stderr, "Failed to connect to dqlite node\n");
        return EXIT_FAILURE;
    }

    /* 2. 初始化客户端 */
    rc = dqlite_client_init(&client, fd);
    if (rc != 0) {
        fprintf(stderr, "dqlite_client_init failed: %d\n", rc);
        close(fd);
        return EXIT_FAILURE;
    }

    /* 3. 发送握手 */
    /* 客户端 ID 用于标识连接,可以是任意唯一值 */
    rc = dqlite_client_handshake(&client, 1);
    if (rc != 0) {
        fprintf(stderr, "Handshake failed\n");
        goto cleanup;
    }
    printf("Handshake successful\n");

    /* 4. 打开数据库 */
    uint32_t db_id;
    rc = dqlite_client_open(&client, "myapp.db", &db_id);
    if (rc != 0) {
        fprintf(stderr, "Open database failed\n");
        goto cleanup;
    }
    printf("Database opened: id=%u\n", db_id);

    /* 后续操作见下文 ... */

cleanup:
    dqlite_client_close(&client);
    close(fd);
    return rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}

5.3.2 执行 SQL(Exec)

/* 执行 DDL 和 DML 语句 */
int exec_sql(struct dqlite_client *client, uint32_t db_id, const char *sql) {
    struct dqlite_client_stmt stmt;
    int rc;

    /* 准备语句 */
    rc = dqlite_client_prepare(client, sql, &stmt);
    if (rc != 0) {
        fprintf(stderr, "Prepare failed: %s\n", sql);
        return rc;
    }

    /* 执行(无返回结果集) */
    rc = dqlite_client_exec(client, &stmt, NULL);
    if (rc != 0) {
        fprintf(stderr, "Exec failed: %s\n", sql);
        return rc;
    }

    return 0;
}

/* 示例:创建表并插入数据 */
int setup_database(struct dqlite_client *client, uint32_t db_id) {
    int rc;

    /* 创建表 */
    rc = exec_sql(client, db_id,
        "CREATE TABLE IF NOT EXISTS products ("
        "  id INTEGER PRIMARY KEY AUTOINCREMENT,"
        "  name TEXT NOT NULL,"
        "  price REAL NOT NULL,"
        "  stock INTEGER DEFAULT 0"
        ")");
    if (rc != 0) return rc;

    /* 插入数据 */
    rc = exec_sql(client, db_id,
        "INSERT INTO products (name, price, stock) VALUES ('Widget', 9.99, 100)");
    if (rc != 0) return rc;

    rc = exec_sql(client, db_id,
        "INSERT INTO products (name, price, stock) VALUES ('Gadget', 19.99, 50)");
    if (rc != 0) return rc;

    return 0;
}

5.3.3 查询数据(Query)

/* 查询数据 */
int query_products(struct dqlite_client *client, uint32_t db_id) {
    struct dqlite_client_stmt stmt;
    struct dqlite_client_rows rows;
    int rc;

    /* 准备查询 */
    rc = dqlite_client_prepare(client,
        "SELECT id, name, price, stock FROM products WHERE price > ?", &stmt);
    if (rc != 0) return rc;

    /* 绑定参数 */
    rc = dqlite_client_bind_double(client, &stmt, 0, 10.0);
    if (rc != 0) return rc;

    /* 执行查询 */
    rc = dqlite_client_query(client, &stmt, &rows);
    if (rc != 0) return rc;

    /* 遍历结果 */
    printf("%-5s %-15s %-10s %-8s\n", "ID", "Name", "Price", "Stock");
    printf("%-5s %-15s %-10s %-8s\n", "----", "--------------", "---------", "-------");

    while (dqlite_client_next_row(client, &rows)) {
        int64_t id = dqlite_client_column_int64(&rows, 0);
        const char *name = dqlite_client_column_text(&rows, 1);
        double price = dqlite_client_column_double(&rows, 2);
        int64_t stock = dqlite_client_column_int64(&rows, 3);

        printf("%-5lld %-15s %-10.2f %-8lld\n",
               (long long)id, name, price, (long long)stock);
    }

    /* 释放结果集 */
    dqlite_client_rows_close(client, &rows);

    return 0;
}

5.4 参数绑定详解

dqlite 支持位置参数(?)和命名参数(:name@name$name)。

5.4.1 C 参数绑定类型

函数绑定类型对应 SQL 类型
dqlite_client_bind_int64()64 位整数INTEGER
dqlite_client_bind_double()双精度浮点REAL
dqlite_client_bind_text()文本字符串TEXT
dqlite_client_bind_blob()二进制数据BLOB
dqlite_client_bind_null()NULL 值NULL
/* 参数绑定示例 */

/* 位置参数 (? 占位符) */
rc = dqlite_client_prepare(client,
    "INSERT INTO users (name, email, age, bio) VALUES (?, ?, ?, ?)", &stmt);

dqlite_client_bind_text(client, &stmt, 0, "张三");          /* ?1 */
dqlite_client_bind_text(client, &stmt, 1, "[email protected]");   /* ?2 */
dqlite_client_bind_int64(client, &stmt, 2, 28);              /* ?3 */
dqlite_client_bind_text(client, &stmt, 3, "Software dev");   /* ?4 */

rc = dqlite_client_exec(client, &stmt, NULL);

/* 命名参数 (:name 占位符) */
rc = dqlite_client_prepare(client,
    "INSERT INTO orders (product_id, quantity, total) "
    "VALUES (:pid, :qty, :total)", &stmt);

dqlite_client_bind_int64(client, &stmt, 0, 42);    /* :pid */
dqlite_client_bind_int64(client, &stmt, 1, 5);     /* :qty */
dqlite_client_bind_double(client, &stmt, 2, 99.95); /* :total */

rc = dqlite_client_exec(client, &stmt, NULL);

/* BLOB 绑定 */
unsigned char data[] = {0x89, 0x50, 0x4E, 0x47}; /* PNG 头部 */
rc = dqlite_client_prepare(client,
    "INSERT INTO files (name, content) VALUES (?, ?)", &stmt);

dqlite_client_bind_text(client, &stmt, 0, "image.png");
dqlite_client_bind_blob(client, &stmt, 1, data, sizeof(data));

rc = dqlite_client_exec(client, &stmt, NULL);

5.4.2 Go 参数绑定

// Go 使用 database/sql 的标准参数绑定

// 位置参数
db.Exec("INSERT INTO users (name, email) VALUES (?, ?)", "张三", "[email protected]")

// 查询参数
var name string
db.QueryRow("SELECT name FROM users WHERE id = ?", 1).Scan(&name)

// IN 子句(使用展开技巧)
ids := []int{1, 2, 3, 4, 5}
query := "SELECT id, name FROM users WHERE id IN (?" + strings.Repeat(",?", len(ids)-1) + ")"
args := make([]interface{}, len(ids))
for i, id := range ids {
    args[i] = id
}
rows, err := db.Query(query, args...)

// LIKE 查询
db.Query("SELECT name FROM users WHERE name LIKE ?", "%张%")

// NULL 处理
var email sql.NullString
err = db.QueryRow("SELECT email FROM users WHERE id = ?", 1).Scan(&email)
if email.Valid {
    fmt.Println("Email:", email.String)
} else {
    fmt.Println("Email: NULL")
}

5.5 回调与日志

5.5.1 Go 日志回调

import (
    "log"
    dqlite "github.com/canonical/go-dqlite/v2"
    "github.com/canonical/go-dqlite/v2/driver"
)

// 自定义日志函数
logFunc := func(level dqlite.LogLevel, msg string, args ...interface{}) {
    prefix := "[dqlite]"
    switch level {
    case dqlite.LogDebug:
        prefix = "[DEBUG]"
    case dqlite.LogInfo:
        prefix = "[INFO] "
    case dqlite.LogWarn:
        prefix = "[WARN] "
    case dqlite.LogError:
        prefix = "[ERROR]"
    }
    log.Printf("%s %s", prefix, fmt.Sprintf(msg, args...))
}

// 创建带日志的驱动
drv, err := driver.New(nodeStore,
    driver.WithLogFunc(logFunc),
)

5.5.2 日志级别

级别说明
DQLITE_LOG_DEBUG0调试信息,最详细
DQLITE_LOG_INFO1一般信息
DQLITE_LOG_WARN2警告信息
DQLITE_LOG_ERROR3错误信息,最少
/* C 语言设置日志回调 */
#include <dqlite.h>
#include <stdio.h>
#include <stdarg.h>

void log_handler(void *data, int level, const char *fmt, va_list args) {
    const char *prefix;
    switch (level) {
        case DQLITE_LOG_DEBUG: prefix = "DEBUG"; break;
        case DQLITE_LOG_INFO:  prefix = "INFO";  break;
        case DQLITE_LOG_WARN:  prefix = "WARN";  break;
        case DQLITE_LOG_ERROR: prefix = "ERROR"; break;
        default:               prefix = "?????"; break;
    }

    fprintf(stderr, "[dqlite-%s] ", prefix);
    vfprintf(stderr, fmt, args);
    fprintf(stderr, "\n");
}

/* 在创建节点前设置 */
/* 注意:具体的 API 可能随版本变化,请参考头文件 */

5.6 事务高级用法

5.6.1 隔离级别

在 dqlite 中,由于 Raft 共识的性质,所有已提交的事务都是全局有序的。dqlite 提供以下隔离保证:

隔离级别说明dqlite 支持
READ UNCOMMITTED可读未提交数据
READ COMMITTED只读已提交数据✅(默认)
REPEATABLE READ可重复读
SERIALIZABLE完全串行化
// Go 事务选项
tx, err := db.BeginTx(ctx, &sql.TxOptions{
    Isolation: sql.LevelSerializable,
    ReadOnly:  false,
})

// 只读事务(可以分散到 Follower 读取)
tx, err := db.BeginTx(ctx, &sql.TxOptions{
    ReadOnly: true,
})

5.6.2 保存点(Savepoint)

func complexOperation(db *sql.DB) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 主操作
    _, err = tx.Exec("INSERT INTO orders (customer_id, total) VALUES (?, ?)", 1, 100.0)
    if err != nil {
        return err
    }

    // 使用保存点
    _, err = tx.Exec("SAVEPOINT sp_inventory")
    if err != nil {
        return err
    }

    _, err = tx.Exec("UPDATE products SET stock = stock - 1 WHERE id = ? AND stock > 0", 42)
    if err != nil {
        // 库存不足,回滚到保存点(不影响订单插入)
        tx.Exec("ROLLBACK TO sp_inventory")
        log.Println("Inventory insufficient, order still created")
    } else {
        tx.Exec("RELEASE sp_inventory")
    }

    return tx.Commit()
}

5.6.3 批量操作优化

func batchInsert(db *sql.DB, records []Record) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 预编译语句(在事务内只编译一次)
    stmt, err := tx.Prepare("INSERT INTO logs (level, message, created_at) VALUES (?, ?, ?)")
    if err != nil {
        return err
    }
    defer stmt.Close()

    // 批量执行
    for _, r := range records {
        _, err := stmt.Exec(r.Level, r.Message, r.CreatedAt)
        if err != nil {
            return fmt.Errorf("insert record: %w", err)
        }
    }

    // 一次提交(所有记录在同一个 Raft 日志条目中)
    return tx.Commit()
}

性能提示: 在 dqlite 中,一个事务的所有操作被封装为一个 Raft 日志条目。批量插入时,将所有 INSERT 放在同一个事务中,可以显著减少 Raft 复制开销。


5.7 错误处理详解

5.7.1 C API 错误处理

/* 完整的错误处理模式 */
#include <dqlite.h>
#include <errno.h>
#include <string.h>

typedef struct {
    int code;
    char message[256];
} DqliteError;

/* 封装错误检查宏 */
#define DQLITE_TRY(node, expr)                                      \
    do {                                                            \
        int _rc = (expr);                                           \
        if (_rc != 0) {                                             \
            snprintf(err->message, sizeof(err->message),            \
                     "%s failed (rc=%d): %s",                       \
                     #expr, _rc, dqlite_node_errmsg(node));         \
            err->code = _rc;                                        \
            return -1;                                              \
        }                                                           \
    } while (0)

int safe_node_operation(DqliteError *err) {
    dqlite_node *node = NULL;

    DQLITE_TRY(node, dqlite_node_create(1, "/tmp/safe-data", "127.0.0.1:9001", &node));
    DQLITE_TRY(node, dqlite_node_start(node));

    /* ... 业务逻辑 ... */

    DQLITE_TRY(node, dqlite_node_stop(node));
    dqlite_node_destroy(node);
    return 0;
}

5.7.2 Go 错误分类与处理

import (
    "errors"
    "fmt"
    "strings"
    "database/sql"
)

// 错误分类
type ErrorCategory int

const (
    ErrorCategoryTransient ErrorCategory = iota // 临时错误,可重试
    ErrorCategoryPermanent                       // 永久错误,不可重试
    ErrorCategoryConflict                        // 冲突错误,可重试
)

func categorizeError(err error) ErrorCategory {
    if err == nil {
        return ErrorCategoryPermanent
    }

    msg := err.Error()

    // 临时错误 - 可重试
    if strings.Contains(msg, "database is locked") ||
       strings.Contains(msg, "SQLITE_BUSY") ||
       strings.Contains(msg, "connection refused") ||
       strings.Contains(msg, "no leader") {
        return ErrorCategoryTransient
    }

    // 冲突错误 - 可重试
    if strings.Contains(msg, "UNIQUE constraint") {
        return ErrorCategoryConflict
    }

    // 永久错误 - 不可重试
    return ErrorCategoryPermanent
}

// 带重试的执行器
func execWithRetry(ctx context.Context, db *sql.DB, maxRetries int,
    query string, args ...interface{}) (sql.Result, error) {

    var lastErr error
    for i := 0; i <= maxRetries; i++ {
        result, err := db.ExecContext(ctx, query, args...)
        if err == nil {
            return result, nil
        }

        lastErr = err
        cat := categorizeError(err)
        if cat == ErrorCategoryPermanent {
            return nil, fmt.Errorf("permanent error: %w", err)
        }

        // 指数退避
        if i < maxRetries {
            delay := time.Duration(1<<uint(i)*10) * time.Millisecond
            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return nil, ctx.Err()
            }
        }
    }
    return nil, fmt.Errorf("max retries exceeded: %w", lastErr)
}

5.7.3 常见错误场景对照表

错误消息原因解决方案
database is locked并发写入冲突重试或减少并发
UNIQUE constraint failed唯一键冲突检查数据或使用 INSERT OR IGNORE
FOREIGN KEY constraint failed外键引用不存在先插入被引用记录
NOT NULL constraint failed缺少必填字段补充字段值
no such table表不存在先执行 CREATE TABLE
no leader集群无 Leader检查集群状态
server is not cluster leader请求发送到了 Follower重定向到 Leader
connection refused节点未启动启动节点
handshake failed协议不匹配检查版本兼容性

5.8 SQL 扩展与高级特性

5.8.1 UPSERT(INSERT OR REPLACE)

// SQLite 支持的 UPSERT 语法
_, err := db.Exec(`
    INSERT INTO config (key, value, updated_at)
    VALUES (?, ?, datetime('now'))
    ON CONFLICT(key) DO UPDATE SET
        value = excluded.value,
        updated_at = excluded.updated_at
`, "app.name", "MyApp")

// 或使用 INSERT OR REPLACE(注意:会删除旧记录后重新插入)
_, err = db.Exec(`
    INSERT OR REPLACE INTO settings (key, value)
    VALUES (?, ?)
`, "theme", "dark")

5.8.2 CTE(Common Table Expressions)

// 递归 CTE:获取组织架构树
rows, err := db.Query(`
    WITH RECURSIVE org_tree AS (
        -- 基础查询:顶级节点
        SELECT id, name, parent_id, 0 AS depth, name AS path
        FROM departments
        WHERE parent_id IS NULL

        UNION ALL

        -- 递归查询:子节点
        SELECT d.id, d.name, d.parent_id, t.depth + 1,
               t.path || ' > ' || d.name
        FROM departments d
        JOIN org_tree t ON d.parent_id = t.id
    )
    SELECT id, name, depth, path FROM org_tree ORDER BY path
`)

5.8.3 窗口函数

// 计算每个部门的薪资排名
rows, err := db.Query(`
    SELECT
        name,
        department,
        salary,
        RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
        SUM(salary) OVER (PARTITION BY department) as dept_total
    FROM employees
    ORDER BY department, dept_rank
`)

5.9 完整客户端封装

以下是一个生产可用的 Go 客户端封装:

package dqliteclient

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"

    dqlite "github.com/canonical/go-dqlite/v2"
    "github.com/canonical/go-dqlite/v2/driver"
)

// Config 客户端配置
type Config struct {
    NodeID      uint64
    Address     string
    DataDir     string
    MaxConns    int
    LogFunc     func(dqlite.LogLevel, string, ...interface{})
}

// Client dqlite 客户端
type Client struct {
    config Config
    node   *dqlite.Node
    db     *sql.DB
    mu     sync.RWMutex
}

// New 创建新的客户端
func New(config Config) (*Client, error) {
    // 设置默认值
    if config.MaxConns == 0 {
        config.MaxConns = 10
    }

    // 创建节点
    node, err := dqlite.New(config.NodeID, config.Address, config.DataDir, nil)
    if err != nil {
        return nil, fmt.Errorf("create node: %w", err)
    }

    // 创建驱动
    store := driver.NewInmemNodeStore()
    store.Set(context.Background(), []driver.NodeInfo{
        {ID: config.NodeID, Address: config.Address},
    })

    drvOpts := []driver.Option{}
    if config.LogFunc != nil {
        drvOpts = append(drvOpts, driver.WithLogFunc(config.LogFunc))
    }

    drv, err := driver.New(store, drvOpts...)
    if err != nil {
        node.Close()
        return nil, fmt.Errorf("create driver: %w", err)
    }

    db := sql.OpenDB(drv)
    db.SetMaxOpenConns(config.MaxConns)
    db.SetMaxIdleConns(config.MaxConns / 2)
    db.SetConnMaxLifetime(0)

    // 验证连接
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        db.Close()
        drv.Close()
        node.Close()
        return nil, fmt.Errorf("ping: %w", err)
    }

    return &Client{
        config: config,
        node:   node,
        db:     db,
    }, nil
}

// DB 获取数据库连接
func (c *Client) DB() *sql.DB {
    return c.db
}

// Close 关闭客户端
func (c *Client) Close() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.db != nil {
        c.db.Close()
    }
    if c.node != nil {
        c.node.Close()
    }
    return nil
}

// IsLeader 检查当前节点是否为 Leader
func (c *Client) IsLeader() bool {
    // 通过尝试写入来间接检测(简化实现)
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    _, err := c.db.ExecContext(ctx, "SELECT 1")
    return err == nil
}

本章小结

要点说明
C API 两层结构节点管理(dqlite.h)+ 客户端连接(client.h)
参数绑定支持位置参数(?)和命名参数(:name)
事务管理支持标准隔离级别和保存点
批量操作单事务多操作可减少 Raft 复制开销
错误处理区分临时错误(可重试)和永久错误
日志回调支持 DEBUG/INFO/WARN/ERROR 四个级别

下一章

第 6 章:集群搭建与管理 — 学习如何搭建多节点集群、管理节点、处理 Leader 选举和故障转移。