强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

LevelDB 完全指南 / 第 12 章 · 数据复制

第 12 章 · 数据复制

12.1 LevelDB 为什么没有内置复制

LevelDB 定位是嵌入式存储引擎,不提供网络功能:

特性LevelDBetcdTiKVCockroachDB
存储引擎底层使用底层使用(RocksDB)底层使用(Pebble)
网络协议gRPCgRPCSQL + gRPC
复制RaftRaftRaft
分片

💡 提示:如果你需要分布式 KV 存储,应该直接使用 etcd 或 TiKV。本章讨论的是基于 LevelDB 自行构建复制的方案。


12.2 手动复制方案

方案一:WAL 流复制

┌──────────┐    WAL 日志    ┌──────────┐
│ Primary  │───────────────→│ Replica  │
│ LevelDB  │                │ LevelDB  │
└──────────┘                └──────────┘

实现思路

// Primary 端:拦截写入,转发 WAL 到 Replica
class ReplicatedDB {
public:
    bool Put(const std::string& key, const std::string& value) {
        // 1. 写入本地
        leveldb::Status s = db_->Put(wopts_, key, value);
        if (!s.ok()) return false;

        // 2. 发送到 Replica(简化示意)
        std::string wal_entry = SerializeWALEntry(key, value);
        SendToReplica(wal_entry);
        return true;
    }

private:
    void SendToReplica(const std::string& entry) {
        // 通过 TCP/gRPC 发送到 Replica
        // 需要处理:网络超时、重试、顺序保证
    }
};

问题

  • 无法保证 Primary 和 Replica 的严格一致
  • 网络分区时的数据一致性难以处理
  • 没有自动故障转移

方案二:Snapshot + 增量复制

┌──────────┐   Snapshot文件   ┌──────────┐
│ Primary  │─────────────────→│ Replica  │
│ LevelDB  │                  │ LevelDB  │
└────┬─────┘                  └──────────┘
     │
     │  增量数据
     └──────────────────────────────────────→

实现步骤

// 1. 初始同步:复制整个数据库
void InitialSync(leveldb::DB* primary, leveldb::DB* replica) {
    const leveldb::Snapshot* snap = primary->GetSnapshot();
    leveldb::ReadOptions ropts;
    ropts.snapshot = snap;

    leveldb::WriteBatch batch;
    auto* it = primary->NewIterator(ropts);
    int count = 0;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
        batch.Put(it->key(), it->value());
        if (++count % 10000 == 0) {
            replica->Write(leveldb::WriteOptions(), &batch);
            batch = leveldb::WriteBatch();
        }
    }
    if (count % 10000 != 0) {
        replica->Write(leveldb::WriteOptions(), &batch);
    }

    delete it;
    primary->ReleaseSnapshot(snap);
}

// 2. 增量同步:记录写入日志
class WriteLogger {
public:
    void Log(const std::string& op, const std::string& key,
             const std::string& value) {
        std::lock_guard<std::mutex> lock(mu_);
        LogEntry entry{op, key, value, sequence_++};
        entries_.push_back(entry);
        // 持久化到文件或发送到消息队列
    }

    std::vector<LogEntry> GetEntriesSince(uint64_t seq) {
        std::lock_guard<std::mutex> lock(mu_);
        return {entries_.begin() + seq, entries_.end()};
    }

private:
    std::mutex mu_;
    uint64_t sequence_ = 0;
    std::vector<LogEntry> entries_;
};

方案三:应用层双写

// 简单但不推荐(不保证一致性)
bool DoubleWrite(leveldb::DB* db1, leveldb::DB* db2,
                 const std::string& key, const std::string& value) {
    leveldb::Status s1 = db1->Put(wopts, key, value);
    leveldb::Status s2 = db2->Put(wopts, key, value);
    return s1.ok() && s2.ok();
    // 问题:db1 成功但 db2 失败怎么办?
}

12.3 Raft 集成

架构设计

┌─────────────────────────────────────────────────┐
│                 Raft Consensus Layer              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │  Node 1  │  │  Node 2  │  │  Node 3  │      │
│  │ (Leader) │←→│(Follower)│←→│(Follower)│      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
│       │             │             │              │
└───────┼─────────────┼─────────────┼──────────────┘
        │             │             │
        ▼             ▼             ▼
   ┌─────────┐  ┌─────────┐  ┌─────────┐
   │ LevelDB │  │ LevelDB │  │ LevelDB │
   └─────────┘  └─────────┘  └─────────┘

Raft 状态机实现

// Raft 状态机接口
class LevelDBStateMachine : public raft::StateMachine {
public:
    LevelDBStateMachine(const std::string& db_path) {
        leveldb::Options opts;
        opts.create_if_missing = true;
        leveldb::DB::Open(opts, db_path, &db_);
    }

    // 应用已提交的日志条目
    void Apply(const raft::LogEntry& entry) override {
        // 解析命令
        Command cmd = Deserialize(entry.data());

        switch (cmd.type) {
            case Command::PUT:
                db_->Put(leveldb::WriteOptions(), cmd.key, cmd.value);
                break;
            case Command::DELETE:
                db_->Delete(leveldb::WriteOptions(), cmd.key);
                break;
            case Command::BATCH:
                db_->Write(leveldb::WriteOptions(), &cmd.batch);
                break;
        }
    }

    // 创建快照
    raft::Snapshot* TakeSnapshot() override {
        const leveldb::Snapshot* snap = db_->GetSnapshot();
        // 序列化所有数据...
        return new LevelDBSnapshot(snap);
    }

    // 恢复快照
    void RestoreSnapshot(raft::Snapshot* snapshot) override {
        // 从快照恢复数据到 LevelDB...
    }

private:
    leveldb::DB* db_;
};

使用 hashicorp/raft 库(Go 示例)

package main

import (
    "encoding/json"
    "io"
    "net"

    "github.com/hashicorp/raft"
    "github.com/hashicorp/raft-boltdb"
    "github.com/syndtr/goleveldb/leveldb"
)

type LevelDBFSM struct {
    db *leveldb.DB
}

func (fsm *LevelDBFSM) Apply(l *raft.Log) interface{} {
    var cmd struct {
        Op    string `json:"op"`
        Key   string `json:"key"`
        Value string `json:"value,omitempty"`
    }
    json.Unmarshal(l.Data, &cmd)

    switch cmd.Op {
    case "put":
        return fsm.db.Put([]byte(cmd.Key), []byte(cmd.Value), nil)
    case "delete":
        return fsm.db.Delete([]byte(cmd.Key), nil)
    }
    return nil
}

func (fsm *LevelDBFSM) Snapshot() (raft.FSMSnapshot, error) {
    // 创建一致性快照
    snap, _ := fsm.db.GetSnapshot()
    return &LevelDBSnapshot{snap: snap}, nil
}

func (fsm *LevelDBFSM) Restore(rc io.ReadCloser) error {
    // 从快照恢复
    // ...
    return nil
}

func main() {
    db, _ := leveldb.OpenFile("/data/raft-node-1", nil)
    fsm := &LevelDBFSM{db: db}

    config := raft.DefaultConfig()
    config.LocalID = raft.ServerID("node1")

    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:12000")
    transport, _ := raft.NewTCPTransport(addr.String(), addr, 3, 10*time.Second, os.Stderr)

    logStore, _ := raftboltdb.NewBoltStore("/data/raft-log-1")
    stableStore, _ := raftboltdb.NewBoltStore("/data/raft-stable-1")
    snapshotStore, _ := raft.NewFileSnapshotStore("/data/raft-snap-1", 3, os.Stderr)

    r, _ := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
    // ...
}

12.4 主从复制的挑战

挑战一:一致性保证

一致性模型实现复杂度性能影响数据丢失风险
异步复制最小高(主宕机丢数据)
半同步复制中等
强一致性(Raft)较大无(多数派确认)

挑战二:冲突解决

场景:网络分区后恢复

  Primary:  PUT("key1", "value_v2")  (Seq=100)
  Replica:  PUT("key1", "value_v3")  (Seq=100)

  如何合并?两种策略:
  1. Last-Writer-Wins (LWW):按时间戳选最新
  2. 版本向量:检测冲突,交由应用层解决

挑战三:Compaction 协调

问题:Primary 和 Replica 的 Compaction 时机不同
  - Primary 可能已经 Compaction 清理了 Tombstone
  - Replica 还没有 Compaction,保留着 Tombstone
  - 增量同步时,数据可能出现不一致

12.5 实际案例

Bitcoin Core 的 LevelDB 复制

Bitcoin Core 不做实时复制,而是通过:
  1. 全节点:从创世区块重放所有交易
  2. 区块头同步:SPV 节点只同步区块头
  3. IBD (Initial Block Download):从其他节点下载区块
  
  LevelDB 存储的是本地状态,不跨节点复制。

Ethereum 的 LevelDB 复制

Geth (Go-Ethereum) 使用 LevelDB 存储:
  - 状态树 (State Trie)
  - 区块链数据
  - 交易索引
  
  复制通过 P2P 协议同步区块,然后在本地重放。
  每个节点独立维护自己的 LevelDB 实例。

12.6 注意事项

⚠️ 警告说明
LevelDB 不是分布式数据库不要期望它提供内置复制
手动复制的一致性难以保证建议使用现成的分布式 KV(etcd/TiKV)
Raft 集成需要额外的存储Raft 日志本身也需要持久化
网络延迟影响写入性能强一致性写入需要等待多数派确认

12.7 本章小结

方案复杂度一致性适用场景
应用层双写简单容灾
WAL 流复制主从热备
Snapshot + 增量离线同步
Raft 集成生产级分布式

扩展阅读

  1. Raft 论文:Ongaro, D., Ousterhout, J. “In Search of an Understandable Consensus Algorithm” (2014)
  2. etcd 架构etcd.io
  3. TiKV 架构tikv.org
  4. hashicorp/raftGitHub

第 11 章 · 性能基准测试 | 第 13 章 · Docker 部署