强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Memcached 传输协议精讲 / 第04章 存储命令深入

第04章 存储命令深入

存储命令是 Memcached 的核心写入接口。理解每个命令的语义差异、flags 用法和 TTL 行为,是正确使用缓存的前提。


4.1 SET — 无条件存储

语法

set <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

语义

set 是最基本的存储命令。无论 key 是否已存在,都无条件覆盖已有值。这是最常用的写入命令。

参数详解

参数类型范围说明
keystring1-250 字节缓存键,不能含空白和控制字符
flagsuint160-65535客户端自定义标志位,服务端原样返回
exptimeint见下文过期时间
lengthint0-1048576数据块字节长度(默认最大 1MB)
noreplystring可选,设置后不返回响应

exptime 规则

含义
0永不过期
1-2592000 (30天)秒数
>2592000Unix 时间戳(绝对时间)
# 永不过期
set key1 0 0 5
hello
STORED

# 60 秒后过期
set key2 0 60 5
hello
STORED

# Unix 时间戳(2025-12-31 23:59:59 UTC)
set key3 0 1767225599 5
hello
STORED

完整交互示例

$ telnet 127.0.0.1 11211

set mykey 0 300 11
hello world
STORED

get mykey
VALUE mykey 0 11
hello world
END

# 覆盖
set mykey 0 300 7
new data
STORED

get mykey
VALUE mykey 0 7
new data
END

4.2 ADD — 不存在时写入

语法

add <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

语义

仅当 key 不存在(或已过期被清除)时写入。若 key 已存在,返回 NOT_STORED

交互示例

# key 不存在,写入成功
add newkey 0 60 5
hello
STORED

# key 已存在,写入失败
add newkey 0 60 5
world
NOT_STORED

典型业务场景

1. 初始化配置

def init_config(sock, key: str, value: str, ttl: int = 86400):
    """仅在配置不存在时写入(避免覆盖已有配置)"""
    data = value.encode()
    cmd = f"add {key} 0 {ttl} {len(data)}\r\n"
    sock.sendall(cmd.encode() + data + b"\r\n")
    resp = sock.recv(1024).decode().strip()
    return resp == "STORED"

# 使用
if init_config(sock, "config:max_connections", "1000"):
    print("配置初始化成功")
else:
    print("配置已存在,跳过")

2. 分布式锁

add 的语义天然适合实现分布式锁:只有第一个请求能成功 add,后续请求返回 NOT_STORED

import uuid
import time

def try_acquire_lock(sock, lock_name: str, ttl: int = 30) -> str | None:
    token = str(uuid.uuid4())
    data = token.encode()
    cmd = f"add lock:{lock_name} 0 {ttl} {len(data)}\r\n"
    sock.sendall(cmd.encode() + data + b"\r\n")
    resp = sock.recv(1024).decode().strip()
    return token if resp == "STORED" else None

def release_lock(sock, lock_name: str, token: str):
    """简单删除(注意:无法验证 token,生产环境建议使用 Lua 脚本)"""
    sock.sendall(f"delete lock:{lock_name}\r\n".encode())
    sock.recv(1024)

# 使用
token = try_acquire_lock(sock, "order:process")
if token:
    try:
        print("获取锁成功,执行业务逻辑...")
        # ... 业务处理 ...
    finally:
        release_lock(sock, "order:process", token)
else:
    print("获取锁失败,其他进程正在处理")

4.3 REPLACE — 已存在时替换

语法

replace <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

语义

仅当 key 已存在时替换值。若 key 不存在,返回 NOT_STORED(注意:不是 NOT_FOUND)。

交互示例

# key 不存在,替换失败
replace missing_key 0 60 5
hello
NOT_STORED

# 先 set 创建
set mykey 0 0 5
hello
STORED

# 再 replace 替换
replace mykey 0 0 7
goodbye
STORED

典型业务场景

条件更新:只更新已初始化的数据,避免因并发 add 产生脏数据。

def update_user_profile(sock, user_id: int, profile_json: str):
    """只更新已缓存的用户资料,不存在则跳过"""
    key = f"user:{user_id}:profile"
    data = profile_json.encode()
    cmd = f"replace {key} 0 3600 {len(data)}\r\n"
    sock.sendall(cmd.encode() + data + b"\r\n")
    resp = sock.recv(1024).decode().strip()
    return resp == "STORED"

4.4 APPEND — 追加数据

语法

append <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

语义

在已有值的末尾追加数据。不修改 flags 和 exptime。key 必须已存在。

注意: append 不会在尾部自动添加分隔符,需要客户端自行处理。

交互示例

set log 0 0 5
hello
STORED

append log 0 0 6
 world
STORED

get log
VALUE log 0 11
hello world
END

业务场景:日志聚合

def append_log(sock, log_key: str, log_entry: str, ttl: int = 3600):
    """将日志条目追加到缓存中的日志键"""
    entry = log_entry.encode()
    # 首次使用 add,后续使用 append
    cmd = f"add {log_key} 0 {ttl} {len(entry)}\r\n"
    sock.sendall(cmd.encode() + entry + b"\r\n")
    resp = sock.recv(1024).decode().strip()

    if resp == "NOT_STORED":
        # key 已存在,使用 append
        cmd = f"append {log_key} 0 0 {len(entry)}\r\n"
        sock.sendall(cmd.encode() + entry + b"\r\n")
        sock.recv(1024)

# 使用
append_log(sock, "log:request:12345", "2025-01-01 10:00:00 GET /api/users\n")
append_log(sock, "log:request:12345", "2025-01-01 10:00:01 200 OK\n")

4.5 PREPEND — 前插数据

语法

prepend <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

语义

在已有值的开头插入数据。与 append 方向相反。

交互示例

set greeting 0 0 5
world
STORED

prepend greeting 0 0 6
hello 
STORED

get greeting
VALUE greeting 0 11
hello world
END

业务场景:优先级队列

def prepend_priority(sock, queue_key: str, item: str, ttl: int = 3600):
    """将高优先级任务插入队列头部"""
    data = item.encode()
    cmd = f"prepend {queue_key} 0 {ttl} {len(data)}\r\n"
    sock.sendall(cmd.encode() + data + b"\r\n")
    resp = sock.recv(1024).decode().strip()
    return resp == "STORED"

4.6 CAS — Compare-And-Swap

语法

cas <key> <flags> <exptime> <length> <cas_unique> [noreply]\r\n
<data block>\r\n

语义

CAS 是 Memcached 实现乐观并发控制的核心命令。只有当 key 的当前 cas_unique 与请求中的值匹配时,才执行写入。

CAS 响应

响应含义
STORED写入成功(版本匹配)
EXISTS写入失败(版本已被其他客户端修改)
NOT_FOUNDkey 不存在

完整 CAS 工作流

#!/usr/bin/env python3
"""cas_demo.py — CAS 乐观锁更新演示"""

import socket
import json

class CASClient:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def gets(self, key: str):
        """获取值和 CAS unique"""
        self.sock.sendall(f"gets {key}\r\n".encode())
        resp = self.sock.recv(65536).decode()
        lines = resp.strip().split("\r\n")
        if lines[0].startswith("VALUE"):
            parts = lines[0].split()
            cas_unique = int(parts[4])
            value = lines[1]
            return value, cas_unique
        return None, None

    def cas(self, key: str, value: str, cas_unique: int,
            flags: int = 0, exptime: int = 0) -> str:
        data = value.encode()
        cmd = f"cas {key} {flags} {exptime} {len(data)} {cas_unique}\r\n"
        self.sock.sendall(cmd.encode() + data + b"\r\n")
        return self.sock.recv(1024).decode().strip()

    def set(self, key: str, value: str, flags: int = 0, exptime: int = 0):
        data = value.encode()
        cmd = f"set {key} {flags} {exptime} {len(data)}\r\n"
        self.sock.sendall(cmd.encode() + data + b"\r\n")
        self.sock.recv(1024)

def cas_update(client: CASClient, key: str, update_fn, max_retries: int = 5):
    """带重试的 CAS 更新"""
    for attempt in range(max_retries):
        value, cas_unique = client.gets(key)
        if value is None:
            print(f"Key '{key}' 不存在")
            return False

        # 应用更新函数
        new_value = update_fn(value)

        # 尝试 CAS 写入
        result = client.cas(key, new_value, cas_unique)
        if result == "STORED":
            print(f"CAS 更新成功(第 {attempt + 1} 次尝试)")
            return True
        elif result == "EXISTS":
            print(f"版本冲突,重试...(第 {attempt + 1} 次)")
            continue
        else:
            print(f"CAS 失败: {result}")
            return False

    print(f"超过最大重试次数 ({max_retries})")
    return False

# 使用示例
client = CASClient()
client.set("counter", '{"count": 0}')

def increment_counter(value_str: str) -> str:
    data = json.loads(value_str)
    data["count"] += 1
    return json.dumps(data)

cas_update(client, "counter", increment_counter)

# 读取最终值
value, _ = client.gets("counter")
print(f"最终值: {value}")

4.7 FLAGS 详解

flags 是一个 16 位无符号整数(0-65535),Memcached 服务端不解释这个字段,完全由客户端定义含义。

常见用法

用途说明
0-7数据类型标识标记存储的数据类型
8-15压缩标识标记数据是否被压缩

典型客户端定义

# 常见 flags 定义
FLAGS_RAW      = 0x00  # 原始字符串
FLAGS_JSON     = 0x01  # JSON 对象
FLAGS_PICKLE   = 0x02  # Python pickle
FLAGS_INT      = 0x03  # 整数
FLAGS_GZIP     = 0x10  # GZIP 压缩标志(可与类型组合)

def get_type_flags(flags: int) -> int:
    return flags & 0x0F

def is_compressed(flags: int) -> bool:
    return (flags & 0x10) != 0

# 存储 JSON 数据(带类型标识)
import json

def set_json(sock, key: str, obj: dict, ttl: int = 300):
    data = json.dumps(obj).encode()
    cmd = f"set {key} {FLAGS_JSON} {ttl} {len(data)}\r\n"
    sock.sendall(cmd.encode() + data + b"\r\n")
    return sock.recv(1024).decode().strip() == "STORED"

def get_json(sock, key: str):
    sock.sendall(f"get {key}\r\n".encode())
    resp = sock.recv(65536).decode()
    lines = resp.strip().split("\r\n")
    if lines[0].startswith("VALUE"):
        parts = lines[0].split()
        flags = int(parts[2])
        data = lines[1]
        if get_type_flags(flags) == FLAGS_JSON:
            return json.loads(data)
        return data
    return None

压缩标志示例

import gzip
import json

def set_compressed_json(sock, key: str, obj: dict, ttl: int = 300):
    raw = json.dumps(obj).encode()
    compressed = gzip.compress(raw)

    # 仅在压缩后更小时使用压缩
    if len(compressed) < len(raw):
        data = compressed
        flags = FLAGS_JSON | FLAGS_GZIP
    else:
        data = raw
        flags = FLAGS_JSON

    cmd = f"set {key} {flags} {ttl} {len(data)}\r\n"
    sock.sendall(cmd.encode() + data + b"\r\n")
    return sock.recv(1024).decode().strip() == "STORED"

def get_compressed_json(sock, key: str):
    sock.sendall(f"get {key}\r\n".encode())
    resp = sock.recv(65536).decode()
    lines = resp.strip().split("\r\n")
    if lines[0].startswith("VALUE"):
        parts = lines[0].split()
        flags = int(parts[2])
        data = lines[1].encode()
        if is_compressed(flags):
            data = gzip.decompress(data)
        if get_type_flags(flags) == FLAGS_JSON:
            return json.loads(data)
        return data.decode()
    return None

4.8 TTL (Time-To-Live) 深入

TTL 处理流程

写入时设置 exptime
    │
    ▼
┌───────────────────────────────┐
│ exptime == 0                  │──▶ 永不过期
│ exptime 1-2592000 (30天)     │──▶ 当前时间 + exptime
│ exptime > 2592000            │──▶ 视为 Unix 时间戳
└───────────────────────────────┘
    │
    ▼
Item 被访问时检查是否过期
    │
    ├── 未过期 → 正常返回
    └── 已过期 → 返回 NOT_FOUND(惰性删除)

惰性删除机制

Memcached 不会主动扫描过期 item。过期检查发生在:

  1. get 访问时:检查是否过期,过期则返回 NOT_FOUND
  2. LRU 淘汰时:优先跳过已过期的 item
  3. flush_all 时:标记所有 item 为过期
set shortlived 0 2 5
hello
STORED

# 立即获取(存在)
get shortlived
VALUE shortlived 0 5
hello
END

# 等待 3 秒后获取(已过期)
get shortlived
END

TTL 最佳实践

场景建议 TTL原因
用户 Session1800 (30分钟)安全性 + 资源回收
热点数据300-3600平衡新鲜度和命中率
配置数据86400 (1天)配置变化不频繁
计数器0 或 86400根据业务重置周期
临时锁30防止死锁

4.9 存储命令内部流程

set 命令为例,Memcached 内部处理流程如下:

收到 set 命令
    │
    ▼
解析命令行(key, flags, exptime, length)
    │
    ▼
检查 key 长度 ≤ 250 字节
    │
    ├── 超长 → CLIENT_ERROR
    └── 正常 ↓
读取 length 字节的数据体
    │
    ▼
计算 item 所需空间(key + value + 元数据)
    │
    ▼
从对应 Slab Class 分配内存
    │
    ├── 成功 ↓
    └── 失败 → 尝试 LRU 淘汰
         │
         ├── 淘汰成功 → 重试分配
         └── 无 item 可淘汰 → SERVER_ERROR out of memory
    │
    ▼
在哈希表中插入/更新 item
    │
    ▼
返回 STORED

4.10 客户端实现完整示例

#!/usr/bin/env python3
"""storage_client.py — 完整的存储命令客户端实现"""

import socket
from typing import Optional

class MemcachedStorageClient:
    def __init__(self, host: str = '127.0.0.1', port: int = 11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def _send_store(self, cmd: str, data: bytes, noreply: bool = False) -> str:
        nr = " noreply" if noreply else ""
        full_cmd = f"{cmd}{nr}\r\n"
        self.sock.sendall(full_cmd.encode() + data + b"\r\n")
        if noreply:
            return "NOREPLY"
        return self.sock.recv(1024).decode().strip()

    def set(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0, noreply: bool = False) -> str:
        cmd = f"set {key} {flags} {exptime} {len(value)}"
        return self._send_store(cmd, value, noreply)

    def add(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0, noreply: bool = False) -> str:
        cmd = f"add {key} {flags} {exptime} {len(value)}"
        return self._send_store(cmd, value, noreply)

    def replace(self, key: str, value: bytes, flags: int = 0,
                exptime: int = 0, noreply: bool = False) -> str:
        cmd = f"replace {key} {flags} {exptime} {len(value)}"
        return self._send_store(cmd, value, noreply)

    def append(self, key: str, value: bytes, noreply: bool = False) -> str:
        cmd = f"append {key} 0 0 {len(value)}"
        return self._send_store(cmd, value, noreply)

    def prepend(self, key: str, value: bytes, noreply: bool = False) -> str:
        cmd = f"prepend {key} 0 0 {len(value)}"
        return self._send_store(cmd, value, noreply)

    def cas(self, key: str, value: bytes, cas_unique: int,
            flags: int = 0, exptime: int = 0, noreply: bool = False) -> str:
        cmd = f"cas {key} {flags} {exptime} {len(value)} {cas_unique}"
        return self._send_store(cmd, value, noreply)

    def close(self):
        self.sock.sendall(b"quit\r\n")
        self.sock.close()


# 测试
client = MemcachedStorageClient()

print(client.set("test:1", b"hello"))
# STORED

print(client.add("test:2", b"world"))
# STORED

print(client.add("test:2", b"again"))
# NOT_STORED

print(client.replace("test:1", b"updated"))
# STORED

print(client.append("test:1", b"!"))
# STORED

client.close()

4.11 注意事项

编号注意事项说明
1length 必须精确数据体字节数必须与 length 完全一致,否则 CLIENT_ERROR
2append/prepend 不修改 flags/expire仅修改数据内容
3CAS 唯一标识会变化每次写入(包括 set/add/replace)都会更新 cas_unique
4零长度值length=0 是合法的,会存储一个空值
5flags 客户端自定义服务端原样存储和返回,不做解释

4.12 扩展阅读


上一章: 第03章 核心命令总览 下一章: 第05章 检索命令深入 — 深入解析 get/gets 的工作机制与最佳实践。