LevelDB 完全指南 / 第 12 章 · 数据复制
第 12 章 · 数据复制
12.1 LevelDB 为什么没有内置复制
LevelDB 定位是嵌入式存储引擎,不提供网络功能:
| 特性 | LevelDB | etcd | TiKV | CockroachDB |
|---|---|---|---|---|
| 存储引擎 | ✅ | 底层使用 | 底层使用(RocksDB) | 底层使用(Pebble) |
| 网络协议 | ❌ | gRPC | gRPC | SQL + gRPC |
| 复制 | ❌ | Raft | Raft | Raft |
| 分片 | ❌ | ❌ | ✅ | ✅ |
💡 提示:如果你需要分布式 KV 存储,应该直接使用 etcd 或 TiKV。本章讨论的是基于 LevelDB 自行构建复制的方案。
12.2 手动复制方案
方案一:WAL 流复制
┌──────────┐ WAL 日志 ┌──────────┐
│ Primary │───────────────→│ Replica │
│ LevelDB │ │ LevelDB │
└──────────┘ └──────────┘
实现思路:
// Primary 端:拦截写入,转发 WAL 到 Replica
class ReplicatedDB {
public:
bool Put(const std::string& key, const std::string& value) {
// 1. 写入本地
leveldb::Status s = db_->Put(wopts_, key, value);
if (!s.ok()) return false;
// 2. 发送到 Replica(简化示意)
std::string wal_entry = SerializeWALEntry(key, value);
SendToReplica(wal_entry);
return true;
}
private:
void SendToReplica(const std::string& entry) {
// 通过 TCP/gRPC 发送到 Replica
// 需要处理:网络超时、重试、顺序保证
}
};
问题:
- 无法保证 Primary 和 Replica 的严格一致
- 网络分区时的数据一致性难以处理
- 没有自动故障转移
方案二:Snapshot + 增量复制
┌──────────┐ Snapshot文件 ┌──────────┐
│ Primary │─────────────────→│ Replica │
│ LevelDB │ │ LevelDB │
└────┬─────┘ └──────────┘
│
│ 增量数据
└──────────────────────────────────────→
实现步骤:
// 1. 初始同步:复制整个数据库
void InitialSync(leveldb::DB* primary, leveldb::DB* replica) {
const leveldb::Snapshot* snap = primary->GetSnapshot();
leveldb::ReadOptions ropts;
ropts.snapshot = snap;
leveldb::WriteBatch batch;
auto* it = primary->NewIterator(ropts);
int count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
batch.Put(it->key(), it->value());
if (++count % 10000 == 0) {
replica->Write(leveldb::WriteOptions(), &batch);
batch = leveldb::WriteBatch();
}
}
if (count % 10000 != 0) {
replica->Write(leveldb::WriteOptions(), &batch);
}
delete it;
primary->ReleaseSnapshot(snap);
}
// 2. 增量同步:记录写入日志
class WriteLogger {
public:
void Log(const std::string& op, const std::string& key,
const std::string& value) {
std::lock_guard<std::mutex> lock(mu_);
LogEntry entry{op, key, value, sequence_++};
entries_.push_back(entry);
// 持久化到文件或发送到消息队列
}
std::vector<LogEntry> GetEntriesSince(uint64_t seq) {
std::lock_guard<std::mutex> lock(mu_);
return {entries_.begin() + seq, entries_.end()};
}
private:
std::mutex mu_;
uint64_t sequence_ = 0;
std::vector<LogEntry> entries_;
};
方案三:应用层双写
// 简单但不推荐(不保证一致性)
bool DoubleWrite(leveldb::DB* db1, leveldb::DB* db2,
const std::string& key, const std::string& value) {
leveldb::Status s1 = db1->Put(wopts, key, value);
leveldb::Status s2 = db2->Put(wopts, key, value);
return s1.ok() && s2.ok();
// 问题:db1 成功但 db2 失败怎么办?
}
12.3 Raft 集成
架构设计
┌─────────────────────────────────────────────────┐
│ Raft Consensus Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Node 1 │ │ Node 2 │ │ Node 3 │ │
│ │ (Leader) │←→│(Follower)│←→│(Follower)│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
└───────┼─────────────┼─────────────┼──────────────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ LevelDB │ │ LevelDB │ │ LevelDB │
└─────────┘ └─────────┘ └─────────┘
Raft 状态机实现
// Raft 状态机接口
class LevelDBStateMachine : public raft::StateMachine {
public:
LevelDBStateMachine(const std::string& db_path) {
leveldb::Options opts;
opts.create_if_missing = true;
leveldb::DB::Open(opts, db_path, &db_);
}
// 应用已提交的日志条目
void Apply(const raft::LogEntry& entry) override {
// 解析命令
Command cmd = Deserialize(entry.data());
switch (cmd.type) {
case Command::PUT:
db_->Put(leveldb::WriteOptions(), cmd.key, cmd.value);
break;
case Command::DELETE:
db_->Delete(leveldb::WriteOptions(), cmd.key);
break;
case Command::BATCH:
db_->Write(leveldb::WriteOptions(), &cmd.batch);
break;
}
}
// 创建快照
raft::Snapshot* TakeSnapshot() override {
const leveldb::Snapshot* snap = db_->GetSnapshot();
// 序列化所有数据...
return new LevelDBSnapshot(snap);
}
// 恢复快照
void RestoreSnapshot(raft::Snapshot* snapshot) override {
// 从快照恢复数据到 LevelDB...
}
private:
leveldb::DB* db_;
};
使用 hashicorp/raft 库(Go 示例)
package main
import (
"encoding/json"
"io"
"net"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/syndtr/goleveldb/leveldb"
)
type LevelDBFSM struct {
db *leveldb.DB
}
func (fsm *LevelDBFSM) Apply(l *raft.Log) interface{} {
var cmd struct {
Op string `json:"op"`
Key string `json:"key"`
Value string `json:"value,omitempty"`
}
json.Unmarshal(l.Data, &cmd)
switch cmd.Op {
case "put":
return fsm.db.Put([]byte(cmd.Key), []byte(cmd.Value), nil)
case "delete":
return fsm.db.Delete([]byte(cmd.Key), nil)
}
return nil
}
func (fsm *LevelDBFSM) Snapshot() (raft.FSMSnapshot, error) {
// 创建一致性快照
snap, _ := fsm.db.GetSnapshot()
return &LevelDBSnapshot{snap: snap}, nil
}
func (fsm *LevelDBFSM) Restore(rc io.ReadCloser) error {
// 从快照恢复
// ...
return nil
}
func main() {
db, _ := leveldb.OpenFile("/data/raft-node-1", nil)
fsm := &LevelDBFSM{db: db}
config := raft.DefaultConfig()
config.LocalID = raft.ServerID("node1")
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:12000")
transport, _ := raft.NewTCPTransport(addr.String(), addr, 3, 10*time.Second, os.Stderr)
logStore, _ := raftboltdb.NewBoltStore("/data/raft-log-1")
stableStore, _ := raftboltdb.NewBoltStore("/data/raft-stable-1")
snapshotStore, _ := raft.NewFileSnapshotStore("/data/raft-snap-1", 3, os.Stderr)
r, _ := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
// ...
}
12.4 主从复制的挑战
挑战一:一致性保证
| 一致性模型 | 实现复杂度 | 性能影响 | 数据丢失风险 |
|---|---|---|---|
| 异步复制 | 低 | 最小 | 高(主宕机丢数据) |
| 半同步复制 | 中 | 中等 | 低 |
| 强一致性(Raft) | 高 | 较大 | 无(多数派确认) |
挑战二:冲突解决
场景:网络分区后恢复
Primary: PUT("key1", "value_v2") (Seq=100)
Replica: PUT("key1", "value_v3") (Seq=100)
如何合并?两种策略:
1. Last-Writer-Wins (LWW):按时间戳选最新
2. 版本向量:检测冲突,交由应用层解决
挑战三:Compaction 协调
问题:Primary 和 Replica 的 Compaction 时机不同
- Primary 可能已经 Compaction 清理了 Tombstone
- Replica 还没有 Compaction,保留着 Tombstone
- 增量同步时,数据可能出现不一致
12.5 实际案例
Bitcoin Core 的 LevelDB 复制
Bitcoin Core 不做实时复制,而是通过:
1. 全节点:从创世区块重放所有交易
2. 区块头同步:SPV 节点只同步区块头
3. IBD (Initial Block Download):从其他节点下载区块
LevelDB 存储的是本地状态,不跨节点复制。
Ethereum 的 LevelDB 复制
Geth (Go-Ethereum) 使用 LevelDB 存储:
- 状态树 (State Trie)
- 区块链数据
- 交易索引
复制通过 P2P 协议同步区块,然后在本地重放。
每个节点独立维护自己的 LevelDB 实例。
12.6 注意事项
| ⚠️ 警告 | 说明 |
|---|---|
| LevelDB 不是分布式数据库 | 不要期望它提供内置复制 |
| 手动复制的一致性难以保证 | 建议使用现成的分布式 KV(etcd/TiKV) |
| Raft 集成需要额外的存储 | Raft 日志本身也需要持久化 |
| 网络延迟影响写入性能 | 强一致性写入需要等待多数派确认 |
12.7 本章小结
| 方案 | 复杂度 | 一致性 | 适用场景 |
|---|---|---|---|
| 应用层双写 | 低 | 弱 | 简单容灾 |
| WAL 流复制 | 中 | 中 | 主从热备 |
| Snapshot + 增量 | 中 | 中 | 离线同步 |
| Raft 集成 | 高 | 强 | 生产级分布式 |
扩展阅读
- Raft 论文:Ongaro, D., Ousterhout, J. “In Search of an Understandable Consensus Algorithm” (2014)
- etcd 架构:etcd.io
- TiKV 架构:tikv.org
- hashicorp/raft:GitHub