强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Memcached 完全指南 / 第 16 章:生产最佳实践

第 16 章:生产最佳实践

16.1 缓存策略选型

Cache Aside Pattern(旁路缓存)

最广泛使用的缓存模式,由应用层管理缓存逻辑。

读流程:
1. 读缓存 → 命中 → 返回
2. 未命中 → 读数据库 → 写缓存 → 返回

写流程:
1. 更新数据库
2. 删除缓存(而非更新缓存)
def get_user(user_id):
    """Cache Aside 读取"""
    # 1. 查缓存
    cache_key = f"user:{user_id}"
    data = mc.get(cache_key)
    if data is not None:
        return json.loads(data) if data != "NULL" else None

    # 2. 查数据库
    user = db.query_user(user_id)

    # 3. 写缓存(空值也缓存,防穿透)
    if user:
        mc.set(cache_key, json.dumps(user), time=3600)
    else:
        mc.set(cache_key, "NULL", time=60)

    return user

def update_user(user_id, updates):
    """Cache Aside 更新"""
    # 1. 先更新数据库
    db.update_user(user_id, updates)

    # 2. 再删除缓存(不是更新!)
    mc.delete(f"user:{user_id}")

为什么删除而不是更新缓存?

# ❌ 更新缓存:存在并发问题
def update_user_bad(user_id, updates):
    db.update_user(user_id, updates)
    mc.set(f"user:{user_id}", json.dumps(db.query_user(user_id)))
    # 问题:如果另一个请求在 db.update 和 mc.set 之间读取了旧数据并写入缓存
    # 结果:缓存中存的是旧数据

# ✅ 删除缓存:最终一致性
def update_user_good(user_id, updates):
    db.update_user(user_id, updates)
    mc.delete(f"user:{user_id}")
    # 优势:下次读取时自动从 DB 加载最新数据

Read/Write Through

由缓存层统一管理读写,应用层不直接访问数据库。

class UserService:
    def __init__(self, cache, db):
        self.cache = cache
        self.db = db

    def get_user(self, user_id):
        """Read Through: 缓存层自动加载"""
        return self.cache.get_or_load(
            f"user:{user_id}",
            lambda: self.db.query_user(user_id),
            ttl=3600,
        )

    def update_user(self, user_id, updates):
        """Write Through: 缓存层自动同步"""
        self.db.update_user(user_id, updates)
        self.cache.delete(f"user:{user_id}")

Write Behind(异步写回)

import queue
import threading

class WriteBehindCache:
    def __init__(self, cache, db, batch_size=100, flush_interval=5):
        self.cache = cache
        self.db = db
        self.write_queue = queue.Queue()
        self.batch_size = batch_size
        self.flush_interval = flush_interval

        # 启动后台写入线程
        self._start_writer()

    def _start_writer(self):
        def writer():
            while True:
                batch = []
                deadline = time.time() + self.flush_interval
                while len(batch) < self.batch_size and time.time() < deadline:
                    try:
                        timeout = max(0.1, deadline - time.time())
                        item = self.write_queue.get(timeout=timeout)
                        batch.append(item)
                    except queue.Empty:
                        break
                if batch:
                    self.db.batch_update(batch)

        t = threading.Thread(target=writer, daemon=True)
        t.start()

    def set(self, key, data, ttl=3600):
        self.cache.set(key, json.dumps(data), time=ttl)
        self.write_queue.put({"key": key, "data": data})

    def get(self, key):
        raw = self.cache.get(key)
        return json.loads(raw) if raw else None

16.2 缓存策略选型指南

策略适用场景优点缺点
Cache Aside通用场景简单、灵活应用层逻辑多
Read/Write Through数据访问层封装应用层简洁需要封装层
Write Behind高写入场景写入延迟低可能丢失数据
Refresh Ahead热点数据无冷启动实现复杂
选择建议:

不确定 → Cache Aside ✓
需要统一数据层 → Read/Write Through ✓
写入量大 → Write Behind ✓
热点数据 → Cache Aside + 预热 ✓

16.3 Key 设计规范

命名规范

格式: {业务}:{类型}:{标识}[:{子标识}]

示例:
user:1001                    # 用户信息
user:1001:profile           # 用户资料
product:5001                # 商品信息
product:5001:price          # 商品价格
session:abc123              # 会话
config:db                   # 数据库配置
rank:product:daily          # 排行榜
rate:api:user:1001:20240101 # 限流计数器
lock:order:12345            # 分布式锁

Key 长度控制

# ❌ 不好的 Key
key = f"user_data_cache_for_user_id_{user_id}_with_full_profile_information"

# ✅ 好的 Key
key = f"user:{user_id}:profile"

# ❌ Key 太短,容易冲突
key = str(user_id)  # "1001" —— 可能和其他业务冲突

# ❌ Key 包含特殊字符
key = f"user@{user_id}#info"  # 避免特殊字符

# ✅ 只使用字母、数字、冒号、下划线、短横线
key = f"user:{user_id}:info"

Key 版本管理

# 当数据结构变更时,使用版本号
CACHE_VERSION = 3

def user_key(user_id):
    return f"v{CACHE_VERSION}:user:{user_id}"

# 升级时只需修改 CACHE_VERSION,旧缓存自动失效

Key 分布优化

# ❌ 可能导致热点的 Key 设计
key = f"product:hot"          # 所有请求打到同一节点
key = f"config:global"        # 全局配置,单一 Key

# ✅ 分散热点
key = f"product:hot:{random.randint(0, 9)}"  # 10 个副本分散到不同节点
key = f"config:global:{shard_id}"           # 按分片存储

16.4 容量规划

内存需求计算

#!/usr/bin/env python3
"""Memcached 容量规划计算器"""

def calc_memcached_memory(
    num_items: int,
    avg_key_size: int,
    avg_value_size: int,
    growth_factor: float = 1.25,
    overhead_ratio: float = 1.2,
) -> dict:
    """
    计算 Memcached 内存需求

    参数:
        num_items: 预估 Item 数量
        avg_key_size: 平均 Key 长度(字节)
        avg_value_size: 平均 Value 长度(字节)
        growth_factor: Slab 增长因子
        overhead_ratio: 冗余比例(碎片 + 元数据)
    """
    item_overhead = 48  # Item 头部开销

    # 每个 Item 实际占用
    item_size = item_overhead + avg_key_size + 8 + avg_value_size

    # 选择最接近的 Slab chunk 大小
    chunk_size = 96
    while chunk_size < item_size:
        chunk_size = int(chunk_size * growth_factor)
        chunk_size = (chunk_size + 7) & ~7  # 8 字节对齐

    # 内部碎片率
    fragmentation = (chunk_size - item_size) / chunk_size * 100

    # 总内存需求
    raw_memory = num_items * chunk_size
    total_memory = raw_memory * overhead_ratio

    # 转换为 MB
    total_mb = total_memory / (1024 * 1024)

    # 推荐配置(留 20% 余量)
    recommended_mb = int(total_mb * 1.2 / 64 + 1) * 64  # 向上取整到 64MB

    return {
        "item_size": item_size,
        "chunk_size": chunk_size,
        "fragmentation_pct": round(fragmentation, 1),
        "raw_memory_mb": round(total_mb, 1),
        "recommended_mb": recommended_mb,
    }

# 示例计算
result = calc_memcached_memory(
    num_items=1_000_000,
    avg_key_size=30,
    avg_value_size=200,
)
print(f"预估 Item 数量: 1,000,000")
print(f"平均 Item 大小: {result['item_size']}B")
print(f"Slab Chunk 大小: {result['chunk_size']}B")
print(f"内部碎片率: {result['fragmentation_pct']}%")
print(f"原始内存需求: {result['raw_memory_mb']}MB")
print(f"推荐配置 (-m): {result['recommended_mb']}MB")

连接数规划

连接数规划:

1. 统计应用实例数: N
2. 每个实例的连接池大小: C
3. 预留 20% 余量

推荐连接数 = N × C × 1.2

示例:
  10 个应用实例,每个 50 个连接
  推荐: 10 × 50 × 1.2 = 600
  配置: -c 1000(留更大余量)

节点数规划

节点数规划:

1. 总内存需求: M MB
2. 单节点内存: m MB(推荐 4-16GB)
3. 节点数: N = ceil(M / m)

示例:
  总需求 64GB,单节点 8GB
  节点数: 64 / 8 = 8 台

冗余考虑:
  如果需要故障时容量仍足够 → 额外 1-2 台
  8 + 2 = 10 台

16.5 缓存预热

冷启动问题

场景: 服务重启 / 新增节点
问题: 缓存为空,所有请求都回源数据库
结果: 数据库压力激增,可能被压垮

预热策略

class CacheWarmer:
    """缓存预热器"""

    def __init__(self, cache, db):
        self.cache = cache
        self.db = db

    def warmup(self, keys, loader, ttl=3600, batch_size=100):
        """批量预热"""
        total = len(keys)
        loaded = 0

        for i in range(0, total, batch_size):
            batch_keys = keys[i:i + batch_size]
            # 检查哪些 Key 已在缓存中
            existing = self.cache.get_multi(batch_keys)
            missing = [k for k in batch_keys if k not in existing]

            if missing:
                # 从数据库加载
                items = loader(missing)
                self.cache.set_multi(items, ttl=ttl)
                loaded += len(missing)

            progress = min(i + batch_size, total)
            print(f"预热进度: {progress}/{total}, 新加载: {loaded}")

    def warmup_hot_data(self, top_n=10000):
        """预热热点数据"""
        # 从数据库加载最热门的数据
        hot_items = self.db.query(
            "SELECT * FROM products ORDER BY view_count DESC LIMIT %s",
            (top_n,)
        )

        items = {f"product:{p['id']}": json.dumps(p) for p in hot_items}
        self.cache.set_multi(items, ttl=7200)
        print(f"预热完成: {len(items)} 条热点数据")


# 使用
warmer = CacheWarmer(mc, db)

# 服务启动时预热
if is_startup:
    hot_ids = db.query("SELECT id FROM users WHERE is_vip = true")
    keys = [f"user:{u['id']}" for u in hot_ids]
    warmer.warmup(keys, lambda ids: db.batch_get_users(ids))

16.6 缓存穿透/击穿/雪崩防护

综合防护方案

class ResilientCache:
    """带防护的缓存客户端"""

    def __init__(self, mc, db):
        self.mc = mc
        self.db = db
        self._locks = {}

    def get_user(self, user_id):
        cache_key = f"user:{user_id}"

        # 1. 查缓存
        data = self.mc.get(cache_key)
        if data is not None:
            if data == b"NULL":
                return None  # 空值缓存
            return json.loads(data)

        # 2. 布隆过滤器检查(如有)
        # if not bloom_filter.exists(cache_key):
        #     return None

        # 3. 获取分布式锁(防击穿)
        lock_key = f"lock:{cache_key}"
        if not self.mc.add(lock_key, "1", time=10):
            # 等待其他进程加载
            import time
            for _ in range(50):
                time.sleep(0.1)
                data = self.mc.get(cache_key)
                if data:
                    return json.loads(data) if data != b"NULL" else None
            # 超时直接查询
            return self._load_and_cache(user_id, cache_key)

        try:
            return self._load_and_cache(user_id, cache_key)
        finally:
            self.mc.delete(lock_key)

    def _load_and_cache(self, user_id, cache_key):
        user = self.db.query_user(user_id)
        if user:
            # 使用随机 TTL 防雪崩
            import random
            jitter = random.randint(-300, 300)
            ttl = max(60, 3600 + jitter)
            self.mc.set(cache_key, json.dumps(user), time=ttl)
        else:
            # 缓存空值(防穿透)
            self.mc.set(cache_key, "NULL", time=60)
        return user

16.7 故障处理

客户端降级

class DegradeableCache:
    """支持降级的缓存客户端"""

    def __init__(self, mc, db):
        self.mc = mc
        self.db = db
        self._healthy = True
        self._fail_count = 0
        self._fail_threshold = 5

    def get_with_fallback(self, key, loader, ttl=3600):
        """缓存读取,失败时降级到数据库"""
        if not self._healthy:
            return loader()

        try:
            data = self.mc.get(key)
            if data:
                return json.loads(data)
        except Exception as e:
            self._record_failure()
            logger.warning(f"Cache read failed: {e}")

        # 缓存未命中或失败,回源
        data = loader()

        # 尝试回写缓存
        try:
            self.mc.set(key, json.dumps(data), time=ttl)
        except Exception:
            pass

        return data

    def _record_failure(self):
        self._fail_count += 1
        if self._fail_count >= self._fail_threshold:
            self._healthy = False
            logger.error("Cache marked as unhealthy, falling back to DB")
            # 启动恢复检查
            threading.Timer(30, self._check_recovery).start()

    def _check_recovery(self):
        try:
            self.mc.set("__health_check__", "1", time=10)
            self._healthy = True
            self._fail_count = 0
            logger.info("Cache recovered")
        except Exception:
            threading.Timer(30, self._check_recovery).start()

灰度发布

def get_with_shadow_cache(user_id):
    """灰度对比新旧缓存"""
    old_key = f"user:{user_id}"
    new_key = f"v2:user:{user_id}"

    old_data = old_mc.get(old_key)

    if feature_flag.is_enabled("use_new_cache", user_id):
        new_data = new_mc.get(new_key)
        if new_data != old_data:
            logger.warning(f"Cache mismatch: user={user_id}")
        return json.loads(new_data) if new_data else json.loads(old_data)

    return json.loads(old_data)

16.8 团队协作规范

缓存 Key 注册表

# cache_keys.py — 集中管理所有缓存 Key

class CacheKeys:
    """缓存 Key 定义"""

    # 用户相关
    USER = "user:{user_id}"                    # TTL: 1h
    USER_PROFILE = "user:{user_id}:profile"    # TTL: 30min
    USER_SESSION = "session:{token}"           # TTL: 30min

    # 商品相关
    PRODUCT = "product:{product_id}"            # TTL: 1h
    PRODUCT_PRICE = "product:{product_id}:price"  # TTL: 5min
    PRODUCT_RANK = "rank:product:daily"         # TTL: 1min

    # 配置相关
    CONFIG = "config:{key}"                     # TTL: 5min
    FEATURE_FLAG = "flag:{feature}"             # TTL: 1min

    # 限流
    RATE_LIMIT = "rate:{service}:{user}:{window}"  # TTL: 1min

    # 分布式锁
    LOCK = "lock:{resource}:{id}"               # TTL: 10s

    @classmethod
    def user_key(cls, user_id):
        return cls.USER.format(user_id=user_id)

缓存使用规范

# Memcached 使用规范

## 1. Key 设计
- 使用 `{业务}:{类型}:{ID}` 格式
- Key 长度 10-60 字节
- 只使用字母、数字、冒号、下划线
- 所有 Key 必须在 `CacheKeys` 中注册

## 2. TTL 设置
- 默认 TTL: 3600 秒(1 小时)
- 最短 TTL: 60 秒
- 最长 TTL: 86400 秒(1 天)
- 配置类: 300 秒(5 分钟)
- 热点数据: 60 秒(1 分钟)

## 3. Value 大小
- 推荐: < 10KB
- 最大: 500KB
- 超过 100KB 考虑压缩
- 超过 500KB 考虑拆分

## 4. 序列化
- 使用 JSON(可读性好)
- 超过 100 个字段使用 MessagePack
- Value 中包含版本号

## 5. 缓存策略
- 读: Cache Aside
- 写: 先更新 DB,再删除缓存
- 禁止: 缓存和 DB 双写(无事务保证)

## 6. 监控告警
- 命中率 < 80% 告警
- 内存 > 90% 告警
- 淘汰 > 0 告警

16.9 生产检查清单

部署前检查

#!/bin/bash
echo "=== Memcached 生产部署检查 ==="

# 1. 安全检查
echo "1. 安全配置"
echo "  - 监听地址: $(echo 'stats settings' | nc localhost 11211 | grep inter)"
echo "  - UDP 端口: $(echo 'stats settings' | nc localhost 11211 | grep udpport)"

# 2. 性能配置
echo "2. 性能配置"
echo "  - 线程数: $(echo 'stats settings' | nc localhost 11211 | grep num_threads)"
echo "  - 最大连接: $(echo 'stats settings' | nc localhost 11211 | grep maxconns)"
echo "  - 最大内存: $(echo 'stats settings' | nc localhost 11211 | grep maxbytes)"

# 3. LRU 配置
echo "3. LRU 配置"
echo "  - lru_maintainer: $(echo 'stats settings' | nc localhost 11211 | grep lru_maintainer)"
echo "  - slab_automove: $(echo 'stats settings' | nc localhost 11211 | grep slab_automove)"
echo "  - lru_crawler: $(echo 'stats settings' | nc localhost 11211 | grep lru_crawler)"

# 4. 连接验证
echo "4. 连接验证"
echo "  - 当前连接: $(echo 'stats' | nc localhost 11211 | grep curr_connections)"

echo "=== 检查完成 ==="

16.10 全教程总结

Memcached 核心要点回顾:

1. 简单: 只做缓存,做到极致
2. 内存: Slab 分配器,无碎片
3. 快速: O(1) 查找,多线程,事件驱动
4. 分布: 客户端一致性哈希分片
5. 淘汰: 四级 LRU,TEMP 独立淘汰
6. 协议: Meta 协议最高效
7. 安全: 内网隔离 + SASL + TLS
8. 监控: Prometheus + 命中率告警
9. 策略: Cache Aside + 缓存空值 + 随机 TTL
10. Key: 简洁、有结构、版本化

扩展阅读

小结

要点内容
缓存策略Cache Aside(通用)、Write Behind(高写入)
Key 规范{业务}:{类型}:{ID},10-60 字节
容量规划内存 = Item数 × chunk大小 × 1.2
防护体系缓存空值 + 分布式锁 + 随机 TTL + 降级
团队协作Key 注册表 + 使用规范 + 告警规则