强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

Memcached 传输协议精讲 / 第07章 二进制协议命令

第07章 二进制协议命令

本章详细讲解每个 Memcached 操作在二进制协议中的具体帧格式。


7.1 GET / GETQ / GETK / GETKQ

四种获取命令的区别

| 命令 | Opcode | 说明 |
| --- | --- | --- |
| GET | 0x00 | 标准获取 |
| GETQ | 0x09 | 静默获取(key 不存在时不返回响应) |
| GETK | 0x0C | 获取 + 返回 key |
| GETKQ | 0x0D | 静默获取 + 返回 key |

GET 请求帧

Header (24 bytes):
  Magic:      0x80
  Opcode:     0x00
  Key Length: len(key)
  Extras:     0
  Body Length: len(key)

Body:
  Key:        <key bytes>

GET 响应帧

Header (24 bytes):
  Magic:      0x81
  Opcode:     0x00
  Key Length:  0 (GET) 或 len(key) (GETK)
  Extras:     4 (flags)
  Status:     0x0000 (成功) 或 0x0001 (未找到)
  Body Length: 4 + len(key) + len(value)
  CAS:        <cas_unique>

Body:
  Extras:     <flags: uint32>
  Key:        <key bytes> (GETK/GETKQ)
  Value:      <value bytes>

Python 实现

import struct
import socket

def binary_get(sock, key: str, getk: bool = False):
    """Fetch a single key with the binary-protocol GET (or GETK) command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key to look up. It is encoded with ``str.encode()`` before
            being sent.
        getk: When true, send GETK (opcode 0x0C) instead of GET (0x00).

    Returns:
        ``None`` when the response status is 0x0001 (Key Not Found);
        otherwise a dict with the keys ``value`` (bytes), ``flags`` (int),
        ``cas`` (int) and ``opaque`` (int) taken from the response.

    Raises:
        ConnectionError: the peer closed the connection before the full
            response arrived (raised by ``_recv_exact``).
        RuntimeError: the response carried any other non-zero status.
    """
    key_bytes = key.encode()
    request_opcode = 0x0C if getk else 0x00

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80,            # magic: request
        request_opcode,  # opcode
        len(key_bytes),  # key length
        0,               # extras length
        0x00,            # data type
        0,               # reserved
        len(key_bytes),  # total body length (key only)
        1,               # opaque
        0,               # cas
    )
    sock.sendall(header + key_bytes)

    # Fixed 24-byte response header.
    resp_header = _recv_exact(sock, 24)
    (_, _, key_len, extras_len, _, status, body_len, opaque, cas) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Always consume the body before looking at the status: error responses
    # carry a message there, and leaving it unread would corrupt the next
    # response read from this socket.
    body = _recv_exact(sock, body_len)

    if status == 0x0001:  # Key Not Found
        return None
    if status != 0x0000:
        # Error responses have no extras, so decoding flags from them would
        # only fail with an unhelpful struct.error; report the status instead.
        raise RuntimeError(
            f"GET failed with status 0x{status:04x}: {body!r}"
        )

    flags = struct.unpack_from(">I", body)[0] if extras_len >= 4 else 0
    # Skip the extras and the echoed key (present for GETK) to reach the value.
    value = body[extras_len + key_len:]

    return {
        'value': value,
        'flags': flags,
        'cas': cas,
        'opaque': opaque
    }

def _recv_exact(sock, n: int) -> bytes:
    data = b""
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            raise ConnectionError("Connection closed")
        data += chunk
    return data

# GETQ — 静默获取
# 当 key 不存在时,不返回任何响应
# 适用于管道化请求:发送多个 GETQ,只有存在的 key 才返回响应

def binary_get_multi(sock, keys: list[str]) -> dict:
    """Fetch several keys in one round trip using pipelined GETKQ requests.

    One GETKQ (opcode 0x0D) request is built per key, followed by a NOOP
    (opcode 0x0A) that marks the end of the batch. The whole batch is
    written with a single ``sendall``. Responses are then read until the
    NOOP response arrives.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        keys: Keys to look up. The index of each key is sent as the opaque
            value of its request.

    Returns:
        Dict mapping each key that came back with status 0x0000 to
        ``{'value': bytes, 'flags': int, 'cas': int}``. Frames with any
        other status are skipped.

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    frames = []
    for opaque, key in enumerate(keys):
        key_bytes = key.encode()
        frames.append(struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x0D, len(key_bytes), 0, 0x00, 0, len(key_bytes), opaque, 0
        ))
        frames.append(key_bytes)

    # NOOP is never quiet, so its response marks the end of the batch.
    frames.append(struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, len(keys), 0
    ))

    # One write for the whole pipeline instead of one per request.
    sock.sendall(b"".join(frames))

    result = {}
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, opcode, key_len, extras_len, _, status, body_len, _, cas) = \
            struct.unpack("!BBHBBHIIQ", resp_header)

        # Consume the body of every frame (including the terminator) so no
        # stray bytes are left on the socket for the next command.
        body = _recv_exact(sock, body_len)

        if opcode == 0x0A:  # NOOP: end of batch
            break

        if status == 0x0000:
            flags = (struct.unpack_from(">I", body)[0]
                     if extras_len >= 4 else 0)
            key = body[extras_len:extras_len + key_len].decode()
            value = body[extras_len + key_len:]
            result[key] = {'value': value, 'flags': flags, 'cas': cas}

    return result

7.2 SET / ADD / REPLACE

Extras 字段

存储命令的 Extras 固定为 8 字节:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Flags (uint32)                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       Expiration (uint32)                     |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

SET 请求帧

Header:
  Magic:      0x80
  Opcode:     0x01
  Key Length: len(key)
  Extras:     8
  Body Length: 8 + len(key) + len(value)
  CAS:        0 (新写入) 或 cas_unique (CAS更新)

Body:
  Extras:     <flags: uint32> <exptime: uint32>
  Key:        <key bytes>
  Value:      <value bytes>

SET 响应帧

Header:
  Magic:      0x81
  Opcode:     0x01
  Status:     0x0000 (成功)
  CAS:        <new cas_unique>

Body:         (空)

完整实现

def binary_set(sock, key: str, value: bytes, flags: int = 0,
               exptime: int = 0, cas: int = 0,
               opcode: int = 0x01) -> dict:
    """Store a value with the binary SET / ADD / REPLACE command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key to store under; encoded with ``str.encode()``.
        value: Raw value bytes.
        flags: 32-bit flags value sent in the extras.
        exptime: 32-bit expiration value sent in the extras.
        cas: CAS value placed in the request header.
        opcode: 0x01=SET, 0x02=ADD, 0x03=REPLACE.

    Returns:
        Dict with ``success`` (True when the status is 0x0000), ``status``
        (the raw status code) and ``cas`` (the CAS field of the response).

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    key_bytes = key.encode()
    extras = struct.pack(">II", flags, exptime)
    body_length = len(extras) + len(key_bytes) + len(value)

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80,           # magic: request
        opcode,         # opcode
        len(key_bytes), # key length
        len(extras),    # extras length
        0x00,           # data type
        0,              # reserved
        body_length,    # total body length
        0,              # opaque
        cas             # cas
    )

    sock.sendall(header + extras + key_bytes + value)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, resp_body_len, _, new_cas) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # A failed store answers with a message in the body. It has to be
    # consumed here, otherwise the next command on this socket would read
    # it as the start of its own response header.
    _recv_exact(sock, resp_body_len)

    return {
        'success': status == 0x0000,
        'status': status,
        'cas': new_cas
    }

操作码对比

| 操作 | Opcode | Key 不存在 | Key 已存在 |
| --- | --- | --- | --- |
| SET | 0x01 | 创建 | 覆盖 |
| ADD | 0x02 | 创建 | 0x0002 Key Exists |
| REPLACE | 0x03 | 0x0001 Not Found | 覆盖 |

7.3 DELETE

DELETE 请求帧

Header:
  Magic:      0x80
  Opcode:     0x04
  Key Length: len(key)
  Extras:     0
  Body Length: len(key)

Body:
  Key:        <key bytes>

DELETE 响应帧

Header:
  Magic:      0x81
  Opcode:     0x04
  Status:     0x0000 (成功) 或 0x0001 (未找到)

Body:         (空)

实现

def binary_delete(sock, key: str) -> bool:
    """Delete a key with the binary DELETE command (opcode 0x04).

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key to delete; encoded with ``str.encode()``.

    Returns:
        True when the response status is 0x0000, False for any other
        status (including 0x0001, key not found).

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    key_bytes = key.encode()
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x04, len(key_bytes), 0, 0x00, 0, len(key_bytes), 0, 0
    )
    sock.sendall(header + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Failure responses carry a message body; read it so the socket stays
    # aligned on frame boundaries for the next command.
    _recv_exact(sock, body_len)

    return status == 0x0000

7.4 INCREMENT / DECREMENT

Extras 字段

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    Delta (uint64, 8 bytes)                     |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                 Initial Value (uint64, 8 bytes)                |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    Expiration (uint32)                         |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  • Delta: 递增/递减的步长
  • Initial Value: key 不存在时的初始值
  • Expiration: 初始创建时的过期时间;若为 0xFFFFFFFF,则 key 不存在时不会创建,而是返回 0x0001 Not Found

响应

Body:
  <result: uint64>  — 递增/递减后的值

实现

def binary_incr_decr(sock, key: str, delta: int, initial: int = 0,
                     exptime: int = 0, incr: bool = True) -> int | None:
    """Increment or decrement a counter with the binary protocol.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Counter key; encoded with ``str.encode()``.
        delta: Step, sent as an unsigned 64-bit integer.
        initial: Initial value, sent as an unsigned 64-bit integer.
        exptime: Expiration, sent as an unsigned 32-bit integer.
        incr: True sends INCREMENT (0x05), False sends DECREMENT (0x06).

    Returns:
        The new counter value, or ``None`` when the response status is
        0x0001 (Not Found).

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
        RuntimeError: the response carried any other non-zero status.
    """
    key_bytes = key.encode()
    extras = struct.pack(">QQI", delta, initial, exptime)
    opcode = 0x05 if incr else 0x06

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, opcode, len(key_bytes), len(extras), 0x00, 0,
        len(extras) + len(key_bytes), 0, 0
    )

    sock.sendall(header + extras + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Read the body before branching on the status: error responses carry
    # a message there, and returning early would leave it on the socket
    # to be misread as the next response header.
    body = _recv_exact(sock, body_len)

    if status == 0x0001:  # Not Found
        return None
    if status != 0x0000:
        # The body is an error message here, not an 8-byte counter.
        raise RuntimeError(
            f"INCR/DECR failed with status 0x{status:04x}: {body!r}"
        )

    return struct.unpack(">Q", body)[0]

7.5 APPEND / PREPEND

APPEND 请求帧

Header:
  Magic:      0x80
  Opcode:     0x0E (Append) 或 0x0F (Prepend)
  Key Length: len(key)
  Extras:     0
  Body Length: len(key) + len(value)

Body:
  Key:        <key bytes>
  Value:      <value bytes>  — 要追加/前插的数据

实现

def binary_append(sock, key: str, value: bytes, prepend: bool = False) -> bool:
    """Append or prepend data to an existing value.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key of the item to modify; encoded with ``str.encode()``.
        value: Bytes to add.
        prepend: True sends PREPEND (0x0F), False sends APPEND (0x0E).

    Returns:
        True when the response status is 0x0000, False otherwise.

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    key_bytes = key.encode()
    opcode = 0x0F if prepend else 0x0E

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, opcode, len(key_bytes), 0, 0x00, 0,
        len(key_bytes) + len(value), 0, 0
    )

    sock.sendall(header + key_bytes + value)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Failure responses carry a message body; consume it so the next
    # command starts reading at a frame boundary.
    _recv_exact(sock, body_len)

    return status == 0x0000

7.6 VERSION

VERSION 请求帧

Header:
  Magic:      0x80
  Opcode:     0x0B
  Key Length: 0
  Extras:     0
  Body Length: 0

Body:         (空)

VERSION 响应帧

Header:
  Magic:      0x81
  Opcode:     0x0B
  Status:     0x0000
  Body Length: len(version_string)

Body:
  Value:      <version string bytes>

7.7 FLUSH

FLUSH 请求帧

Header:
  Magic:      0x80
  Opcode:     0x08
  Key Length: 0
  Extras:     0 或 4
  Body Length: 0 或 4

Body:
  Extras:     <delay: uint32>  (可选,延迟秒数)

实现

def binary_flush(sock, delay: int = 0) -> bool:
    """Send the binary FLUSH command (opcode 0x08).

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        delay: When greater than zero it is sent as a 4-byte extras field;
            otherwise the request carries no extras at all.

    Returns:
        True when the response status is 0x0000, False otherwise.

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    # The extras field is optional: only present when a delay is requested.
    extras = struct.pack(">I", delay) if delay > 0 else b""
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x08, 0, len(extras), 0x00, 0, len(extras), 0, 0
    )
    sock.sendall(header + extras)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Consume any error message so the stream stays aligned.
    _recv_exact(sock, body_len)

    return status == 0x0000

7.8 STAT

STAT 请求帧

Header:
  Magic:      0x80
  Opcode:     0x10
  Key Length: 0 或 len(key)
  Extras:     0
  Body Length: 0 或 len(key)

Body:
  Key:        <stat key>  (可选,指定特定统计项)

STAT 响应

服务端返回多个响应帧,每帧包含一个统计键值对:

# 每个统计项一个响应帧
Header:
  Magic:      0x81
  Opcode:     0x10
  Key Length: len(stat_name)
  Body Length: len(stat_name) + len(stat_value)
  Status:     0x0000

Body:
  Key:        <stat name>
  Value:      <stat value>

最后一个响应帧的 Key Length = 0,Body Length = 0,表示统计结束。

实现

def binary_stats(sock) -> dict[str, str]:
    """Collect server statistics with the binary STAT command (opcode 0x10).

    The request is sent without a key. Response frames are read one per
    statistic until a frame with key length 0 and body length 0 arrives.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.

    Returns:
        Dict mapping each statistic name to its value, both decoded as text.

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
        RuntimeError: a response frame carried a non-zero status.
    """
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x10, 0, 0, 0x00, 0, 0, 0, 0
    )
    sock.sendall(header)

    result = {}
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, _, key_len, _, _, status, body_len, _, _) = \
            struct.unpack("!BBHBBHIIQ", resp_header)

        body = _recv_exact(sock, body_len)

        if status != 0x0000:
            # An error frame is not followed by a terminator frame, so
            # continuing to read would block forever; it must also not be
            # recorded as a statistic with an empty name.
            raise RuntimeError(
                f"STAT failed with status 0x{status:04x}: {body!r}"
            )

        if key_len == 0 and body_len == 0:  # terminator frame
            break

        name = body[:key_len].decode()
        result[name] = body[key_len:].decode()

    return result

7.9 NOOP

NOOP 帧

Header:
  Magic:      0x80
  Opcode:     0x0A
  Key Length: 0
  Extras:     0
  Body Length: 0

Body:         (空)

用途

  1. 管道化边界标记: 在静默命令(Q 系列)之后发送 NOOP,用 NOOP 响应标记所有静默响应的结束
  2. 心跳检测: 检测连接是否存活

实现

def binary_noop(sock) -> bool:
    """Send a binary NOOP (opcode 0x0A) and check the reply.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.

    Returns:
        True when the response echoes opcode 0x0A with status 0x0000.

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, 0, 0
    )
    sock.sendall(header)

    resp_header = _recv_exact(sock, 24)
    (_, opcode, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # A successful NOOP has an empty body, but an error reply carries a
    # message; consume it so the socket stays usable afterwards.
    _recv_exact(sock, body_len)

    return opcode == 0x0A and status == 0x0000

7.10 Q 系列命令(静默命令)

概述

Q 系列命令(命令名带 Q 后缀)在操作成功时不返回响应,只在失败时返回。GETQ / GETKQ 是例外:命中时返回响应,key 不存在时才保持静默。

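| 命令 | Opcode | 成功时响应 |
| --- | --- | --- |
| SETQ | 0x11 | 无 |
| ADDQ | 0x12 | 无 |
| REPLACEQ | 0x13 | 无 |
| DELETEQ | 0x14 | 无 |
| INCREMENTQ | 0x15 | 无 |
| DECREMENTQ | 0x16 | 无 |
| GETQ | 0x09 | 命中时返回响应(key 不存在时不返回) |
| GETKQ | 0x0D | 命中时返回响应(key 不存在时不返回) |
| APPENDQ | 0x19 | 无 |
| PREPENDQ | 0x1A | 无 |
| FLUSHQ | 0x18 | 无 |
| QUITQ | 0x17 | 无 |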

管道化模式

def binary_multi_set(sock, items: list[tuple[str, bytes, int]]) -> bool:
    """Store many items with pipelined SETQ requests.

    One SETQ (opcode 0x11) is sent per item with flags 0, followed by a
    NOOP (opcode 0x0A). Responses are read until the NOOP response
    arrives; SETQ is quiet on success, so any earlier frame with a
    non-zero status reports a failed store.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        items: ``[(key, value, exptime), ...]``. The index of each item is
            sent as the opaque value of its request.

    Returns:
        True when no failure response was received before the NOOP
        response, False when at least one store failed.

    Raises:
        ConnectionError: the peer closed the connection mid-response
            (raised by ``_recv_exact``).
    """
    for opaque, (key, value, exptime) in enumerate(items):
        key_bytes = key.encode()
        extras = struct.pack(">II", 0, exptime)
        body_len = len(extras) + len(key_bytes) + len(value)

        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x11, len(key_bytes), len(extras), 0x00, 0,
            body_len, opaque, 0
        )
        sock.sendall(header + extras + key_bytes + value)

    # NOOP is never quiet, so its response marks the end of the batch.
    noop_header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, len(items), 0
    )
    sock.sendall(noop_header)

    all_stored = True
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, opcode, _, _, _, status, resp_body_len, _, _) = \
            struct.unpack("!BBHBBHIIQ", resp_header)

        # Consume the body of every frame so the stream stays aligned.
        _recv_exact(sock, resp_body_len)

        if opcode == 0x0A:  # NOOP: end of batch
            break

        # Report the failure instead of discarding it.
        if status != 0x0000:
            all_stored = False

    return all_stored

7.11 SASL 认证

SASL 机制列表请求

Header:
  Magic:      0x80
  Opcode:     0x20
  Body Length: 0

SASL 认证请求

Header:
  Magic:      0x80
  Opcode:     0x21
  Key Length: len(mechanism)
  Body Length: len(mechanism) + len(challenge)

Body:
  Key:        "PLAIN" (机制名称)
  Value:      <sasl challenge bytes>

PLAIN 认证示例

def binary_sasl_auth(sock, username: str, password: str) -> bool:
    """Authenticate over the binary protocol with the SASL PLAIN mechanism.

    Sends one SASL auth request (opcode 0x21) whose key is the mechanism
    name and whose value is the PLAIN payload, then reads and discards the
    response body.

    Returns:
        True when the response status is 0x0000, False otherwise.
    """
    mechanism = b"PLAIN"
    # PLAIN payload layout: \0username\0password
    payload = b"\0".join((b"", username.encode(), password.encode()))

    request = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x21, len(mechanism), 0, 0x00, 0,
        len(mechanism) + len(payload), 0, 0
    ) + mechanism + payload
    sock.sendall(request)

    fields = struct.unpack("!BBHBBHIIQ", _recv_exact(sock, 24))
    status = fields[5]
    body_len = fields[6]

    # Discard whatever the server put in the body.
    if body_len > 0:
        _recv_exact(sock, body_len)

    return status == 0x0000

7.12 完整二进制客户端

#!/usr/bin/env python3
"""full_binary_client.py — 完整的 Memcached 二进制协议客户端"""

import struct
import socket
from typing import Optional

class FullBinaryMemcachedClient:
    """Small blocking client for the memcached binary protocol.

    Every public method sends one request and waits for its response on
    the single socket held in ``self.sock``. The class is self-contained:
    it needs only ``struct`` and ``socket``.

    It can be used as a context manager; leaving the ``with`` block calls
    ``close()``.
    """

    # Layout of the fixed 24-byte request/response header.
    _HEADER_FMT = "!BBHBBHIIQ"
    _HEADER_LEN = 24

    def __init__(self, host='127.0.0.1', port=11211):
        """Open a TCP connection to ``host:port``."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((host, port))
        except OSError:
            # Do not leak the descriptor when the connection fails.
            self.sock.close()
            raise
        self._opaque = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _next_opaque(self) -> int:
        """Return the next request id, wrapped to fit the 32-bit field."""
        self._opaque = (self._opaque + 1) & 0xFFFFFFFF
        return self._opaque

    def _recv_exact(self, n: int) -> bytes:
        """Read exactly ``n`` bytes from the socket.

        Raises:
            ConnectionError: the peer closed the connection first.
        """
        pieces = []
        remaining = n
        while remaining > 0:
            piece = self.sock.recv(remaining)
            if not piece:
                raise ConnectionError("Connection closed")
            pieces.append(piece)
            remaining -= len(piece)
        return b"".join(pieces)

    def _send_recv(self, opcode: int, key: bytes = b"",
                   value: bytes = b"", extras: bytes = b"",
                   cas: int = 0) -> dict:
        """Send one request frame and return the parsed response.

        Returns:
            Dict with ``opcode``, ``status``, ``key``, ``value``,
            ``extras``, ``cas`` and ``opaque`` taken from the response.
            The whole response body is always consumed.
        """
        body_len = len(extras) + len(key) + len(value)
        header = struct.pack(
            self._HEADER_FMT,
            0x80, opcode, len(key), len(extras), 0x00, 0,
            body_len, self._next_opaque(), cas
        )
        self.sock.sendall(header + extras + key + value)

        resp = self._recv_exact(self._HEADER_LEN)
        (_, resp_opcode, key_len, extras_len, _, status,
         resp_body_len, opaque, resp_cas) = \
            struct.unpack(self._HEADER_FMT, resp)

        body = self._recv_exact(resp_body_len) if resp_body_len > 0 else b""
        return {
            'opcode': resp_opcode, 'status': status,
            'key': body[extras_len:extras_len + key_len],
            'value': body[extras_len + key_len:],
            'extras': body[:extras_len], 'cas': resp_cas, 'opaque': opaque
        }

    @staticmethod
    def _error(command: str, r: dict) -> RuntimeError:
        """Build the exception raised for an unexpected response status."""
        return RuntimeError(
            f"{command} failed with status 0x{r['status']:04x}: "
            f"{r['value']!r}"
        )

    def _store(self, opcode: int, key: str, value: bytes, flags: int,
               exptime: int, cas: int = 0) -> dict:
        """Shared body of set/add/replace: extras are flags + exptime."""
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(opcode, key.encode(), value, extras, cas)

    def _counter(self, opcode: int, command: str, key: str, delta: int,
                 initial: int, exptime: int) -> Optional[int]:
        """Shared body of incr/decr."""
        extras = struct.pack(">QQI", delta, initial, exptime)
        r = self._send_recv(opcode, key.encode(), extras=extras)
        if r['status'] == 0x0001:
            return None
        if r['status'] != 0x0000:
            # The body is an error message here, not an 8-byte counter.
            raise self._error(command, r)
        return struct.unpack(">Q", r['value'])[0]

    def version(self) -> str:
        """Return the value of the VERSION (0x0B) response, decoded."""
        r = self._send_recv(0x0B)
        return r['value'].decode()

    def set(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0, cas: int = 0) -> dict:
        """Send SET (0x01) and return the parsed response dict."""
        return self._store(0x01, key, value, flags, exptime, cas)

    def add(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0) -> dict:
        """Send ADD (0x02) and return the parsed response dict."""
        return self._store(0x02, key, value, flags, exptime)

    def replace(self, key: str, value: bytes, flags: int = 0,
                exptime: int = 0) -> dict:
        """Send REPLACE (0x03) and return the parsed response dict."""
        return self._store(0x03, key, value, flags, exptime)

    def get(self, key: str) -> Optional[tuple[bytes, int, int]]:
        """Send GET (0x00).

        Returns:
            ``(value, flags, cas)``, or ``None`` when the status is 0x0001.

        Raises:
            RuntimeError: any other non-zero status. The response body is
                then an error message and must not be returned as a value.
        """
        r = self._send_recv(0x00, key.encode())
        if r['status'] == 0x0001:
            return None
        if r['status'] != 0x0000:
            raise self._error("GET", r)
        extras = r['extras']
        flags = struct.unpack_from(">I", extras)[0] if len(extras) >= 4 else 0
        return r['value'], flags, r['cas']

    def delete(self, key: str) -> bool:
        """Send DELETE (0x04); True when the status is 0x0000."""
        r = self._send_recv(0x04, key.encode())
        return r['status'] == 0x0000

    def incr(self, key: str, delta: int, initial: int = 0,
             exptime: int = 0) -> Optional[int]:
        """Send INCREMENT (0x05).

        Returns the new value, or ``None`` when the status is 0x0001.
        Raises RuntimeError on any other non-zero status.
        """
        return self._counter(0x05, "INCR", key, delta, initial, exptime)

    def decr(self, key: str, delta: int, initial: int = 0,
             exptime: int = 0) -> Optional[int]:
        """Send DECREMENT (0x06).

        Returns the new value, or ``None`` when the status is 0x0001.
        Raises RuntimeError on any other non-zero status.
        """
        return self._counter(0x06, "DECR", key, delta, initial, exptime)

    def flush(self, delay: int = 0) -> bool:
        """Send FLUSH (0x08); the delay extras are sent only when > 0."""
        extras = struct.pack(">I", delay) if delay > 0 else b""
        r = self._send_recv(0x08, extras=extras)
        return r['status'] == 0x0000

    def noop(self) -> bool:
        """Send NOOP (0x0A); True when the status is 0x0000."""
        r = self._send_recv(0x0A)
        return r['status'] == 0x0000

    def close(self):
        """Send QUIT (0x07) and close the socket.

        The socket is closed even when the QUIT exchange raises (for
        example because the server already dropped the connection); the
        exception still propagates to the caller.
        """
        try:
            self._send_recv(0x07)  # QUIT
        finally:
            self.sock.close()


# Demo: exercises the client against a memcached server on the default
# address (127.0.0.1:11211). Guarded so that importing this module does
# not open a network connection.
if __name__ == "__main__":
    client = FullBinaryMemcachedClient()
    try:
        print(f"Version: {client.version()}")

        client.set("bin:test", b"hello binary", flags=0, exptime=300)
        result = client.get("bin:test")
        if result:
            print(f"Value: {result[0].decode()}, Flags: {result[1]}, CAS: {result[2]}")

        print(f"Incr: {client.incr('bin:counter', 1, initial=100)}")
        print(f"Incr: {client.incr('bin:counter', 1)}")

        client.delete("bin:test")
        client.delete("bin:counter")
    finally:
        # Release the connection even when one of the calls above raises.
        client.close()

7.13 注意事项

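| 编号 | 注意事项 | 说明 |
| --- | --- | --- |
| 1 | Q 系列命令静默 | 成功时不返回响应,需要 NOOP 标记结束 |
| 2 | INCR/DECR 初始值 | 二进制协议支持 key 不存在时设置初始值(文本协议不支持) |
| 3 | CAS 为 0 语义 | 二进制协议中 CAS=0 表示"不做 CAS 检查" |
| 4 | Extras 长度固定 | 每个命令的 Extras 长度是固定的(FLUSH 例外:Extras 可为 0 或 4 字节) |
| 5 | Body 长度验证 | extras_len + key_len + value_len == body_len |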

7.14 扩展阅读


上一章: 第06章 二进制协议基础 下一章: 第08章 Meta 协议 — 探索 Memcached 1.6+ 的 Meta 协议高级特性。