Memcached 传输协议精讲 / 第04章 存储命令深入
第04章 存储命令深入
存储命令是 Memcached 的核心写入接口。理解每个命令的语义差异、flags 用法和 TTL 行为,是正确使用缓存的前提。
4.1 SET — 无条件存储
语法
set <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
语义
set 是最基本的存储命令。无论 key 是否已存在,都无条件覆盖已有值。这是最常用的写入命令。
参数详解
| 参数 | 类型 | 范围 | 说明 |
|---|---|---|---|
key | string | 1-250 字节 | 缓存键,不能含空白和控制字符 |
flags | uint16 | 0-65535 | 客户端自定义标志位,服务端原样返回 |
exptime | int | 见下文 | 过期时间 |
length | int | 0-1048576 | 数据块字节长度(默认最大 1MB) |
noreply | string | — | 可选,设置后不返回响应 |
exptime 规则
| 值 | 含义 |
|---|---|
| 0 | 永不过期 |
| 1-2592000 (30天) | 秒数 |
| >2592000 | Unix 时间戳(绝对时间) |
# 永不过期
set key1 0 0 5
hello
STORED
# 60 秒后过期
set key2 0 60 5
hello
STORED
# Unix 时间戳(2025-12-31 23:59:59 UTC)
set key3 0 1767225599 5
hello
STORED
完整交互示例
$ telnet 127.0.0.1 11211
set mykey 0 300 11
hello world
STORED
get mykey
VALUE mykey 0 11
hello world
END
# 覆盖
set mykey 0 300 7
new data
STORED
get mykey
VALUE mykey 0 7
new data
END
4.2 ADD — 不存在时写入
语法
add <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
语义
仅当 key 不存在(或已过期被清除)时写入。若 key 已存在,返回 NOT_STORED。
交互示例
# key 不存在,写入成功
add newkey 0 60 5
hello
STORED
# key 已存在,写入失败
add newkey 0 60 5
world
NOT_STORED
典型业务场景
1. 初始化配置
def init_config(sock, key: str, value: str, ttl: int = 86400):
"""仅在配置不存在时写入(避免覆盖已有配置)"""
data = value.encode()
cmd = f"add {key} 0 {ttl} {len(data)}\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
resp = sock.recv(1024).decode().strip()
return resp == "STORED"
# 使用
if init_config(sock, "config:max_connections", "1000"):
print("配置初始化成功")
else:
print("配置已存在,跳过")
2. 分布式锁
add 的语义天然适合实现分布式锁:只有第一个请求能成功 add,后续请求返回 NOT_STORED。
import uuid
import time
def try_acquire_lock(sock, lock_name: str, ttl: int = 30) -> str | None:
token = str(uuid.uuid4())
data = token.encode()
cmd = f"add lock:{lock_name} 0 {ttl} {len(data)}\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
resp = sock.recv(1024).decode().strip()
return token if resp == "STORED" else None
def release_lock(sock, lock_name: str, token: str):
"""简单删除(注意:无法验证 token,生产环境建议使用 Lua 脚本)"""
sock.sendall(f"delete lock:{lock_name}\r\n".encode())
sock.recv(1024)
# 使用
token = try_acquire_lock(sock, "order:process")
if token:
try:
print("获取锁成功,执行业务逻辑...")
# ... 业务处理 ...
finally:
release_lock(sock, "order:process", token)
else:
print("获取锁失败,其他进程正在处理")
4.3 REPLACE — 已存在时替换
语法
replace <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
语义
仅当 key 已存在时替换值。若 key 不存在,返回 NOT_STORED(注意:不是 NOT_FOUND)。
交互示例
# key 不存在,替换失败
replace missing_key 0 60 5
hello
NOT_STORED
# 先 set 创建
set mykey 0 0 5
hello
STORED
# 再 replace 替换
replace mykey 0 0 7
goodbye
STORED
典型业务场景
条件更新:只更新已初始化的数据,避免因并发 add 产生脏数据。
def update_user_profile(sock, user_id: int, profile_json: str):
"""只更新已缓存的用户资料,不存在则跳过"""
key = f"user:{user_id}:profile"
data = profile_json.encode()
cmd = f"replace {key} 0 3600 {len(data)}\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
resp = sock.recv(1024).decode().strip()
return resp == "STORED"
4.4 APPEND — 追加数据
语法
append <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
语义
在已有值的末尾追加数据。不修改 flags 和 exptime。key 必须已存在。
注意: append 不会在尾部自动添加分隔符,需要客户端自行处理。
交互示例
set log 0 0 5
hello
STORED
append log 0 0 6
world
STORED
get log
VALUE log 0 11
hello world
END
业务场景:日志聚合
def append_log(sock, log_key: str, log_entry: str, ttl: int = 3600):
"""将日志条目追加到缓存中的日志键"""
entry = log_entry.encode()
# 首次使用 add,后续使用 append
cmd = f"add {log_key} 0 {ttl} {len(entry)}\r\n"
sock.sendall(cmd.encode() + entry + b"\r\n")
resp = sock.recv(1024).decode().strip()
if resp == "NOT_STORED":
# key 已存在,使用 append
cmd = f"append {log_key} 0 0 {len(entry)}\r\n"
sock.sendall(cmd.encode() + entry + b"\r\n")
sock.recv(1024)
# 使用
append_log(sock, "log:request:12345", "2025-01-01 10:00:00 GET /api/users\n")
append_log(sock, "log:request:12345", "2025-01-01 10:00:01 200 OK\n")
4.5 PREPEND — 前插数据
语法
prepend <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
语义
在已有值的开头插入数据。与 append 方向相反。
交互示例
set greeting 0 0 5
world
STORED
prepend greeting 0 0 6
hello
STORED
get greeting
VALUE greeting 0 11
hello world
END
业务场景:优先级队列
def prepend_priority(sock, queue_key: str, item: str, ttl: int = 3600):
"""将高优先级任务插入队列头部"""
data = item.encode()
cmd = f"prepend {queue_key} 0 {ttl} {len(data)}\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
resp = sock.recv(1024).decode().strip()
return resp == "STORED"
4.6 CAS — Compare-And-Swap
语法
cas <key> <flags> <exptime> <length> <cas_unique> [noreply]\r\n
<data block>\r\n
语义
CAS 是 Memcached 实现乐观并发控制的核心命令。只有当 key 的当前 cas_unique 与请求中的值匹配时,才执行写入。
CAS 响应
| 响应 | 含义 |
|---|---|
STORED | 写入成功(版本匹配) |
EXISTS | 写入失败(版本已被其他客户端修改) |
NOT_FOUND | key 不存在 |
完整 CAS 工作流
#!/usr/bin/env python3
"""cas_demo.py — CAS 乐观锁更新演示"""
import socket
import json
class CASClient:
def __init__(self, host='127.0.0.1', port=11211):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def gets(self, key: str):
"""获取值和 CAS unique"""
self.sock.sendall(f"gets {key}\r\n".encode())
resp = self.sock.recv(65536).decode()
lines = resp.strip().split("\r\n")
if lines[0].startswith("VALUE"):
parts = lines[0].split()
cas_unique = int(parts[4])
value = lines[1]
return value, cas_unique
return None, None
def cas(self, key: str, value: str, cas_unique: int,
flags: int = 0, exptime: int = 0) -> str:
data = value.encode()
cmd = f"cas {key} {flags} {exptime} {len(data)} {cas_unique}\r\n"
self.sock.sendall(cmd.encode() + data + b"\r\n")
return self.sock.recv(1024).decode().strip()
def set(self, key: str, value: str, flags: int = 0, exptime: int = 0):
data = value.encode()
cmd = f"set {key} {flags} {exptime} {len(data)}\r\n"
self.sock.sendall(cmd.encode() + data + b"\r\n")
self.sock.recv(1024)
def cas_update(client: CASClient, key: str, update_fn, max_retries: int = 5):
"""带重试的 CAS 更新"""
for attempt in range(max_retries):
value, cas_unique = client.gets(key)
if value is None:
print(f"Key '{key}' 不存在")
return False
# 应用更新函数
new_value = update_fn(value)
# 尝试 CAS 写入
result = client.cas(key, new_value, cas_unique)
if result == "STORED":
print(f"CAS 更新成功(第 {attempt + 1} 次尝试)")
return True
elif result == "EXISTS":
print(f"版本冲突,重试...(第 {attempt + 1} 次)")
continue
else:
print(f"CAS 失败: {result}")
return False
print(f"超过最大重试次数 ({max_retries})")
return False
# 使用示例
client = CASClient()
client.set("counter", '{"count": 0}')
def increment_counter(value_str: str) -> str:
data = json.loads(value_str)
data["count"] += 1
return json.dumps(data)
cas_update(client, "counter", increment_counter)
# 读取最终值
value, _ = client.gets("counter")
print(f"最终值: {value}")
4.7 FLAGS 详解
flags 是一个 16 位无符号整数(0-65535),Memcached 服务端不解释这个字段,完全由客户端定义含义。
常见用法
| 位 | 用途 | 说明 |
|---|---|---|
| 0-7 | 数据类型标识 | 标记存储的数据类型 |
| 8-15 | 压缩标识 | 标记数据是否被压缩 |
典型客户端定义
# 常见 flags 定义
FLAGS_RAW = 0x00 # 原始字符串
FLAGS_JSON = 0x01 # JSON 对象
FLAGS_PICKLE = 0x02 # Python pickle
FLAGS_INT = 0x03 # 整数
FLAGS_GZIP = 0x10 # GZIP 压缩标志(可与类型组合)
def get_type_flags(flags: int) -> int:
return flags & 0x0F
def is_compressed(flags: int) -> bool:
return (flags & 0x10) != 0
# 存储 JSON 数据(带类型标识)
import json
def set_json(sock, key: str, obj: dict, ttl: int = 300):
data = json.dumps(obj).encode()
cmd = f"set {key} {FLAGS_JSON} {ttl} {len(data)}\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
return sock.recv(1024).decode().strip() == "STORED"
def get_json(sock, key: str):
sock.sendall(f"get {key}\r\n".encode())
resp = sock.recv(65536).decode()
lines = resp.strip().split("\r\n")
if lines[0].startswith("VALUE"):
parts = lines[0].split()
flags = int(parts[2])
data = lines[1]
if get_type_flags(flags) == FLAGS_JSON:
return json.loads(data)
return data
return None
压缩标志示例
import gzip
import json
def set_compressed_json(sock, key: str, obj: dict, ttl: int = 300):
raw = json.dumps(obj).encode()
compressed = gzip.compress(raw)
# 仅在压缩后更小时使用压缩
if len(compressed) < len(raw):
data = compressed
flags = FLAGS_JSON | FLAGS_GZIP
else:
data = raw
flags = FLAGS_JSON
cmd = f"set {key} {flags} {ttl} {len(data)}\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
return sock.recv(1024).decode().strip() == "STORED"
def get_compressed_json(sock, key: str):
sock.sendall(f"get {key}\r\n".encode())
resp = sock.recv(65536).decode()
lines = resp.strip().split("\r\n")
if lines[0].startswith("VALUE"):
parts = lines[0].split()
flags = int(parts[2])
data = lines[1].encode()
if is_compressed(flags):
data = gzip.decompress(data)
if get_type_flags(flags) == FLAGS_JSON:
return json.loads(data)
return data.decode()
return None
4.8 TTL (Time-To-Live) 深入
TTL 处理流程
写入时设置 exptime
│
▼
┌───────────────────────────────┐
│ exptime == 0 │──▶ 永不过期
│ exptime 1-2592000 (30天) │──▶ 当前时间 + exptime
│ exptime > 2592000 │──▶ 视为 Unix 时间戳
└───────────────────────────────┘
│
▼
Item 被访问时检查是否过期
│
├── 未过期 → 正常返回
└── 已过期 → 返回 NOT_FOUND(惰性删除)
惰性删除机制
Memcached 不会主动扫描过期 item。过期检查发生在:
- get 访问时:检查是否过期,过期则返回 NOT_FOUND
- LRU 淘汰时:优先跳过已过期的 item
- flush_all 时:标记所有 item 为过期
set shortlived 0 2 5
hello
STORED
# 立即获取(存在)
get shortlived
VALUE shortlived 0 5
hello
END
# 等待 3 秒后获取(已过期)
get shortlived
END
TTL 最佳实践
| 场景 | 建议 TTL | 原因 |
|---|---|---|
| 用户 Session | 1800 (30分钟) | 安全性 + 资源回收 |
| 热点数据 | 300-3600 | 平衡新鲜度和命中率 |
| 配置数据 | 86400 (1天) | 配置变化不频繁 |
| 计数器 | 0 或 86400 | 根据业务重置周期 |
| 临时锁 | 30 | 防止死锁 |
4.9 存储命令内部流程
以 set 命令为例,Memcached 内部处理流程如下:
收到 set 命令
│
▼
解析命令行(key, flags, exptime, length)
│
▼
检查 key 长度 ≤ 250 字节
│
├── 超长 → CLIENT_ERROR
└── 正常 ↓
读取 length 字节的数据体
│
▼
计算 item 所需空间(key + value + 元数据)
│
▼
从对应 Slab Class 分配内存
│
├── 成功 ↓
└── 失败 → 尝试 LRU 淘汰
│
├── 淘汰成功 → 重试分配
└── 无 item 可淘汰 → SERVER_ERROR out of memory
│
▼
在哈希表中插入/更新 item
│
▼
返回 STORED
4.10 客户端实现完整示例
#!/usr/bin/env python3
"""storage_client.py — 完整的存储命令客户端实现"""
import socket
from typing import Optional
class MemcachedStorageClient:
def __init__(self, host: str = '127.0.0.1', port: int = 11211):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def _send_store(self, cmd: str, data: bytes, noreply: bool = False) -> str:
nr = " noreply" if noreply else ""
full_cmd = f"{cmd}{nr}\r\n"
self.sock.sendall(full_cmd.encode() + data + b"\r\n")
if noreply:
return "NOREPLY"
return self.sock.recv(1024).decode().strip()
def set(self, key: str, value: bytes, flags: int = 0,
exptime: int = 0, noreply: bool = False) -> str:
cmd = f"set {key} {flags} {exptime} {len(value)}"
return self._send_store(cmd, value, noreply)
def add(self, key: str, value: bytes, flags: int = 0,
exptime: int = 0, noreply: bool = False) -> str:
cmd = f"add {key} {flags} {exptime} {len(value)}"
return self._send_store(cmd, value, noreply)
def replace(self, key: str, value: bytes, flags: int = 0,
exptime: int = 0, noreply: bool = False) -> str:
cmd = f"replace {key} {flags} {exptime} {len(value)}"
return self._send_store(cmd, value, noreply)
def append(self, key: str, value: bytes, noreply: bool = False) -> str:
cmd = f"append {key} 0 0 {len(value)}"
return self._send_store(cmd, value, noreply)
def prepend(self, key: str, value: bytes, noreply: bool = False) -> str:
cmd = f"prepend {key} 0 0 {len(value)}"
return self._send_store(cmd, value, noreply)
def cas(self, key: str, value: bytes, cas_unique: int,
flags: int = 0, exptime: int = 0, noreply: bool = False) -> str:
cmd = f"cas {key} {flags} {exptime} {len(value)} {cas_unique}"
return self._send_store(cmd, value, noreply)
def close(self):
self.sock.sendall(b"quit\r\n")
self.sock.close()
# 测试
client = MemcachedStorageClient()
print(client.set("test:1", b"hello"))
# STORED
print(client.add("test:2", b"world"))
# STORED
print(client.add("test:2", b"again"))
# NOT_STORED
print(client.replace("test:1", b"updated"))
# STORED
print(client.append("test:1", b"!"))
# STORED
client.close()
4.11 注意事项
| 编号 | 注意事项 | 说明 |
|---|---|---|
| 1 | length 必须精确 | 数据体字节数必须与 length 完全一致,否则 CLIENT_ERROR |
| 2 | append/prepend 不修改 flags/expire | 仅修改数据内容 |
| 3 | CAS 唯一标识会变化 | 每次写入(包括 set/add/replace)都会更新 cas_unique |
| 4 | 零长度值 | length=0 是合法的,会存储一个空值 |
| 5 | flags 客户端自定义 | 服务端原样存储和返回,不做解释 |
4.12 扩展阅读
上一章: 第03章 核心命令总览 下一章: 第05章 检索命令深入 — 深入解析 get/gets 的工作机制与最佳实践。