强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Memcached 传输协议精讲 / 第02章 文本协议详解

第02章 文本协议详解

文本协议是 Memcached 最基础也最常用的通信方式,掌握它是理解一切后续内容的前提。


2.1 协议基础规则

行终止符

Memcached 文本协议使用 \r\n(CRLF)作为行终止符,与 HTTP 协议一致。

命令行\r\n
数据行\r\n

注意: 这是最常见的客户端实现错误来源。使用 \n 而非 \r\n 会导致协议解析失败。

字符编码

部分编码
命令行ASCII
KeyASCII,最长 250 字节
数据体(Data)原始字节流,不强制要求编码

数据体(data block)可以是任意二进制数据,长度由命令行中的 <length> 字段精确指定。

通用命令行格式

大多数命令遵循以下模式:

<command> <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
字段类型说明
commandstring命令名称
keystring缓存键,最长 250 字节
flagsuint16客户端自定义标志位
exptimeint过期时间(秒),0=永不过期
lengthint数据块的字节长度
noreplystring可选,设置后服务端不返回响应

2.2 存储命令语法

SET 命令

set <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

完整示例

# 存储一个用户对象(flags=0, TTL=3600秒, 数据长度=13字节)
set user:1001 0 3600 13
{"name":"Bob"}

服务端响应

STORED\r\n          # 成功
NOT_STORED\r\n      # 存储失败(如条件命令不满足)
ERROR\r\n           # 未知命令
CLIENT_ERROR ...\r\n # 客户端错误
SERVER_ERROR ...\r\n # 服务端错误

其他存储命令对比

命令语法语义
setset <key> <flags> <exptime> <length>无条件存储(存在则覆盖)
addadd <key> <flags> <exptime> <length>仅当 key 不存在时存储
replacereplace <key> <flags> <exptime> <length>仅当 key 已存在时存储
appendappend <key> <flags> <exptime> <length>在已有值的末尾追加数据
prependprepend <key> <flags> <exptime> <length>在已有值的开头插入数据
cascas <key> <flags> <exptime> <length> <cas_unique>CAS 乐观锁写入

2.3 检索命令语法

GET 命令

get <key>*\r\n

支持一个或多个 key(空格分隔),返回所有存在的 key 及其值。

单 key 示例

get user:1001\r\n

多 key 示例

get user:1001 user:1002 session:abc\r\n

响应格式

VALUE <key> <flags> <length> [cas <cas_unique>]\r\n
<data block>\r\n
...
END\r\n

注意: cas 字段仅在使用 gets 命令时返回。

GETS 命令

gets <key>*\r\n

get 相同,但每个返回值附带 cas_unique 标识符,用于后续 CAS 操作。

响应示例

VALUE user:1001 0 13 12345
{"name":"Bob"}
END

2.4 删除命令语法

DELETE 命令

delete <key> [noreply]\r\n

响应

DELETED\r\n         # 成功删除
NOT_FOUND\r\n       # key 不存在

示例

echo "delete user:1001" | nc 127.0.0.1 11211
# DELETED

2.5 递增递减命令语法

INCR / DECR 命令

incr <key> <value> [noreply]\r\n
decr <key> <value> [noreply]\r\n
参数说明
key目标键(值必须为十进制数字字符串)
value递增/递减的步长(无符号 64 位整数)

响应

<number>\r\n        # 操作后的新值
NOT_FOUND\r\n       # key 不存在

重要规则

  • 若 key 不存在,返回 NOT_FOUND不会自动创建
  • 值溢出时行为未定义(不回绕到 0)
  • DECR 结果下限为 0,不会变为负数

示例

# 设置计数器
set view_count 0 0 1
0
STORED

# 递增
incr view_count 1
1

incr view_count 10
11

# 递减
decr view_count 5
6

2.6 其他管理命令

VERSION

version\r\n

响应:

VERSION 1.6.22\r\n

FLUSH_ALL

flush_all [delay]\r\n
参数说明
无参数立即清除所有缓存
delay延迟 N 秒后清除

响应:

OK\r\n

注意: flush_all 是全局操作,在生产环境中慎用!建议使用前加 delay 参数给予缓冲时间。

QUIT

quit\r\n

关闭当前连接,无响应。

VERBOSITY

verbosity <level> [noreply]\r\n

设置日志详细级别(0-4),级别越高输出越多。


2.7 Noreply 模式

在命令末尾添加 noreply 参数可指示服务端不返回响应,适用于对可靠性要求不高的写操作。

# 正常模式
set key1 0 0 5
hello
STORED

# noreply 模式(无响应)
set key2 0 0 5 noreply
hello

支持 noreply 的命令

命令支持 noreply
set
add
replace
append
prepend
cas
delete
incr
decr
flush_all
get / gets

使用场景

import socket

def batch_set_noreply(sock, items: dict, ttl: int = 300):
    """批量写入,使用 noreply 提升吞吐量"""
    for key, value in items.items():
        data = value.encode() if isinstance(value, str) else value
        cmd = f"set {key} 0 {ttl} {len(data)} noreply\r\n"
        sock.sendall(cmd.encode() + data + b"\r\n")
    # 不需要等待响应,直接返回

# 使用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 11211))
batch_set_noreply(sock, {
    "cache:a": "value_a",
    "cache:b": "value_b",
    "cache:c": "value_c",
})

注意: 使用 noreply 时无法得知操作是否成功。在需要确认的场景下,不要使用 noreply。


2.8 响应格式详解

存储命令响应

响应含义
STORED存储成功
NOT_STORED存储失败(add/replace 条件不满足,或数据长度为 0)
EXISTSCAS 操作失败:key 已被修改
NOT_FOUNDCAS 操作失败:key 不存在

检索命令响应

VALUE <key> <flags> <bytes> [cas <cas_unique>]\r\n
<data block>\r\n
VALUE <key2> <flags2> <bytes2> [cas <cas_unique2>]\r\n
<data block2>\r\n
END\r\n
  • 每个存在的 key 返回一个 VALUE
  • 不存在的 key 被静默忽略
  • 最后以 END 结束

错误响应

响应类型格式说明
通用错误ERROR\r\n未知命令
客户端错误CLIENT_ERROR <message>\r\n请求格式错误
服务端错误SERVER_ERROR <message>\r\n服务端内部错误

常见错误消息

错误消息原因
CLIENT_ERROR bad command line format命令行格式错误
CLIENT_ERROR bad data chunk数据块长度与声明不一致
CLIENT_ERROR bad command line format: invalid exptimeTTL 不是有效数字
CLIENT_ERROR bad command line format: invalid flagsflags 不是有效数字
SERVER_ERROR out of memory内存不足
SERVER_ERROR temporary failure临时性错误

2.9 完整协议解析器实现

以下是一个简化但可用的文本协议解析器:

#!/usr/bin/env python3
"""memcached_text_parser.py — Memcached 文本协议解析器"""

from dataclasses import dataclass
from typing import List, Optional, Union
from enum import Enum
import socket

class ResponseType(Enum):
    STORED = "STORED"
    NOT_STORED = "NOT_STORED"
    EXISTS = "EXISTS"
    NOT_FOUND = "NOT_FOUND"
    DELETED = "DELETED"
    ERROR = "ERROR"
    VALUE = "VALUE"
    END = "END"

@dataclass
class ValueResponse:
    key: str
    flags: int
    cas_unique: Optional[int]
    data: bytes

@dataclass
class Response:
    type: ResponseType
    values: List[ValueResponse]
    message: str = ""

class MemcachedTextParser:
    """Memcached 文本协议解析器"""

    def __init__(self):
        self.buffer = b""

    def feed(self, data: bytes):
        self.buffer += data

    def parse_response(self, command_type: str = "get") -> Optional[Response]:
        """尝试从 buffer 中解析完整响应"""
        lines = self.buffer.split(b"\r\n")

        if command_type in ("get", "gets"):
            return self._parse_value_response(lines)
        elif command_type in ("set", "add", "replace", "append", "prepend", "cas"):
            return self._parse_store_response(lines)
        elif command_type == "delete":
            return self._parse_simple_response(lines)
        elif command_type in ("incr", "decr"):
            return self._parse_counter_response(lines)
        return None

    def _parse_value_response(self, lines: list) -> Optional[Response]:
        """解析 VALUE 响应"""
        if b"END" not in lines:
            return None  # 数据不完整

        values = []
        i = 0
        while i < len(lines):
            line = lines[i]
            if line == b"END":
                return Response(type=ResponseType.END, values=values)
            if line.startswith(b"VALUE "):
                parts = line.decode().split()
                key = parts[1]
                flags = int(parts[2])
                length = int(parts[3])
                cas = int(parts[5]) if len(parts) > 5 and parts[4] == "cas" else None

                i += 1
                if i < len(lines):
                    data = lines[i]
                    values.append(ValueResponse(
                        key=key, flags=flags,
                        cas_unique=cas, data=data
                    ))
            i += 1

        return None  # 数据不完整

    def _parse_store_response(self, lines: list) -> Optional[Response]:
        """解析存储命令响应"""
        if len(lines) < 2:
            return None
        first_line = lines[0]
        if first_line == b"STORED":
            return Response(type=ResponseType.STORED, values=[])
        elif first_line == b"NOT_STORED":
            return Response(type=ResponseType.NOT_STORED, values=[])
        elif first_line == b"EXISTS":
            return Response(type=ResponseType.EXISTS, values=[])
        elif first_line == b"NOT_FOUND":
            return Response(type=ResponseType.NOT_FOUND, values=[])
        elif first_line.startswith(b"CLIENT_ERROR"):
            return Response(type=ResponseType.ERROR,
                          values=[], message=first_line.decode())
        elif first_line.startswith(b"SERVER_ERROR"):
            return Response(type=ResponseType.ERROR,
                          values=[], message=first_line.decode())
        elif first_line == b"ERROR":
            return Response(type=ResponseType.ERROR, values=[])
        return None

    def _parse_simple_response(self, lines: list) -> Optional[Response]:
        """解析简单响应"""
        if len(lines) < 2:
            return None
        line = lines[0]
        if line == b"DELETED":
            return Response(type=ResponseType.DELETED, values=[])
        elif line == b"NOT_FOUND":
            return Response(type=ResponseType.NOT_FOUND, values=[])
        return None

    def _parse_counter_response(self, lines: list) -> Optional[Response]:
        """解析计数器响应"""
        if len(lines) < 2:
            return None
        line = lines[0]
        if line == b"NOT_FOUND":
            return Response(type=ResponseType.NOT_FOUND, values=[])
        try:
            value = int(line)
            return Response(type=ResponseType.STORED,
                          values=[], message=str(value))
        except ValueError:
            return None


# 测试
parser = MemcachedTextParser()
parser.feed(b"VALUE user:1001 0 13\r\n")
parser.feed(b'{"name":"Bob"}\r\n')
parser.feed(b"END\r\n")

result = parser.parse_response("get")
if result and result.values:
    print(f"Key: {result.values[0].key}")
    print(f"Data: {result.values[0].data.decode()}")

2.10 业务场景

场景一:Session 会话管理

import json
import uuid
import socket

class SessionStore:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def create_session(self, user_data: dict, ttl: int = 1800) -> str:
        session_id = str(uuid.uuid4())
        data = json.dumps(user_data).encode()
        self.sock.sendall(
            f"set session:{session_id} 0 {ttl} {len(data)}\r\n".encode()
            + data + b"\r\n"
        )
        self.sock.recv(1024)
        return session_id

    def get_session(self, session_id: str) -> dict:
        self.sock.sendall(f"get session:{session_id}\r\n".encode())
        resp = self.sock.recv(65536).decode()
        lines = resp.strip().split("\r\n")
        if lines[0].startswith("VALUE"):
            return json.loads(lines[1])
        return None

    def destroy_session(self, session_id: str) -> bool:
        self.sock.sendall(f"delete session:{session_id}\r\n".encode())
        return b"DELETED" in self.sock.recv(1024)

# 使用
store = SessionStore()
sid = store.create_session({"user_id": 1001, "role": "admin"})
print(f"Session ID: {sid}")
print(f"Session Data: {store.get_session(sid)}")

场景二:商品库存扣减(原子操作)

def decrement_stock(sock, product_id: str, quantity: int) -> bool:
    """
    使用 DECR 原子扣减库存。
    注意:此方案要求库存值在缓存中初始化。
    """
    key = f"stock:{product_id}"
    sock.sendall(f"decr {key} {quantity}\r\n".encode())
    resp = sock.recv(1024).decode().strip()

    if resp == "NOT_FOUND":
        return False  # 库存键不存在

    remaining = int(resp)
    if remaining < 0:
        # 库存不足,回滚
        sock.sendall(f"incr {key} {quantity}\r\n".encode())
        sock.recv(1024)
        return False

    return True  # 扣减成功

2.11 注意事项

编号注意事项说明
1CRLF 行终止必须使用 \r\n,不要用 \n
2数据长度精确匹配<length> 必须与数据块的字节数完全一致
3Key 不能包含空白Key 中不能有空格、\r\n
4noreply 不可靠服务端仍可能因错误关闭连接
5管道化有序性响应顺序与请求顺序一致
6编码问题数据体是字节流,不要混用不同编码
7最大 Value 大小默认 1MB,可通过 -I 参数调整

2.12 扩展阅读


上一章: 第01章 Memcached 协议概述 下一章: 第03章 核心命令总览 — 系统梳理所有 Memcached 文本协议命令。