Memcached Transport Protocol in Depth / Chapter 5: Retrieval Commands in Depth

Chapter 5: Retrieval Commands in Depth

get and gets are the most frequently issued commands in Memcached. Understanding how they work is key to optimizing cache performance.


5.1 The GET Command in Detail

Syntax

get <key>*\r\n
  • Accepts one or more keys (space-separated)
  • Nonexistent keys are silently ignored (no error is returned)
  • The response is terminated by END

Single-key get

get user:1001
VALUE user:1001 0 14
{"name":"Bob"}
END

Multi-key batch get

get user:1001 user:1002 user:1003
VALUE user:1001 0 14
{"name":"Bob"}
VALUE user:1002 0 16
{"name":"Alice"}
END

Key point: user:1003 does not exist; the response simply contains no VALUE block for it, and no error is raised.

Response fields

VALUE <key> <flags> <bytes>\r\n
<data block>\r\n

  • key — string; the requested key, echoed back verbatim
  • flags — unsigned integer (16-bit historically, 32-bit in modern servers); the flags supplied when the item was stored
  • bytes — integer; length of the data block in bytes

5.2 The GETS Command in Detail

Syntax

gets <key>*\r\n

Identical to get, except that each VALUE block carries an additional cas_unique field.

Response format

VALUE <key> <flags> <bytes> <cas_unique>\r\n
<data block>\r\n
...
END\r\n

Example

gets user:1001
VALUE user:1001 0 14 12345678
{"name":"Bob"}
END

About cas_unique

  • Type: 64-bit unsigned integer
  • Generation: server-internal; in practice a monotonically increasing counter maintained by the server
  • Uniqueness: unique per item version; every write to the key produces a new value
  • Purpose: optimistic-locking writes via the cas command
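The gets → cas round trip can be sketched without a live server. Below is a minimal in-memory simulation of the optimistic-locking semantics; MiniStore and its counter are illustrative stand-ins for the server's internals, not memcached code:

```python
class MiniStore:
    """In-memory simulation of memcached's gets/cas semantics."""
    def __init__(self):
        self._data = {}     # key -> value
        self._cas = {}      # key -> cas_unique of the current version
        self._counter = 0   # monotonically increasing write counter

    def set(self, key, value):
        self._counter += 1
        self._data[key] = value
        self._cas[key] = self._counter

    def gets(self, key):
        if key not in self._data:
            return None, None
        return self._data[key], self._cas[key]

    def cas(self, key, value, cas_unique):
        if key not in self._data:
            return "NOT_FOUND"
        if self._cas[key] != cas_unique:
            return "EXISTS"  # someone else wrote in between
        self.set(key, value)
        return "STORED"


store = MiniStore()
store.set("user:1001", b'{"name":"Bob"}')
value, token = store.gets("user:1001")

# Another writer updates the key, invalidating our token:
store.set("user:1001", b'{"name":"Eve"}')

print(store.cas("user:1001", b'{"name":"Bob2"}', token))  # EXISTS (stale token)
_, fresh = store.gets("user:1001")
print(store.cas("user:1001", b'{"name":"Bob2"}', fresh))  # STORED
```

A stale token yields EXISTS, which tells the client to re-read and retry; this is the whole point of the cas_unique field.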

5.3 Batch Get vs. One-by-One Get

Performance comparison

One-by-one (3 round trips):
Client          Server
  │── get k1 ──▶│
  │◀── VALUE ───│
  │── get k2 ──▶│
  │◀── VALUE ───│
  │── get k3 ──▶│
  │◀── VALUE ───│

Batch (1 round trip):
Client          Server
  │── get k1 k2 k3 ──▶│
  │◀── VALUE VALUE ───│

Conclusion: batching collapses N network round trips into one, significantly reducing latency.

Python implementation

import socket
import time

class MemcachedRetrievalClient:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def get(self, key: str) -> bytes | None:
        """Fetch a single key."""
        item = self._get_multi([key]).get(key)
        return item['data'] if item else None

    def gets(self, key: str) -> tuple[bytes | None, int | None]:
        """Fetch a single key along with its cas_unique."""
        item = self._gets_multi([key]).get(key)
        if item:
            return item['data'], item['cas']
        return None, None

    def get_multi(self, keys: list[str]) -> dict[str, bytes]:
        """Batch fetch."""
        return {k: v['data'] for k, v in self._get_multi(keys).items()}

    def gets_multi(self, keys: list[str]) -> dict[str, dict]:
        """Batch fetch with cas_unique."""
        return self._gets_multi(keys)

    def _get_multi(self, keys: list[str]) -> dict[str, dict]:
        cmd = f"get {' '.join(keys)}\r\n"
        self.sock.sendall(cmd.encode())
        return self._read_value_response()

    def _gets_multi(self, keys: list[str]) -> dict[str, dict]:
        cmd = f"gets {' '.join(keys)}\r\n"
        self.sock.sendall(cmd.encode())
        return self._read_value_response(with_cas=True)

    def _read_value_response(self, with_cas=False) -> dict:
        # Simplified parser: splitting on \r\n assumes the values themselves
        # contain no \r\n (see 5.4 for the length-driven approach).
        result = {}
        buffer = b""

        while b"END\r\n" not in buffer:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed before END")
            buffer += chunk

        lines = buffer.split(b"\r\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if line == b"END":
                break
            if line.startswith(b"VALUE "):
                parts = line.decode().split()
                key = parts[1]
                flags = int(parts[2])
                length = int(parts[3])
                cas = int(parts[4]) if with_cas and len(parts) > 4 else None

                i += 1
                if i < len(lines):
                    data = lines[i]
                    result[key] = {
                        'flags': flags,
                        'data': data,
                        'cas': cas
                    }
            i += 1

        return result

    def close(self):
        self.sock.sendall(b"quit\r\n")
        self.sock.close()


# Performance test
client = MemcachedRetrievalClient()

# Write test data with raw set commands over the same socket
for i in range(10):
    payload = f"value_{i}".encode()
    client.sock.sendall(f"set bench:{i} 0 0 {len(payload)}\r\n".encode() + payload + b"\r\n")
    client.sock.recv(1024)  # read "STORED\r\n"

# Batch get
start = time.time()
result = client.get_multi([f"bench:{i}" for i in range(10)])
batch_time = time.time() - start
print(f"Batch get of 10 keys: {batch_time*1000:.2f}ms, {len(result)} hits")

# One-by-one get
start = time.time()
for i in range(10):
    client.get(f"bench:{i}")
single_time = time.time() - start
print(f"One-by-one get of 10 keys: {single_time*1000:.2f}ms")

print(f"Batch get is {single_time/batch_time:.1f}x faster")

5.4 Value Format and Parsing

Parsing the VALUE line

VALUE <key> <flags> <bytes> [<cas_unique>]\r\n

Parsing pseudocode:

def parse_value_line(line: str, has_cas: bool = False):
    parts = line.split()
    result = {
        'key': parts[1],
        'flags': int(parts[2]),
        'bytes': int(parts[3])
    }
    if has_cas:
        result['cas_unique'] = int(parts[4])
    return result

Reading the data block by exact length

The data block's length must match the <bytes> field exactly. The client must read exactly that many bytes; it cannot rely on a line terminator, because the value itself may contain \r\n:

def read_exact(sock, n: int) -> bytes:
    """精确读取 n 个字节"""
    data = b""
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            raise ConnectionError("Connection closed")
        data += chunk
    return data

# Correct
length = 13
data = read_exact(sock, length)  # read exactly 13 bytes
crlf = read_exact(sock, 2)       # consume the trailing \r\n

# Wrong ❌
data = sock.makefile('rb').readline()  # a "line" stops early if the value contains \r\n
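The two helpers above can be combined into a length-driven parser. The sketch below reads from any file-like object (io.BytesIO stands in for a socket file here; parse_get_response and read_line are illustrative names, not library functions) and handles values that legally contain \r\n:

```python
import io

def read_exact(f, n: int) -> bytes:
    """Read exactly n bytes from a file-like object."""
    data = f.read(n)
    if len(data) != n:
        raise ConnectionError("unexpected EOF")
    return data

def read_line(f) -> bytes:
    """Read up to and including \r\n; return the line without the terminator."""
    line = b""
    while not line.endswith(b"\r\n"):
        line += read_exact(f, 1)
    return line[:-2]

def parse_get_response(f, with_cas: bool = False) -> dict:
    """Length-driven parser: data blocks are read by <bytes>, never split on CRLF."""
    result = {}
    while True:
        line = read_line(f)
        if line == b"END":
            return result
        parts = line.decode().split()
        key, flags, length = parts[1], int(parts[2]), int(parts[3])
        entry = {"flags": flags, "data": read_exact(f, length)}
        if with_cas:
            entry["cas"] = int(parts[4])
        read_exact(f, 2)  # consume the \r\n after the data block
        result[key] = entry

# A value containing \r\n, which a split-on-CRLF parser would mangle:
raw = b'VALUE k 0 7\r\nab\r\ncde\r\nEND\r\n'
print(parse_get_response(io.BytesIO(raw)))  # {'k': {'flags': 0, 'data': b'ab\r\ncde'}}
```

In a real client you would wrap the socket with sock.makefile('rb') and pass that file object in; the parsing logic stays the same.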

5.5 Response Ordering

Order preserved

VALUE blocks in the response appear in the same order as the keys in the request (only existing keys are returned).

get c a b
VALUE c 0 3
ccc
VALUE a 0 3
aaa
VALUE b 0 3
bbb
END

Note: this is request order, not storage order or alphabetical order.

Order-preserving client implementation

def get_ordered(sock, keys: list[str]) -> dict[str, bytes]:
    """Fetch multiple keys, preserving request order (Python dicts keep insertion order)."""
    cmd = f"get {' '.join(keys)}\r\n"
    sock.sendall(cmd.encode())

    result = {}
    buffer = b""
    while True:
        chunk = sock.recv(65536)
        if not chunk:
            raise ConnectionError("Connection closed before END")
        buffer += chunk
        if b"END\r\n" in buffer:
            break

    lines = buffer.split(b"\r\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        if line == b"END":
            break
        if line.startswith(b"VALUE "):
            key = line.decode().split()[1]
            i += 1
            if i < len(lines):
                result[key] = lines[i]
        i += 1

    return result

5.6 Miss Behavior

Silent ignoring

Keys that do not exist are simply omitted from the response:

# Request 3 keys; only 2 exist
get existing1 missing existing2
VALUE existing1 0 3
aaa
VALUE existing2 0 3
bbb
END

Tracking misses

def get_with_miss_tracking(sock, keys: list[str]) -> tuple[dict, list]:
    """Fetch multiple keys and track which ones missed."""
    cmd = f"get {' '.join(keys)}\r\n"
    sock.sendall(cmd.encode())

    result = {}
    buffer = b""
    while True:
        chunk = sock.recv(65536)
        if not chunk:
            raise ConnectionError("Connection closed before END")
        buffer += chunk
        if b"END\r\n" in buffer:
            break

    lines = buffer.split(b"\r\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        if line == b"END":
            break
        if line.startswith(b"VALUE "):
            key = line.decode().split()[1]
            i += 1
            if i < len(lines):
                result[key] = lines[i]
        i += 1

    missed = [k for k in keys if k not in result]
    return result, missed

5.7 Internal Flow of a Retrieval Command

Receive get/gets command
    │
    ▼
Parse the command line, extract all keys
    │
    ▼
For each key in turn:
    │
    ├── Compute the key's hash
    │
    ├── Look it up in the hash table
    │   ├── Found → check expiry
    │   │   ├── Not expired → add to response
    │   │   └── Expired → lazily delete, skip
    │   └── Not found → skip
    │
    └── Continue with the next key
    │
    ▼
Emit all found VALUE blocks
    │
    ▼
Emit END

5.8 Strategies for Large Batch Gets

Fetching in batches

When the number of keys is large, fetch in batches to avoid:

  1. A single oversized request stalling the server
  2. The response overflowing the client's buffer
  3. Holding the connection for a long time

def batch_get(sock, keys: list[str], batch_size: int = 100) -> dict:
    """Fetch a large number of keys in batches."""
    result = {}
    for i in range(0, len(keys), batch_size):
        batch = keys[i:i + batch_size]
        cmd = f"get {' '.join(batch)}\r\n"
        sock.sendall(cmd.encode())

        buffer = b""
        while True:
            chunk = sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed before END")
            buffer += chunk
            if b"END\r\n" in buffer:
                break

        lines = buffer.split(b"\r\n")
        j = 0
        while j < len(lines):
            line = lines[j]
            if line == b"END":
                break
            if line.startswith(b"VALUE "):
                key = line.decode().split()[1]
                j += 1
                if j < len(lines):
                    result[key] = lines[j]
            j += 1

    return result

Pipelined batch get

TCP pipelining reduces waiting time further: write every request first, then read the responses back-to-back:

def pipelined_batch_get(sock, keys: list[str], batch_size: int = 100) -> dict:
    """Pipelined batched fetch."""
    result = {}
    batches = [keys[i:i + batch_size] for i in range(0, len(keys), batch_size)]

    # Send all requests first
    for batch in batches:
        cmd = f"get {' '.join(batch)}\r\n"
        sock.sendall(cmd.encode())

    # Then read one response per request sent
    for _ in batches:
        buffer = b""
        while True:
            chunk = sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed before END")
            buffer += chunk
            if b"END\r\n" in buffer:
                break

        lines = buffer.split(b"\r\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if line == b"END":
                break
            if line.startswith(b"VALUE "):
                key = line.decode().split()[1]
                i += 1
                if i < len(lines):
                    result[key] = lines[i]
            i += 1

    return result

5.9 Application Scenarios

Scenario 1: Batch user lookup

import json
import socket

class UserCache:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def get_users(self, user_ids: list[int]) -> dict:
        """Batch-fetch user records."""
        keys = [f"user:{uid}" for uid in user_ids]
        cmd = f"get {' '.join(keys)}\r\n"
        self.sock.sendall(cmd.encode())

        result = {}
        buffer = b""
        while True:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed before END")
            buffer += chunk
            if b"END\r\n" in buffer:
                break

        lines = buffer.split(b"\r\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if line == b"END":
                break
            if line.startswith(b"VALUE "):
                key = line.decode().split()[1]
                uid = int(key.split(":")[1])
                i += 1
                if i < len(lines):
                    result[uid] = json.loads(lines[i])
            i += 1

        return result

# Usage
cache = UserCache()
users = cache.get_users([1001, 1002, 1003, 9999])  # 9999 does not exist
for uid, info in users.items():
    print(f"User {uid}: {info['name']}")

Scenario 2: Multi-level cache lookup

import time

# L1 local cache shared across calls (a plain in-process dict)
_local_cache: dict[str, bytes] = {}
_local_ttl: dict[str, float] = {}

def multi_level_cache_get(sock, key: str) -> bytes | None:
    """
    Multi-level cache lookup pattern:
    1. Check the L1 in-process cache first (a Python dict)
    2. On a miss, query L2 (Memcached)
    3. On a miss there too, the caller falls back to the database and writes back
    """
    # Check L1
    if key in _local_cache:
        if _local_ttl.get(key, 0) > time.time():
            return _local_cache[key]
        del _local_cache[key]
        del _local_ttl[key]

    # Query L2 (Memcached)
    sock.sendall(f"get {key}\r\n".encode())
    buffer = b""
    while True:
        chunk = sock.recv(65536)
        if not chunk:
            raise ConnectionError("Connection closed before END")
        buffer += chunk
        if b"END\r\n" in buffer:
            break

    lines = buffer.split(b"\r\n")
    if lines[0].startswith(b"VALUE "):
        data = lines[1]
        # Backfill L1
        _local_cache[key] = data
        _local_ttl[key] = time.time() + 60  # 60-second L1 TTL
        return data

    return None  # miss at both levels

Scenario 3: Cache warm-up

def warm_up_cache(sock, db_query_fn, keys: list[str], ttl: int = 3600):
    """
    Cache warm-up: load hot data into Memcached at startup.
    """
    # First, batch-check which keys are already cached
    cmd = f"get {' '.join(keys)}\r\n"
    sock.sendall(cmd.encode())

    cached = set()
    buffer = b""
    while True:
        chunk = sock.recv(65536)
        if not chunk:
            raise ConnectionError("Connection closed before END")
        buffer += chunk
        if b"END\r\n" in buffer:
            break

    lines = buffer.split(b"\r\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        if line == b"END":
            break
        if line.startswith(b"VALUE "):
            cached.add(line.decode().split()[1])
        i += 2  # skip the VALUE line and its data line together

    # Determine which keys were missed
    missing = [k for k in keys if k not in cached]
    print(f"Warm-up check: {len(cached)} already cached, {len(missing)} to load")

    # Load from the database and write into the cache
    for key in missing:
        value = db_query_fn(key)
        if value:
            data = value.encode()
            cmd = f"set {key} 0 {ttl} {len(data)}\r\n"
            sock.sendall(cmd.encode() + data + b"\r\n")
            sock.recv(1024)  # read "STORED\r\n"

    print(f"Warm-up done: loaded {len(missing)} keys")

5.10 Caveats

  1. Key count per batch get: no hard protocol limit, but keep a single request under roughly 1000 keys
  2. Response order: VALUE blocks come back in request order, not storage order
  3. Missing keys are silently ignored: no error or miss marker is returned
  4. Read data blocks by exact length: read exactly <bytes> bytes; never rely on CRLF
  5. gets and cas_unique: the token changes on every write; it drives CAS operations
  6. Mind network cost with large values: 100 keys × 1 MB values = a 100 MB response
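Caveats 1 and 6 can be enforced together by splitting a multiget on an estimated byte budget as well as a key count. The function below is a sketch; split_by_budget, the sizes estimate, and the per-item overhead constant are application-level assumptions, not protocol rules:

```python
def split_by_budget(keys: list[str], sizes: dict[str, int],
                    max_bytes: int = 8 * 1024 * 1024,
                    max_keys: int = 1000) -> list[list[str]]:
    """Split a multiget into batches bounded by estimated response size and key count.
    sizes maps key -> expected value size in bytes (an application-level estimate)."""
    batches, batch, batch_bytes = [], [], 0
    for key in keys:
        # Rough per-item cost: value + key echoed in the VALUE line + protocol overhead
        est = sizes.get(key, 0) + len(key) + 32
        if batch and (batch_bytes + est > max_bytes or len(batch) >= max_keys):
            batches.append(batch)
            batch, batch_bytes = [], 0
        batch.append(key)
        batch_bytes += est
    if batch:
        batches.append(batch)
    return batches


# 100 keys of ~1 MB each with a 10 MB budget -> each batch stays under the budget
keys = [f"k{i}" for i in range(100)]
sizes = {k: 1024 * 1024 for k in keys}
print([len(b) for b in split_by_budget(keys, sizes, max_bytes=10 * 1024 * 1024)])
```

Each resulting batch can then be fetched with batch_get or the pipelined variant from 5.8.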
