Memcached 传输协议精讲 / 第05章 检索命令深入
第05章 检索命令深入
get 和 gets 是 Memcached 中最频繁调用的命令。深入理解其工作机制是优化缓存性能的关键。
5.1 GET 命令详解
语法
get <key>*\r\n
- 支持单个或多个 key(空格分隔)
- 不存在的 key 被静默忽略(不会报错)
- 响应以 `END\r\n` 结束
单 key 获取
get user:1001
VALUE user:1001 0 13
{"name":"Bob"}
END
多 key 批量获取
get user:1001 user:1002 user:1003
VALUE user:1001 0 13
{"name":"Bob"}
VALUE user:1002 0 15
{"name":"Alice"}
END
关键点:
user:1003不存在,响应中没有对应的 VALUE 块,也不会报错。
响应字段解析
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
| 字段 | 类型 | 说明 |
|---|---|---|
| key | string | 原样返回请求的 key |
| flags | uint32 | 存储时设置的 flags(memcached 1.2.1 及以后为 32 位,更早版本为 16 位) |
| bytes | int | 数据块的字节长度 |
5.2 GETS 命令详解
语法
gets <key>*\r\n
与 get 完全相同,但每个 VALUE 块额外返回 cas_unique 字段。
响应格式
VALUE <key> <flags> <bytes> <cas_unique>\r\n
<data block>\r\n
...
END\r\n
示例
gets user:1001
VALUE user:1001 0 13 12345678
{"name":"Bob"}
END
cas_unique 说明
| 特性 | 说明 |
|---|---|
| 类型 | 64 位无符号整数 |
| 生成方式 | 服务端内部实现(通常是一个全局单调递增的计数器,每次写入递增) |
| 唯一性 | 全局唯一,每次写入都会更新 |
| 用途 | 供 cas 命令进行乐观锁写入 |
5.3 批量获取 vs 逐个获取
性能对比
逐个获取(3 次往返):
Client Server
│── get k1 ──▶│
│◀── VALUE ───│
│── get k2 ──▶│
│◀── VALUE ───│
│── get k3 ──▶│
│◀── VALUE ───│
批量获取(1 次往返):
Client Server
│── get k1 k2 k3 ──▶│
│◀── VALUE VALUE ───│
结论: 批量获取将 N 次网络往返减少为 1 次,显著降低延迟。
Python 实现
import socket
import time
class MemcachedRetrievalClient:
def __init__(self, host='127.0.0.1', port=11211):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def get(self, key: str) -> bytes | None:
"""获取单个 key"""
result = self._get_multi([key])
return result.get(key)
def gets(self, key: str) -> tuple[bytes | None, int | None]:
"""获取单个 key + cas_unique"""
result = self._gets_multi([key])
item = result.get(key)
if item:
return item['data'], item['cas']
return None, None
def get_multi(self, keys: list[str]) -> dict[str, bytes]:
"""批量获取"""
return {k: v['data'] for k, v in self._get_multi(keys).items()}
def gets_multi(self, keys: list[str]) -> dict[str, dict]:
"""批量获取 + cas_unique"""
return self._gets_multi(keys)
def _get_multi(self, keys: list[str]) -> dict[str, dict]:
cmd = f"get {' '.join(keys)}\r\n"
self.sock.sendall(cmd.encode())
return self._read_value_response()
def _gets_multi(self, keys: list[str]) -> dict[str, dict]:
cmd = f"gets {' '.join(keys)}\r\n"
self.sock.sendall(cmd.encode())
return self._read_value_response(with_cas=True)
def _read_value_response(self, with_cas=False) -> dict:
result = {}
buffer = b""
while True:
chunk = self.sock.recv(65536)
buffer += chunk
if b"END\r\n" in buffer:
break
lines = buffer.split(b"\r\n")
i = 0
while i < len(lines):
line = lines[i]
if line == b"END":
break
if line.startswith(b"VALUE "):
parts = line.decode().split()
key = parts[1]
flags = int(parts[2])
length = int(parts[3])
cas = int(parts[4]) if with_cas and len(parts) > 4 else None
i += 1
if i < len(lines):
data = lines[i]
result[key] = {
'flags': flags,
'data': data,
'cas': cas
}
i += 1
return result
def close(self):
self.sock.sendall(b"quit\r\n")
self.sock.close()
# Performance demo: batched vs one-by-one retrieval over a live connection.
client = MemcachedRetrievalClient()

# Seed the test data.
# BUG FIX: the original called client._write_set(), a method the class never
# defines; issue the protocol's "set" command directly instead.
for i in range(10):
    payload = f"value_{i}".encode()
    header = f"set bench:{i} 0 0 {len(payload)}\r\n".encode()
    client.sock.sendall(header + payload + b"\r\n")
    client.sock.recv(1024)  # consume the "STORED\r\n" reply

# Batched retrieval: one network round trip for all 10 keys.
start = time.time()
result = client.get_multi([f"bench:{i}" for i in range(10)])
batch_time = time.time() - start
print(f"批量获取 10 个 key: {batch_time*1000:.2f}ms, 命中 {len(result)} 个")

# One-by-one retrieval: 10 round trips.
start = time.time()
for i in range(10):
    client.get(f"bench:{i}")
single_time = time.time() - start
print(f"逐个获取 10 个 key: {single_time*1000:.2f}ms")
print(f"批量获取快 {single_time/batch_time:.1f} 倍")
5.4 值格式与解析
VALUE 行解析
VALUE <key> <flags> <bytes> [<cas_unique>]\r\n
解析伪代码:
def parse_value_line(line: str, has_cas: bool = False):
    """Parse a ``VALUE <key> <flags> <bytes> [<cas_unique>]`` header line.

    Returns a dict with 'key', 'flags', and 'bytes'; when has_cas is True
    (a ``gets`` response) it also carries 'cas_unique'.
    """
    fields = line.split()
    parsed = {
        'key': fields[1],
        'flags': int(fields[2]),
        'bytes': int(fields[3]),
    }
    if has_cas:
        parsed['cas_unique'] = int(fields[4])
    return parsed
数据块长度精确匹配
数据块的长度必须与 <bytes> 字段精确匹配。客户端需要严格按字节数读取,不能依赖行终止符:
def read_exact(sock, n: int) -> bytes:
    """Read exactly n bytes from sock, raising if the peer closes early."""
    pieces = []
    remaining = n
    while remaining:
        piece = sock.recv(remaining)
        if not piece:
            raise ConnectionError("Connection closed")
        pieces.append(piece)
        remaining -= len(piece)
    # Join once at the end instead of growing a bytes object per chunk.
    return b"".join(pieces)
# Correct approach: read exactly the advertised number of bytes.
length = 13
data = read_exact(sock, length)  # read exactly 13 bytes (the <bytes> field)
crlf = read_exact(sock, 2)  # then consume the trailing \r\n
# Wrong approach ❌ — may read too much or too little.
# NOTE(review): a plain socket object has no readline(); this line presumably
# assumes a file-like wrapper (sock.makefile()) — it is shown only as the
# anti-pattern to avoid.
data = sock.readline()  # may over- or under-read relative to <bytes>
5.5 返回顺序
有序返回
响应中 VALUE 块的顺序与请求中的 key 顺序一致(仅返回存在的 key)。
get c a b
VALUE c 0 3
ccc
VALUE a 0 3
aaa
VALUE b 0 3
bbb
END
注意: 顺序是请求顺序,不是存储顺序或字母顺序。
处理顺序的客户端实现
def get_ordered(sock, keys: list[str]) -> dict[str, bytes]:
    """Fetch several keys in one request, preserving response order.

    The returned dict's insertion order follows the server's VALUE blocks,
    which come back in request order (misses are simply absent).
    """
    sock.sendall(f"get {' '.join(keys)}\r\n".encode())

    # Accumulate until the END terminator has arrived.
    buffer = b""
    while b"END\r\n" not in buffer:
        buffer += sock.recv(65536)

    found: dict[str, bytes] = {}
    lines = buffer.split(b"\r\n")
    idx = 0
    while idx < len(lines):
        current = lines[idx]
        if current == b"END":
            break
        if current.startswith(b"VALUE "):
            name = current.decode().split()[1]
            idx += 1  # the data block is the very next line
            if idx < len(lines):
                found[name] = lines[idx]
        idx += 1
    return found
5.6 未命中行为
静默忽略
不存在的 key 在响应中被完全忽略:
# 请求 3 个 key,只有 2 个存在
get existing1 missing existing2
VALUE existing1 0 3
aaa
VALUE existing2 0 3
bbb
END
未命中率监控
def get_with_miss_tracking(sock, keys: list[str]) -> tuple[dict, list]:
    """Fetch several keys and also report which ones the server omitted.

    Returns (hits, missed): hits maps key -> raw value bytes; missed lists
    the requested keys absent from the response (memcached silently ignores
    non-existent keys rather than signalling an error).
    """
    sock.sendall(f"get {' '.join(keys)}\r\n".encode())

    buffer = b""
    while b"END\r\n" not in buffer:
        buffer += sock.recv(65536)

    hits: dict = {}
    lines = buffer.split(b"\r\n")
    pos = 0
    while pos < len(lines):
        header = lines[pos]
        if header == b"END":
            break
        if header.startswith(b"VALUE "):
            name = header.decode().split()[1]
            pos += 1  # data block follows the VALUE header
            if pos < len(lines):
                hits[name] = lines[pos]
        pos += 1

    missed = [k for k in keys if k not in hits]
    return hits, missed
5.7 获取命令内部流程
收到 get/gets 命令
│
▼
解析命令行,提取所有 key
│
▼
对每个 key 循环处理:
│
├── 计算 key 的哈希值
│
├── 在哈希表中查找
│ ├── 找到 → 检查是否过期
│ │ ├── 未过期 → 加入响应
│ │ └── 已过期 → 惰性删除,跳过
│ └── 未找到 → 跳过
│
└── 继续下一个 key
│
▼
输出所有找到的 VALUE 块
│
▼
输出 END
5.8 大批量获取策略
分批获取
当 key 数量很多时,建议分批获取以避免:
- 单个请求过大导致服务端阻塞
- 响应数据超过客户端缓冲区
- 长时间占用连接
def batch_get(sock, keys: list[str], batch_size: int = 100) -> dict:
    """Fetch a large key set in chunks of batch_size, merging all hits.

    Splitting the request bounds each response's size and avoids tying up
    the connection with one enormous command.
    """
    merged: dict = {}
    for start in range(0, len(keys), batch_size):
        chunk_keys = keys[start:start + batch_size]
        sock.sendall(f"get {' '.join(chunk_keys)}\r\n".encode())

        # Read this batch's response up to its END terminator.
        buffer = b""
        while b"END\r\n" not in buffer:
            buffer += sock.recv(65536)

        lines = buffer.split(b"\r\n")
        pos = 0
        while pos < len(lines):
            header = lines[pos]
            if header == b"END":
                break
            if header.startswith(b"VALUE "):
                name = header.decode().split()[1]
                pos += 1  # data block follows
                if pos < len(lines):
                    merged[name] = lines[pos]
            pos += 1
    return merged
管道化批量获取
利用 TCP 管道化进一步减少等待时间:
def pipelined_batch_get(sock, keys: list[str], batch_size: int = 100) -> dict:
    """Pipelined variant of batched get: send every batch first, then read
    every response, overlapping request transmission with server work.

    BUG FIX: the original reset its receive buffer before each response, so
    when one recv() returned the tail of response N together with the head
    of response N+1 (the normal case with pipelining), those extra bytes
    were silently dropped.  A single buffer is now carried across responses
    and consumed one "END\r\n"-terminated response at a time.

    NOTE(review): like the other snippets in this chapter, parsing splits on
    CRLF, so values containing CRLF or "END\r\n" will still be mis-parsed.
    """
    result = {}
    batches = [keys[i:i + batch_size] for i in range(0, len(keys), batch_size)]

    # Phase 1: fire off all requests back to back.
    for batch in batches:
        sock.sendall(f"get {' '.join(batch)}\r\n".encode())

    # Phase 2: read exactly one END-terminated response per batch,
    # keeping any surplus bytes for the next response.
    buffer = b""
    for _ in batches:
        while b"END\r\n" not in buffer:
            chunk = sock.recv(65536)
            if not chunk:
                # Peer closed mid-response: fail instead of spinning forever.
                raise ConnectionError("connection closed before END")
            buffer += chunk
        response, _, buffer = buffer.partition(b"END\r\n")

        lines = response.split(b"\r\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if line.startswith(b"VALUE "):
                key = line.decode().split()[1]
                i += 1  # data block follows
                if i < len(lines):
                    result[key] = lines[i]
            i += 1
    return result
5.9 业务场景
场景一:用户信息批量查询
import json
import socket
class UserCache:
    """Batch lookup of JSON-encoded user records stored under "user:<id>"."""

    def __init__(self, host='127.0.0.1', port=11211):
        # Single blocking TCP connection to the memcached server.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def get_users(self, user_ids: list[int]) -> dict:
        """Return {user_id: decoded JSON dict} for every cached id.

        Ids that are not in the cache simply do not appear in the result —
        memcached omits missing keys from the response.
        """
        key_list = [f"user:{uid}" for uid in user_ids]
        self.sock.sendall(f"get {' '.join(key_list)}\r\n".encode())

        # Buffer the whole response up to its END terminator.
        buffer = b""
        while b"END\r\n" not in buffer:
            buffer += self.sock.recv(65536)

        users: dict = {}
        lines = buffer.split(b"\r\n")
        pos = 0
        while pos < len(lines):
            header = lines[pos]
            if header == b"END":
                break
            if header.startswith(b"VALUE "):
                cache_key = header.decode().split()[1]
                uid = int(cache_key.split(":")[1])  # "user:<id>" -> id
                pos += 1  # the JSON payload is on the next line
                if pos < len(lines):
                    users[uid] = json.loads(lines[pos])
            pos += 1
        return users
# Usage example — requires a running memcached populated with user:* keys.
cache = UserCache()
users = cache.get_users([1001, 1002, 1003, 9999])  # 9999 is absent: silently skipped
for uid, info in users.items():
    print(f"User {uid}: {info['name']}")
场景二:多级缓存查询
def multi_level_cache_get(sock, key: str) -> bytes | None:
"""
多级缓存查询模式:
1. 先查 L1 本地缓存(如 Python dict)
2. 未命中则查 L2 Memcached
3. 未命中则查数据库并回写
"""
import time
# L1 本地缓存(简单的内存字典)
local_cache = {}
local_ttl = {}
# 检查 L1
if key in local_cache:
if local_ttl.get(key, 0) > time.time():
return local_cache[key]
else:
del local_cache[key]
del local_ttl[key]
# 查询 L2(Memcached)
sock.sendall(f"get {key}\r\n".encode())
buffer = b""
while True:
chunk = sock.recv(65536)
buffer += chunk
if b"END\r\n" in buffer:
break
lines = buffer.split(b"\r\n")
if lines[0].startswith(b"VALUE "):
data = lines[1]
# 回填 L1
local_cache[key] = data
local_ttl[key] = time.time() + 60 # L1 TTL 60秒
return data
return None # 两级都未命中
场景三:缓存预热
def warm_up_cache(sock, db_query_fn, keys: list[str], ttl: int = 3600):
    """
    Cache warm-up: ensure every key in `keys` is present in Memcached.

    Already-cached keys are detected with a single batched get; the rest
    are fetched via db_query_fn(key) and stored with the given TTL.
    """
    # Probe which keys the server already holds.
    sock.sendall(f"get {' '.join(keys)}\r\n".encode())

    buffer = b""
    while b"END\r\n" not in buffer:
        buffer += sock.recv(65536)

    cached = set()
    lines = buffer.split(b"\r\n")
    pos = 0
    while pos < len(lines):
        header = lines[pos]
        if header == b"END":
            break
        if header.startswith(b"VALUE "):
            cached.add(header.decode().split()[1])
        pos += 2  # step over the header and its data block together
    missing = [k for k in keys if k not in cached]
    print(f"预热检查: {len(cached)} 个已缓存, {len(missing)} 个需要加载")

    # Load each cold key from the database and store it.
    for key in missing:
        value = db_query_fn(key)
        if value:
            data = value.encode()
            cmd = f"set {key} 0 {ttl} {len(data)}\r\n"
            sock.sendall(cmd.encode() + data + b"\r\n")
            sock.recv(1024)  # consume the "STORED\r\n" reply
    print(f"预热完成: 加载了 {len(missing)} 个 key")
5.10 注意事项
| 编号 | 注意事项 | 说明 |
|---|---|---|
| 1 | 批量获取 key 数量无硬限制 | 但建议单次不超过 1000 个 |
| 2 | 响应顺序 | VALUE 块按请求顺序返回,非存储顺序 |
| 3 | 不存在的 key 静默忽略 | 不会返回错误或未命中标志 |
| 4 | 数据块长度精确读取 | 必须按字节数精确读取,不能依赖 CRLF |
| 5 | gets 的 cas_unique | 每次写入都会改变,用于 CAS 操作 |
| 6 | 大 Value 注意网络开销 | 1MB 的 value × 100 个 key = 100MB 响应 |
5.11 扩展阅读
上一章: 第04章 存储命令深入 下一章: 第06章 二进制协议基础 — 了解 Memcached 二进制协议的帧格式与设计思想。