Memcached 传输协议精讲 / 第10章 最佳实践
第10章 最佳实践
从协议选择到安全加固,汇总 Memcached 在生产环境中的核心最佳实践。
10.1 协议选择指南
决策流程图
开始选择协议
│
├── 需要 telnet 调试?
│ └── 是 → 文本协议
│
├── 需要 Meta 协议特性(CAS 删除、惰性失效等)?
│ └── 是 → Meta 协议(1.6+)
│
├── 客户端库已支持二进制协议?
│ └── 是 → 二进制协议
│
└── 默认 → 文本协议(最广泛支持)
协议选择对比表
| 场景 | 推荐协议 | 理由 |
|---|
| Web 应用缓存 | 文本协议 | 简单可靠,客户端成熟 |
| 高性能微服务 | 二进制协议 | 解析效率高 |
| 需要 CAS 删除 | Meta 协议 | 原生支持 |
| 需要惰性失效 | Meta 协议 | 原生支持 |
| 运维调试 | 文本协议 | telnet 直接操作 |
| 批量元数据查询 | Meta 协议 | 单次往返获取多项元数据 |
| 客户端库开发 | 文本协议 + Meta | 双协议支持 |
10.2 客户端开发规范
协议解析要点
# 1. 严格使用 CRLF
LINE_TERMINATOR = b"\r\n"
# 2. 精确读取数据块长度
def read_value(sock, length: int) -> bytes:
data = b""
while len(data) < length:
chunk = sock.recv(length - len(data))
if not chunk:
raise ConnectionError("连接断开")
data += chunk
# 读取尾部 CRLF
crlf = sock.recv(2)
assert crlf == b"\r\n", f"期望 CRLF,收到 {crlf!r}"
return data
# 3. 正确处理部分读取
def recv_until(sock, marker: bytes, buffer_size: int = 4096) -> bytes:
buf = b""
while marker not in buf:
chunk = sock.recv(buffer_size)
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
# 4. 验证响应格式
def validate_response(line: str, expected: list[str]):
if line not in expected:
raise ProtocolError(f"期望 {expected},收到: {line!r}")
class ProtocolError(Exception):
pass
Key 设计规范
| 规则 | 说明 | 示例 |
|---|
| 最长 250 字节 | 超长 key 会被截断或报错 | user:1001 |
| 无空白字符 | 空格、\r、\n 不允许 | session:abc123 |
| 使用冒号分隔 | 便于阅读和分类 | product:1001:detail |
| 包含版本号 | 缓存失效时更新版本 | v2:user:1001 |
| 命名空间前缀 | 避免 key 冲突 | myapp:user:1001 |
Key 设计示例
class CacheKeyBuilder:
"""缓存 Key 构建器"""
def __init__(self, app: str, version: str = "v1"):
self.app = app
self.version = version
def build(self, *parts: str) -> str:
key = f"{self.app}:{self.version}:" + ":".join(str(p) for p in parts)
if len(key.encode()) > 250:
raise ValueError(f"Key 过长: {len(key.encode())} bytes")
return key
def user(self, user_id: int) -> str:
return self.build("user", str(user_id))
def session(self, session_id: str) -> str:
return self.build("session", session_id)
def product(self, product_id: int, field: str = "detail") -> str:
return self.build("product", str(product_id), field)
# 使用
kb = CacheKeyBuilder("myapp", "v2")
print(kb.user(1001)) # myapp:v2:user:1001
print(kb.session("abc123")) # myapp:v2:session:abc123
print(kb.product(2001)) # myapp:v2:product:2001:detail
10.3 缓存策略
Cache-Aside(旁路缓存)
最常用的缓存模式,读写都不经过缓存,由应用层控制:
class CacheAsidePattern:
def __init__(self, cache_client, db_client):
self.cache = cache_client
self.db = db_client
def get(self, key: str, db_query_fn, ttl: int = 300):
"""
1. 先查缓存
2. 未命中则查数据库
3. 查询结果写入缓存
"""
# 1. 查缓存
cached = self.cache.get(key)
if cached is not None:
return json.loads(cached)
# 2. 查数据库
data = db_query_fn()
if data is None:
return None
# 3. 写缓存(设置合理的 TTL)
self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
return data
def update(self, key: str, data: dict, db_update_fn, ttl: int = 300):
"""
1. 先更新数据库
2. 再更新缓存(或删除缓存)
"""
# 1. 更新数据库
db_update_fn(data)
# 2. 更新缓存(或者删除缓存,让下次读取时重新加载)
self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
# 或者: self.cache.delete(key)
缓存穿透防护
class PenetrationProtection:
"""防止缓存穿透"""
def __init__(self, cache_client):
self.cache = cache_client
def get_or_set_default(self, key: str, db_query_fn,
ttl: int = 300, null_ttl: int = 60):
"""
查询数据库,如果结果为空则缓存空值(短 TTL)
防止大量不存在的 key 穿透到数据库
"""
cached = self.cache.get(key)
if cached is not None:
if cached == b"__NULL__":
return None
return json.loads(cached)
data = db_query_fn()
if data is None:
# 缓存空值,短 TTL
self.cache.set(key, b"__NULL__", exptime=null_ttl)
return None
self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
return data
缓存击穿防护(互斥锁)
import time
import threading
class BreakdownProtection:
"""防止缓存击穿(热点 key 过期时大量请求穿透)"""
def __init__(self, cache_client):
self.cache = cache_client
self.local_locks = {}
self.lock = threading.Lock()
def get_with_lock(self, key: str, db_query_fn,
ttl: int = 300, lock_ttl: int = 10):
cached = self.cache.get(key)
if cached is not None:
return json.loads(cached)
# 使用本地锁 + 分布式锁防止击穿
with self.lock:
if key not in self.local_locks:
self.local_locks[key] = threading.Lock()
local_lock = self.local_locks[key]
# 尝试获取分布式锁
lock_key = f"lock:{key}"
lock_data = str(time.time()).encode()
# 使用 add 实现分布式锁
cmd = f"add {lock_key} 0 {lock_ttl} {len(lock_data)}\r\n"
self.cache.set(lock_key, lock_data, exptime=lock_ttl) # 简化版
with local_lock:
# 双重检查
cached = self.cache.get(key)
if cached is not None:
return json.loads(cached)
# 查询数据库
data = db_query_fn()
if data is not None:
self.cache.set(key, json.dumps(data).encode(), exptime=ttl)
return data
缓存雪崩防护
import random
class AvalancheProtection:
"""防止缓存雪崩(大量 key 同时过期)"""
def set_with_jitter(self, cache, key: str, value: bytes,
base_ttl: int = 300, jitter: int = 60):
"""
给 TTL 添加随机抖动,避免大量 key 同时过期
"""
actual_ttl = base_ttl + random.randint(0, jitter)
cache.set(key, value, exptime=actual_ttl)
10.4 性能优化
1. 批量操作
# ❌ 低效:逐个获取
for key in keys:
value = client.get(key)
# ✅ 高效:批量获取
values = client.get_multi(keys)
2. Noreply 模式
# ✅ 批量写入使用 noreply
for key, value in items.items():
client.set(key, value, noreply=True)
3. 管道化
# ✅ 管道化发送
sock.sendall(b"set k1 0 0 5\r\nhello\r\n")
sock.sendall(b"set k2 0 0 5\r\nworld\r\n")
sock.sendall(b"set k3 0 0 5\r\ntest!\r\n")
# 然后一次性读取所有响应
4. 压缩大 Value
import gzip
COMPRESS_THRESHOLD = 1024 # 超过 1KB 压缩
def set_with_compress(client, key: str, value: bytes, ttl: int = 300):
if len(value) > COMPRESS_THRESHOLD:
compressed = gzip.compress(value)
if len(compressed) < len(value):
# flags 标记压缩
client.set(key, compressed, flags=0x01, exptime=ttl)
return
client.set(key, value, exptime=ttl)
def get_with_decompress(client, key: str) -> bytes | None:
result = client.gets(key)
if result is None:
return None
value, flags, cas = result
if flags & 0x01: # 压缩标志
return gzip.decompress(value)
return value
5. 连接复用
# ❌ 低效:每次操作新建连接
def bad_get(key):
sock = socket.socket(...)
sock.connect(...)
sock.sendall(...)
sock.close()
# ✅ 高效:复用长连接
class PersistentClient:
def __init__(self, host, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def __del__(self):
self.sock.close()
6. 合理设置 TTL
# 根据数据更新频率设置 TTL
TTL_CONFIG = {
'user_profile': 3600, # 用户资料,1小时
'session': 1800, # 会话,30分钟
'product_detail': 600, # 商品详情,10分钟
'hot_ranking': 60, # 热榜,1分钟
'config': 86400, # 配置,1天
'captcha': 300, # 验证码,5分钟
}
10.5 监控与告警
关键监控指标
| 指标 | 来源 | 告警阈值 | 说明 |
|---|
| 缓存命中率 | stats | < 80% | 影响性能的关键指标 |
| 当前连接数 | stats | > 80% 最大连接数 | 连接数不足 |
| 内存使用率 | stats | > 90% 最大内存 | 内存不足 |
| 淘汰次数 | stats | 持续增长 | 内存不足导致淘汰 |
| 命令延迟 | 客户端 | > 5ms | 响应变慢 |
| 线程利用率 | stats | > 80% | Worker 线程繁忙 |
监控脚本
#!/usr/bin/env python3
"""monitor.py — Memcached 监控脚本"""
import socket
import time
import json
class MemcachedMonitor:
def __init__(self, host='127.0.0.1', port=11211):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
self.prev_stats = {}
def get_stats(self) -> dict:
self.sock.sendall(b"stats\r\n")
buffer = b""
while True:
chunk = self.sock.recv(8192)
buffer += chunk
if b"END\r\n" in buffer:
break
stats = {}
for line in buffer.decode().split("\r\n"):
if line.startswith("STAT"):
parts = line.split()
stats[parts[1]] = parts[2]
return stats
def check_health(self) -> dict:
stats = self.get_stats()
current = {k: int(v) for k, v in stats.items() if v.isdigit()}
result = {
'timestamp': time.time(),
'curr_items': current.get('curr_items', 0),
'bytes': current.get('bytes', 0),
'max_bytes': current.get('max_bytes', 0),
'curr_connections': current.get('curr_connections', 0),
'cmd_get': current.get('cmd_get', 0),
'cmd_set': current.get('cmd_set', 0),
'get_hits': current.get('get_hits', 0),
'get_misses': current.get('get_misses', 0),
'evictions': current.get('evictions', 0),
}
# 计算命中率
total = result['get_hits'] + result['get_misses']
result['hit_rate'] = (
result['get_hits'] / total * 100 if total > 0 else 0
)
# 计算增量
if self.prev_stats:
result['cmd_get_rate'] = (
result['cmd_get'] - self.prev_stats.get('cmd_get', 0)
)
result['cmd_set_rate'] = (
result['cmd_set'] - self.prev_stats.get('cmd_set', 0)
)
self.prev_stats = result
return result
def alert(self, stats: dict):
"""检查告警条件"""
alerts = []
if stats['hit_rate'] < 80:
alerts.append(f"⚠️ 缓存命中率低: {stats['hit_rate']:.1f}%")
if stats['bytes'] > 0 and stats['max_bytes'] > 0:
usage = stats['bytes'] / stats['max_bytes'] * 100
if usage > 90:
alerts.append(f"⚠️ 内存使用率高: {usage:.1f}%")
if stats['evictions'] > self.prev_stats.get('evictions', 0):
alerts.append(f"⚠️ 发生淘汰: {stats['evictions']}")
return alerts
# 使用
monitor = MemcachedMonitor()
for _ in range(10):
stats = monitor.check_health()
print(json.dumps(stats, indent=2))
alerts = monitor.alert(stats)
for alert in alerts:
print(alert)
time.sleep(1)
10.6 安全加固
1. 网络层安全
# 仅监听内网地址
memcached -l 127.0.0.1 -p 11211
# 或监听特定内网 IP
memcached -l 10.0.0.1 -p 11211
# 使用 iptables 限制访问
sudo iptables -A INPUT -p tcp --dport 11211 -s 10.0.0.0/24 -j ACCEPT
sudo iptables -A INPUT -p tcp --dport 11211 -j DROP
2. SASL 认证
# 启动时启用 SASL
memcached -S -l 127.0.0.1
# 配置 SASL
echo "mech_list: plain" > /etc/sasl2/memcached.conf
saslpasswd2 -c -a memcached -f /etc/sasldb2 memcached_user
# 客户端 SASL 认证
def sasl_auth(sock, username: str, password: str) -> bool:
"""SASL PLAIN 认证"""
# 二进制协议 SASL 认证
challenge = b"\0" + username.encode() + b"\0" + password.encode()
mechanism = b"PLAIN"
# ... 发送 SASL Auth 命令
return True
3. 命令禁用
# 禁用危险命令(通过启动参数或 ACL)
memcached -l 127.0.0.1 --disable-evictions
# 或通过代理层过滤命令
4. 访问控制列表(ACL)
# 在代理层实现 ACL
ALLOWED_COMMANDS = {
'get', 'gets', 'set', 'add', 'replace',
'delete', 'incr', 'decr', 'flush_all',
'stats', 'version', 'quit'
}
BLOCKED_COMMANDS = {'flush_all'} # 生产环境禁用
def check_command_allowed(command: str) -> bool:
cmd = command.strip().split()[0].lower()
if cmd in BLOCKED_COMMANDS:
return False
return cmd in ALLOWED_COMMANDS
5. 值大小限制
# 设置最大 value 大小
memcached -I 512k # 默认 1MB,调整为 512KB
10.7 运维最佳实践
启动参数推荐
# 生产环境推荐配置
memcached \
-l 10.0.0.1 \ # 监听内网 IP
-p 11211 \ # 端口
-m 4096 \ # 4GB 内存
-c 10000 \ # 最大 10000 连接
-t 8 \ # 8 个工作线程
-U 0 \ # 禁用 UDP
-o modern \ # 启用现代特性(1.6+)
-o slab_automove=1 \ # 自动调整 Slab 分配
-I 1m \ # 最大 Value 1MB
-v \ # 基本日志
-f 1.25 \ # Slab 增长因子
-n 48 # 最小 item 空间
systemd 服务配置
# /etc/systemd/system/memcached.service
[Unit]
Description=Memcached
Documentation=https://memcached.org/
After=network.target
[Service]
Type=simple
User=memcache
Group=memcache
ExecStart=/usr/bin/memcached \
-l 127.0.0.1 \
-p 11211 \
-m 4096 \
-c 10000 \
-t 4 \
-o modern \
-U 0
Restart=always
RestartSec=5
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
日志分析
# 查看 Memcached 日志
journalctl -u memcached -f
# 统计连接信息
echo "stats" | nc 127.0.0.1 11211 | grep -E "(curr_items|bytes|cmd_|get_)"
# 监控 slab 分配
watch -n 1 'echo "stats slabs" | nc 127.0.0.1 11211 | head -30'
10.8 常见问题排查
问题一:缓存命中率低
# 诊断步骤
echo "stats" | nc 127.0.0.1 11211 | grep -E "(get_hits|get_misses|evictions)"
# 可能原因:
# 1. TTL 过短 → 增加 TTL
# 2. 内存不足导致淘汰 → 增加内存或优化 key
# 3. Key 分布不均 → 检查哈希分布
# 4. 缓存穿透 → 使用空值缓存
问题二:内存使用过高
# 查看内存分配
echo "stats slabs" | nc 127.0.0.1 11211
# 查看 item 分布
echo "stats sizes" | nc 127.0.0.1 11211
# 可能原因:
# 1. 大量大 Value → 考虑压缩或拆分
# 2. Slab 碎片 → 调整增长因子 -f
# 3. 内存泄漏(item 未过期) → 检查 TTL 设置
问题三:连接数过多
# 查看连接信息
echo "stats" | nc 127.0.0.1 11211 | grep connection
# 可能原因:
# 1. 连接池配置不当 → 调整连接池大小
# 2. 连接未正确关闭 → 检查客户端代码
# 3. 慢查询阻塞 → 检查大 Value 操作
问题四:性能下降
# 检查淘汰率
echo "stats" | nc 127.0.0.1 11211 | grep evictions
# 检查 Slab 状态
echo "stats slabs" | nc 127.0.0.1 11211
# 可能原因:
# 1. Slab 不均衡 → 启用 slab_automove
# 2. 大量 CAS 操作 → 减少并发冲突
# 3. 网络延迟 → 检查网络状况
10.9 生产环境检查清单
| 检查项 | 状态 | 说明 |
|---|
| 仅监听内网地址 | ☐ | -l 参数 |
| 禁用 UDP | ☐ | -U 0 |
| 设置最大内存 | ☐ | -m 参数 |
| 设置最大连接数 | ☐ | -c 参数 |
| 配置防火墙 | ☐ | iptables / 安全组 |
| 启用 SASL 认证 | ☐ | -S 参数 |
| 配置监控告警 | ☐ | 命中率、内存、连接数 |
| 配置日志 | ☐ | journalctl |
| 测试故障转移 | ☐ | 模拟节点故障 |
| 测试数据迁移 | ☐ | 节点增减测试 |
| 配置备份策略 | ☐ | 定期 dump |
| 文档化架构 | ☐ | 架构图、key 设计 |
10.10 扩展阅读
全书总结
本教程系统讲解了 Memcached 的三大协议(文本协议、二进制协议、Meta 协议)及其完整命令集,深入探讨了分布式架构、一致性哈希、连接池、故障转移等核心主题,最后给出了生产环境的最佳实践。
核心要点回顾
| 主题 | 关键知识 |
|---|
| 协议选择 | 文本协议可读性好,二进制协议效率高,Meta 协议功能丰富 |
| 缓存策略 | Cache-Aside 模式最常用,注意穿透/击穿/雪崩防护 |
| 分布式设计 | 一致性哈希保证节点增减时最小化数据迁移 |
| 性能优化 | 批量操作、管道化、连接复用、压缩大 Value |
| 安全加固 | 网络隔离、SASL 认证、命令禁用、值大小限制 |
上一章: 第09章 代理与分布式
返回: 目录