
Streaming

This chapter covers streaming message processing with MessagePack: splitting a byte stream that carries no message boundaries, encoding and decoding multiple messages, network protocol design, and RPC framework integration.


📖 Why Streaming?

MessagePack defines how a single value is encoded, but it defines no framing layer around that value, so a byte stream carries no explicit message boundary markers. When several messages travel over the same connection, a receiver that decodes one buffer at a time cannot tell where each message starts and ends.

┌────────────────────────────────────────────────────┐
│  TCP Stream                                        │
│                                                    │
│  [msg1 bytes][msg2 bytes][msg3 bytes]...           │
│       ↑          ↑          ↑                      │
│   boundary?  boundary?  boundary?                  │
└────────────────────────────────────────────────────┘
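
To make the problem concrete, the sketch below simulates how a transport may deliver two back-to-back messages. The byte strings are the MessagePack encodings of {"a": 1} and {"b": 2} written out by hand; the helper name split_into_segments and the segment size of 3 are illustrative assumptions, not part of any library.

```python
def split_into_segments(stream: bytes, segment_size: int) -> list[bytes]:
    """Cut a byte stream into fixed-size pieces, the way a transport might."""
    return [stream[i:i + segment_size] for i in range(0, len(stream), segment_size)]

# MessagePack encodings written out by hand:
# 0x81 = map with 1 entry, 0xa1 = 1-character string, then a small integer.
msg1 = b"\x81\xa1a\x01"  # {"a": 1}
msg2 = b"\x81\xa1b\x02"  # {"b": 2}

segments = split_into_segments(msg1 + msg2, 3)
for segment in segments:
    print(segment.hex(" "))
```

None of the three segments lines up with a message: the second one holds the tail of msg1 and the head of msg2, which is exactly the situation a framing protocol has to handle.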

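Common Solutions

| Approach | How it works | Typical use |
|---|---|---|
| Length prefix | A 4-byte length precedes each message | Most common; TCP, files |
| Delimiter | A special byte sequence separates messages | Text protocols (not recommended for binary data) |
| Self-delimiting format | The message itself indicates where it ends | Small messages |
| HTTP chunked | Relies on HTTP chunked transfer encoding | Web APIs |
| WebSocket frame | WebSocket frames carry their own boundaries | Real-time communication |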
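
The reason delimiters are discouraged for binary payloads can be shown in a few lines: any byte value may legitimately occur inside a MessagePack body. The sketch below assumes a newline delimiter purely for illustration; in MessagePack the integer 10 is encoded as the single byte 0x0a, which is the same byte.

```python
DELIMITER = b"\n"  # 0x0a, chosen here only for illustration

msg_a = b"\x81\xa1n\x0a"  # {"n": 10}: the value 10 is the byte 0x0a
msg_b = b"\x81\xa1n\x01"  # {"n": 1}

stream = msg_a + DELIMITER + msg_b + DELIMITER
parts = [part for part in stream.split(DELIMITER) if part]

print(len(parts))         # two non-empty parts come back...
print(parts[0] == msg_a)  # ...but the first one has lost its final byte
```

Escaping the delimiter inside payloads would fix this, at the cost of scanning and rewriting every body, which is why a length prefix is usually preferred.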

📖 Length-Prefixed Protocol

Protocol Design

The most common approach is length-prefixed framing: every message body is preceded by a fixed-size header that states the body length in bytes.

┌───────────────────┬──────────────────────┐
│ Length header (4B)│ Body (N bytes)       │
│ Big-endian uint32 │ MessagePack data     │
└───────────────────┴──────────────────────┘

Multiple messages:
[4B len][msg1 bytes][4B len][msg2 bytes][4B len][msg3 bytes]
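
As a worked example of this layout, the following sketch frames one small message and prints the resulting bytes. The body is the hand-written MessagePack encoding of {"a": 1}, so only the standard library is needed.

```python
import struct

body = b"\x81\xa1a\x01"  # MessagePack encoding of {"a": 1}, 4 bytes
frame = struct.pack(">I", len(body)) + body

print(frame.hex(" "))  # 00 00 00 04 81 a1 61 01

# Reading it back: the header says how many body bytes follow.
(length,) = struct.unpack(">I", frame[:4])
assert frame[4:4 + length] == body
```

The first four bytes are the big-endian length (4), and the remaining four are the body itself.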

Python Implementation

import io
import struct

import msgpack

class LengthPrefixedProtocol:
    """Codec for the length-prefixed protocol."""

    HEADER_SIZE = 4  # 4-byte length header

    @staticmethod
    def pack(data: dict) -> bytes:
        """Pack one message: 4-byte length header + msgpack body."""
        body = msgpack.packb(data, use_bin_type=True)
        header = struct.pack(">I", len(body))
        return header + body

    @staticmethod
    def pack_multi(messages: list) -> bytes:
        """Pack several messages back to back."""
        buf = io.BytesIO()
        for msg in messages:
            buf.write(LengthPrefixedProtocol.pack(msg))
        return buf.getvalue()

    @staticmethod
    def unpack_stream(stream) -> list:
        """Unpack every complete message from a file-like stream.

        Stops at end of stream or at the first truncated frame. Assumes
        stream.read(n) returns n bytes unless the stream has ended, which
        holds for io.BytesIO and buffered files but not for raw sockets.
        """
        results = []
        while True:
            header = stream.read(LengthPrefixedProtocol.HEADER_SIZE)
            if len(header) < LengthPrefixedProtocol.HEADER_SIZE:
                break

            length = struct.unpack(">I", header)[0]
            body = stream.read(length)
            if len(body) < length:
                break

            results.append(msgpack.unpackb(body, raw=False))
        return results

# Usage example
protocol = LengthPrefixedProtocol()

# Pack
data = protocol.pack({"type": "greeting", "msg": "hello"})
print(f"packed size: {len(data)} bytes")  # 4 + N

# Unpack from a stream
stream = io.BytesIO(data)
messages = protocol.unpack_stream(stream)
print(f"unpacked messages: {messages}")
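
A socket may hand over data in arbitrary pieces, so a decoder usually has to buffer bytes until a whole frame is available. The sketch below is one minimal way to do that in Python; the class name FrameDecoder and its feed method are hypothetical names introduced for this example. It returns raw frame bodies, each of which would then be passed to msgpack.unpackb.

```python
import struct

class FrameDecoder:
    """Accumulates arbitrary chunks and returns complete frame bodies."""

    HEADER_SIZE = 4

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> list[bytes]:
        """Add received bytes; return the bodies of all frames now complete."""
        self._buffer += chunk
        bodies = []
        while len(self._buffer) >= self.HEADER_SIZE:
            (length,) = struct.unpack_from(">I", self._buffer, 0)
            end = self.HEADER_SIZE + length
            if len(self._buffer) < end:
                break  # body not fully received yet
            bodies.append(bytes(self._buffer[self.HEADER_SIZE:end]))
            del self._buffer[:end]  # drop the consumed frame
        return bodies

decoder = FrameDecoder()
body = b"\x81\xa1a\x01"  # MessagePack encoding of {"a": 1}
frame = struct.pack(">I", len(body)) + body

print(decoder.feed(frame[:3]))   # [] (header incomplete)
print(decoder.feed(frame[3:6]))  # [] (body incomplete)
print(decoder.feed(frame[6:]))   # [b'\x81\xa1a\x01']
```

Because the decoder keeps leftover bytes between calls, it behaves the same whether the transport delivers one byte at a time or several frames at once.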

Asynchronous Streaming in Python

import asyncio
import struct

import msgpack

class AsyncMsgPackStream:
    """Asynchronous MessagePack stream handler."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self.reader = reader
        self.writer = writer

    async def send(self, data: dict):
        """Send one message."""
        body = msgpack.packb(data, use_bin_type=True)
        header = struct.pack(">I", len(body))
        self.writer.write(header + body)
        await self.writer.drain()

    async def receive(self) -> dict:
        """Receive one message."""
        header = await self.reader.readexactly(4)
        length = struct.unpack(">I", header)[0]
        body = await self.reader.readexactly(length)
        return msgpack.unpackb(body, raw=False)

    async def __aiter__(self):
        """Iterate over incoming messages until the peer closes the connection."""
        try:
            while True:
                yield await self.receive()
        except asyncio.IncompleteReadError:
            pass  # connection closed; a partial trailing frame is discarded

# Server
async def handle_client(reader, writer):
    stream = AsyncMsgPackStream(reader, writer)
    try:
        async for msg in stream:
            print(f"received: {msg}")
            await stream.send({"type": "ack", "id": msg.get("id")})
    finally:
        # Release the connection once the client has disconnected.
        writer.close()
        await writer.wait_closed()

async def server():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 8080)
    async with server:
        await server.serve_forever()

# Client
async def client():
    reader, writer = await asyncio.open_connection("127.0.0.1", 8080)
    stream = AsyncMsgPackStream(reader, writer)

    await stream.send({"type": "ping", "id": 1})
    response = await stream.receive()
    print(f"response: {response}")

    writer.close()
    await writer.wait_closed()
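
A receive path that trusts the length header and waits indefinitely can be stalled or made to allocate large buffers by a misbehaving peer. The sketch below shows one possible hardening, assuming a 1 MiB cap and a 5-second timeout (both values, and the name read_frame, are assumptions for this example). The demo feeds an asyncio.StreamReader by hand so that it runs without a socket.

```python
import asyncio
import struct

MAX_FRAME_SIZE = 1024 * 1024  # assumed 1 MiB cap; tune per application

async def read_frame(reader: asyncio.StreamReader, timeout: float = 5.0) -> bytes:
    """Read one length-prefixed frame body with a size cap and a timeout."""
    header = await asyncio.wait_for(reader.readexactly(4), timeout)
    (length,) = struct.unpack(">I", header)
    if length > MAX_FRAME_SIZE:
        raise ValueError(f"frame too large: {length} bytes")
    return await asyncio.wait_for(reader.readexactly(length), timeout)

async def demo() -> bytes:
    # Feed the reader manually instead of opening a connection.
    reader = asyncio.StreamReader()
    body = b"\x81\xa1a\x01"  # MessagePack encoding of {"a": 1}
    reader.feed_data(struct.pack(">I", len(body)) + body)
    reader.feed_eof()
    return await read_frame(reader)

print(asyncio.run(demo()))
```

The returned bytes would then be decoded with msgpack.unpackb, as in the receive method of the stream class.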

Go Implementation

package main

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "io"
    "net"

    "github.com/vmihailenco/msgpack/v5"
)

// LengthPrefixedCodec frames MessagePack messages with a 4-byte length prefix.
type LengthPrefixedCodec struct {
    conn    net.Conn
    reader  *bufio.Reader
    writer  *bufio.Writer
}

func NewLengthPrefixedCodec(conn net.Conn) *LengthPrefixedCodec {
    return &LengthPrefixedCodec{
        conn:   conn,
        reader: bufio.NewReader(conn),
        writer: bufio.NewWriter(conn),
    }
}

// Send writes one message.
func (c *LengthPrefixedCodec) Send(v interface{}) error {
    body, err := msgpack.Marshal(v)
    if err != nil {
        return err
    }

    // Write the 4-byte length header.
    header := make([]byte, 4)
    binary.BigEndian.PutUint32(header, uint32(len(body)))
    
    if _, err := c.writer.Write(header); err != nil {
        return err
    }
    if _, err := c.writer.Write(body); err != nil {
        return err
    }
    return c.writer.Flush()
}

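// Receive reads one message.
func (c *LengthPrefixedCodec) Receive(v interface{}) error {
    // Read the 4-byte length header.
    header := make([]byte, 4)
    if _, err := io.ReadFull(c.reader, header); err != nil {
        return err
    }
    length := binary.BigEndian.Uint32(header)

    // Reject oversized frames before allocating; the 16 MB cap matches
    // the limit suggested in the Pitfalls section.
    const maxMessageSize = 16 * 1024 * 1024
    if length > maxMessageSize {
        return fmt.Errorf("message too large: %d bytes", length)
    }

    // Read the message body.
    body := make([]byte, length)
    if _, err := io.ReadFull(c.reader, body); err != nil {
        return err
    }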

    return msgpack.Unmarshal(body, v)
}

// Message is the application-level message type.
type Message struct {
    Type    string      `msgpack:"type"`
    ID      int         `msgpack:"id"`
    Payload interface{} `msgpack:"payload,omitempty"`
}

// Server example.
func startServer(addr string) error {
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    defer ln.Close()

    for {
        conn, err := ln.Accept()
        if err != nil {
            continue
        }
        go handleConnection(conn)
    }
}

func handleConnection(conn net.Conn) {
    defer conn.Close()
    codec := NewLengthPrefixedCodec(conn)

    for {
        var msg Message
        if err := codec.Receive(&msg); err != nil {
            if err == io.EOF {
                break
            }
            fmt.Printf("receive error: %v\n", err)
            break
        }

        fmt.Printf("received: %+v\n", msg)

        // Send the response.
        response := Message{Type: "ack", ID: msg.ID}
        if err := codec.Send(response); err != nil {
            fmt.Printf("send error: %v\n", err)
            break
        }
    }
}

JavaScript (Node.js) Implementation

import { encode, decode } from "@msgpack/msgpack";
import net from "net";

class LengthPrefixedCodec {
  constructor(socket) {
    this.socket = socket;
    this.buffer = Buffer.alloc(0);
  }

  // Send one message.
  send(data) {
    const body = encode(data);
    const header = Buffer.alloc(4);
    header.writeUInt32BE(body.length, 0);
    this.socket.write(Buffer.concat([header, body]));
  }

  // Register a callback for decoded incoming messages.
  onData(callback) {
    this.socket.on("data", (chunk) => {
      this.buffer = Buffer.concat([this.buffer, chunk]);
      this._processBuffer(callback);
    });
  }

  _processBuffer(callback) {
    while (this.buffer.length >= 4) {
      const length = this.buffer.readUInt32BE(0);

      if (this.buffer.length < 4 + length) break; // frame incomplete, wait for more data

      const body = this.buffer.subarray(4, 4 + length);
      this.buffer = this.buffer.subarray(4 + length);

      const message = decode(body);
      callback(message);
    }
  }
}

// Server
const server = net.createServer((socket) => {
  const codec = new LengthPrefixedCodec(socket);

  codec.onData((msg) => {
    console.log("received:", msg);
    codec.send({ type: "ack", id: msg.id });
  });
});

server.listen(8080, () => console.log("listening on 8080"));

// Client
const client = net.createConnection(8080, () => {
  const codec = new LengthPrefixedCodec(client);
  codec.send({ type: "ping", id: 1 });

  codec.onData((msg) => {
    console.log("response:", msg);
    client.end();
  });
});

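📖 RPC Integration

A Simple RPC Protocol

Request format:
┌───────────┬───────────┬───────────┬───────────┐
│ type (1B) │ id (4B)   │ method    │ params    │
│ 0x01=req  │ request ID│ (msgpack) │ (msgpack) │
└───────────┴───────────┴───────────┴───────────┘

Response format:
┌───────────┬───────────┬───────────┬───────────┐
│ type (1B) │ id (4B)   │ result    │ error     │
│ 0x02=res  │ echoed ID │ (msgpack) │ (msgpack) │
└───────────┴───────────┴───────────┴───────────┘

On the wire, each message is additionally preceded by a 4-byte big-endian length that covers the type byte, the ID, and the payload. In the implementations that follow, the payload is a single MessagePack map: {"method", "params"} for requests and {"result"} or {"error"} for responses.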
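
The envelope described above can be sketched with the standard struct module alone. The helper names pack_envelope and parse_envelope are hypothetical, and the payload is a hand-written MessagePack map so that the example does not depend on an encoder; the 4-byte length prefix in front of the type byte follows the implementations in this chapter.

```python
import struct

TYPE_REQUEST = 0x01

def pack_envelope(msg_type: int, msg_id: int, payload: bytes) -> bytes:
    """Length prefix, then type byte, then 4-byte ID, then the payload."""
    envelope = struct.pack(">BI", msg_type, msg_id) + payload
    return struct.pack(">I", len(envelope)) + envelope

def parse_envelope(frame: bytes) -> tuple[int, int, bytes]:
    """Inverse of pack_envelope for one complete frame."""
    (length,) = struct.unpack_from(">I", frame, 0)
    if length < 5 or len(frame) != 4 + length:
        raise ValueError("malformed frame")
    msg_type, msg_id = struct.unpack_from(">BI", frame, 4)
    return msg_type, msg_id, frame[9:]

# MessagePack encoding of {"method": "add", "params": [1, 2]}, written by hand.
payload = b"\x82\xa6method\xa3add\xa6params\x92\x01\x02"
frame = pack_envelope(TYPE_REQUEST, 7, payload)

print(frame[:9].hex(" "))
print(parse_envelope(frame) == (TYPE_REQUEST, 7, payload))  # True
```

Keeping the type and ID outside the MessagePack payload lets a router inspect them without decoding the body.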

Python RPC Implementation

import asyncio
import struct
import uuid
from typing import Any, Callable

import msgpack

class MsgPackRPC:
    """A simple RPC framework built on MessagePack."""

    TYPE_REQUEST = 0x01
    TYPE_RESPONSE = 0x02
    TYPE_NOTIFICATION = 0x03  # reserved; not handled in this example

    def __init__(self):
        self.methods: dict[str, Callable] = {}
        self.pending: dict[int, asyncio.Future] = {}

    def register(self, name: str, func: Callable):
        """Register an RPC method."""
        self.methods[name] = func

    def _pack_message(self, msg_type: int, msg_id: int, payload: dict) -> bytes:
        """Build one frame: length prefix + type byte + 4-byte ID + msgpack payload."""
        body = msgpack.packb(payload, use_bin_type=True)
        header = struct.pack(">BI", msg_type, msg_id)
        length = struct.pack(">I", len(header) + len(body))
        return length + header + body

    async def send_request(self, writer, method: str, params: list) -> Any:
        """Send an RPC request and wait for the response.

        handle_connection must be running on the same connection,
        because it is what resolves the pending future.
        """
        msg_id = int.from_bytes(uuid.uuid4().bytes[:4], "big")
        payload = {"method": method, "params": params}

        future = asyncio.get_running_loop().create_future()
        self.pending[msg_id] = future

        data = self._pack_message(self.TYPE_REQUEST, msg_id, payload)
        writer.write(data)
        await writer.drain()

        return await future

    async def handle_connection(self, reader, writer):
        """Process inbound messages on one connection."""
        while True:
            try:
                # Read one frame.
                len_bytes = await reader.readexactly(4)
                length = struct.unpack(">I", len_bytes)[0]
                if length < 5:
                    break  # malformed frame: too short for type byte + ID

                msg_bytes = await reader.readexactly(length)
                msg_type = msg_bytes[0]
                msg_id = struct.unpack(">I", msg_bytes[1:5])[0]
                payload = msgpack.unpackb(msg_bytes[5:], raw=False)

                if msg_type == self.TYPE_REQUEST:
                    # Dispatch the request to the registered method.
                    method = payload["method"]
                    params = payload.get("params", [])

                    try:
                        if method in self.methods:
                            result = self.methods[method](*params)
                            response = {"result": result}
                        else:
                            response = {"error": f"unknown method: {method}"}
                    except Exception as e:
                        response = {"error": str(e)}

                    data = self._pack_message(self.TYPE_RESPONSE, msg_id, response)
                    writer.write(data)
                    await writer.drain()

                elif msg_type == self.TYPE_RESPONSE:
                    # Resolve the future of the matching request.
                    if msg_id in self.pending:
                        future = self.pending.pop(msg_id)
                        if "error" in payload:
                            future.set_exception(Exception(payload["error"]))
                        else:
                            future.set_result(payload.get("result"))

            except asyncio.IncompleteReadError:
                break

# Usage example
async def main():
    rpc = MsgPackRPC()

    # Register methods.
    rpc.register("add", lambda a, b: a + b)
    rpc.register("multiply", lambda a, b: a * b)
    rpc.register("greet", lambda name: f"Hello, {name}!")

    # Start the server.
    server = await asyncio.start_server(rpc.handle_connection, "127.0.0.1", 8080)
    print("RPC server listening on 8080")

    async with server:
        await server.serve_forever()

asyncio.run(main())
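
Correlating responses with requests is the core of any multiplexed RPC client. The sketch below isolates that bookkeeping into a small class; PendingCalls and its methods are hypothetical names, and using a sequential counter for IDs (rather than random ones) is an assumption of this example, chosen so that IDs cannot collide while a request is in flight.

```python
import asyncio
import itertools

class PendingCalls:
    """Maps request IDs to futures awaiting the matching response."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._futures: dict[int, asyncio.Future] = {}

    def create(self) -> tuple[int, asyncio.Future]:
        msg_id = next(self._ids) & 0xFFFFFFFF  # keep within the 4-byte ID field
        future = asyncio.get_running_loop().create_future()
        self._futures[msg_id] = future
        return msg_id, future

    def resolve(self, msg_id: int, payload: dict) -> None:
        future = self._futures.pop(msg_id, None)
        if future is None or future.done():
            return  # unknown ID or already-cancelled request
        if "error" in payload:
            future.set_exception(RuntimeError(payload["error"]))
        else:
            future.set_result(payload.get("result"))

    def fail_all(self, exc: Exception) -> None:
        """Fail every waiting caller, e.g. when the connection drops."""
        for future in self._futures.values():
            if not future.done():
                future.set_exception(exc)
        self._futures.clear()

async def demo() -> int:
    calls = PendingCalls()
    msg_id, future = calls.create()
    calls.resolve(msg_id, {"result": 3})  # as if the response had arrived
    return await future

print(asyncio.run(demo()))  # 3
```

Calling fail_all from the read loop's exit path keeps callers from waiting forever on a connection that has already closed.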

Go RPC Implementation

package main

import (
    "context"
    "encoding/binary"
    "fmt"
    "io"
    "net"
    "sync"

    "github.com/vmihailenco/msgpack/v5"
)

// MsgType identifies the kind of message.
type MsgType uint8

const (
    MsgRequest      MsgType = 0x01
    MsgResponse     MsgType = 0x02
    MsgNotification MsgType = 0x03
)

// RPCRequest is the request payload.
type RPCRequest struct {
    Method string      `msgpack:"method"`
    Params interface{} `msgpack:"params"`
}

// RPCResponse is the response payload.
type RPCResponse struct {
    Result interface{} `msgpack:"result,omitempty"`
    Error  string      `msgpack:"error,omitempty"`
}

// RPCServer dispatches requests to registered handlers.
type RPCServer struct {
    methods map[string]func(params interface{}) (interface{}, error)
    mu      sync.RWMutex
}

func NewRPCServer() *RPCServer {
    return &RPCServer{
        methods: make(map[string]func(params interface{}) (interface{}, error)),
    }
}

func (s *RPCServer) Register(name string, handler func(params interface{}) (interface{}, error)) {
    s.mu.Lock()
    s.methods[name] = handler
    s.mu.Unlock()
}

func (s *RPCServer) Handle(conn net.Conn) {
    defer conn.Close()

    for {
        // Read the length prefix.
        var length uint32
        if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
            if err != io.EOF {
                fmt.Printf("error reading length: %v\n", err)
            }
            return
        }
        // A valid frame holds at least the type byte and the 4-byte ID;
        // without this check the slicing below would panic.
        if length < 5 {
            fmt.Printf("malformed frame: length %d\n", length)
            return
        }

        // Read the frame.
        body := make([]byte, length)
        if _, err := io.ReadFull(conn, body); err != nil {
            fmt.Printf("error reading message: %v\n", err)
            return
        }

        msgType := MsgType(body[0])
        msgID := binary.BigEndian.Uint32(body[1:5])

        switch msgType {
        case MsgRequest:
            var req RPCRequest
            if err := msgpack.Unmarshal(body[5:], &req); err != nil {
                s.sendError(conn, msgID, err.Error())
                continue
            }

            s.mu.RLock()
            handler, ok := s.methods[req.Method]
            s.mu.RUnlock()

            if !ok {
                s.sendError(conn, msgID, fmt.Sprintf("unknown method: %s", req.Method))
                continue
            }

            result, err := handler(req.Params)
            if err != nil {
                s.sendError(conn, msgID, err.Error())
                continue
            }

            s.sendResult(conn, msgID, result)
        }
    }
}

func (s *RPCServer) sendResult(conn net.Conn, msgID uint32, result interface{}) {
    resp := RPCResponse{Result: result}
    s.sendResponse(conn, msgID, resp)
}

func (s *RPCServer) sendError(conn net.Conn, msgID uint32, errMsg string) {
    resp := RPCResponse{Error: errMsg}
    s.sendResponse(conn, msgID, resp)
}

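func (s *RPCServer) sendResponse(conn net.Conn, msgID uint32, resp RPCResponse) {
    body, err := msgpack.Marshal(resp)
    if err != nil {
        fmt.Printf("error encoding response: %v\n", err)
        return
    }

    // Assemble the whole frame and send it with a single Write so the
    // length prefix, header, and body always leave together.
    frame := make([]byte, 9, 9+len(body))
    binary.BigEndian.PutUint32(frame[0:4], uint32(5+len(body)))
    frame[4] = byte(MsgResponse)
    binary.BigEndian.PutUint32(frame[5:9], msgID)
    frame = append(frame, body...)

    if _, err := conn.Write(frame); err != nil {
        fmt.Printf("error sending response: %v\n", err)
    }
}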

// RPCClient is an RPC client that supports concurrent calls.
type RPCClient struct {
    conn    net.Conn
    pending map[uint32]chan RPCResponse
    mu      sync.RWMutex
    nextID  uint32
}

func NewRPCClient(addr string) (*RPCClient, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    client := &RPCClient{
        conn:    conn,
        pending: make(map[uint32]chan RPCResponse),
    }
    go client.readLoop()
    return client, nil
}

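func (c *RPCClient) Call(ctx context.Context, method string, params interface{}) (interface{}, error) {
    // Encode first so a failed call never registers a pending entry.
    body, err := msgpack.Marshal(RPCRequest{Method: method, Params: params})
    if err != nil {
        return nil, err
    }

    c.mu.Lock()
    c.nextID++
    msgID := c.nextID
    ch := make(chan RPCResponse, 1)
    c.pending[msgID] = ch
    c.mu.Unlock()

    // Drop the pending entry on every exit path, including cancellation
    // (deleting a key twice is harmless).
    defer func() {
        c.mu.Lock()
        delete(c.pending, msgID)
        c.mu.Unlock()
    }()

    // Send the request as one frame in a single Write, so frames from
    // concurrent calls are not interleaved on the connection.
    frame := make([]byte, 9, 9+len(body))
    binary.BigEndian.PutUint32(frame[0:4], uint32(5+len(body)))
    frame[4] = byte(MsgRequest)
    binary.BigEndian.PutUint32(frame[5:9], msgID)
    frame = append(frame, body...)
    if _, err := c.conn.Write(frame); err != nil {
        return nil, err
    }

    // Wait for the response.
    select {
    case resp := <-ch:
        if resp.Error != "" {
            return nil, fmt.Errorf("RPC error: %s", resp.Error)
        }
        return resp.Result, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}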

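func (c *RPCClient) readLoop() {
    for {
        var length uint32
        if err := binary.Read(c.conn, binary.BigEndian, &length); err != nil {
            return
        }
        // A valid frame holds at least the type byte and the 4-byte ID.
        if length < 5 {
            return
        }

        body := make([]byte, length)
        if _, err := io.ReadFull(c.conn, body); err != nil {
            return
        }

        msgID := binary.BigEndian.Uint32(body[1:5])
        var resp RPCResponse
        if err := msgpack.Unmarshal(body[5:], &resp); err != nil {
            // Report the decode failure to the caller instead of dropping it.
            resp = RPCResponse{Error: fmt.Sprintf("decode response: %v", err)}
        }

        // Look up and remove the pending entry in one critical section.
        c.mu.Lock()
        ch, ok := c.pending[msgID]
        delete(c.pending, msgID)
        c.mu.Unlock()

        if ok {
            ch <- resp // the channel is buffered, so this never blocks
        }
    }
}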

func (c *RPCClient) Close() error {
    return c.conn.Close()
}

📖 Message Queue Integration

Redis Pub/Sub + MessagePack

import msgpack
import redis

class MsgPackRedisPubSub:
    def __init__(self, redis_url="redis://localhost"):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    def publish(self, channel: str, data: dict):
        """Publish a MessagePack-encoded message."""
        packed = msgpack.packb(data, use_bin_type=True)
        self.redis.publish(channel, packed)

    def subscribe(self, *channels):
        """Subscribe to channels."""
        self.pubsub.subscribe(*channels)

    def listen(self):
        """Yield (channel, data) for each received message."""
        for message in self.pubsub.listen():
            if message["type"] == "message":
                data = msgpack.unpackb(message["data"], raw=False)
                yield message["channel"].decode(), data

# Subscriber: subscribe first, because Redis Pub/Sub does not store
# messages for subscribers that are not yet listening.
sub = MsgPackRedisPubSub()
sub.subscribe("events")

# Publisher
pub = MsgPackRedisPubSub()
pub.publish("events", {"type": "user_login", "user_id": 123, "ts": 1700000000})

# Blocks and prints events as they arrive.
for channel, event in sub.listen():
    print(f"[{channel}] {event}")

Kafka + MessagePack

from kafka import KafkaProducer, KafkaConsumer
import msgpack

# Producer
class MsgPackKafkaProducer:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: msgpack.packb(v, use_bin_type=True),
            key_serializer=lambda k: k.encode("utf-8") if isinstance(k, str) else k,
        )
    
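    def send(self, topic: str, value: dict, key: str | None = None):
        self.producer.send(topic, value=value, key=key)
        # flush() blocks until delivery; calling it per message keeps the
        # example simple but defeats batching in high-throughput code.
        self.producer.flush()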

# Consumer
class MsgPackKafkaConsumer:
    def __init__(self, bootstrap_servers, topic, group_id):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: msgpack.unpackb(v, raw=False),
            key_deserializer=lambda k: k.decode("utf-8") if k else None,
            auto_offset_reset="earliest",
        )
    
    def consume(self):
        for message in self.consumer:
            yield {
                "topic": message.topic,
                "partition": message.partition,
                "offset": message.offset,
                "key": message.key,
                "value": message.value,
            }

# Usage
producer = MsgPackKafkaProducer(["localhost:9092"])
producer.send("user-events", {"action": "click", "page": "/home"}, key="user:123")

consumer = MsgPackKafkaConsumer(["localhost:9092"], "user-events", "my-group")
for msg in consumer.consume():
    print(msg)

📖 Buffer Management

Ring Buffer

import struct

import msgpack

class RingBuffer:
    """Ring buffer for streaming MessagePack parsing."""

    def __init__(self, capacity: int = 65536):
        self.buffer = bytearray(capacity)
        self.capacity = capacity
        self.head = 0   # next position to read
        self.tail = 0   # next position to write
        self.size = 0   # number of bytes currently stored

    def write(self, data: bytes):
        """Append data; raises BufferError if it does not fit."""
        if len(data) > self.capacity - self.size:
            raise BufferError("buffer full")

        for byte in data:
            self.buffer[self.tail] = byte
            self.tail = (self.tail + 1) % self.capacity
            self.size += 1

    def read_message(self) -> dict | None:
        """Return one complete message, or None if more data is needed."""
        if self.size < 4:
            return None

        # Peek at the length without moving head.
        length = self._peek_uint32()
        if 4 + length > self.capacity:
            # Such a frame can never fit, so waiting for more data would stall.
            raise BufferError(f"message of {length} bytes exceeds buffer capacity")
        if self.size < 4 + length:
            return None

        self._read_bytes(4)              # consume the header
        body = self._read_bytes(length)  # consume the body

        return msgpack.unpackb(body, raw=False)

    def _peek_uint32(self) -> int:
        b = bytes([
            self.buffer[(self.head + i) % self.capacity] for i in range(4)
        ])
        return struct.unpack(">I", b)[0]

    def _read_bytes(self, n: int) -> bytes:
        result = bytearray(n)
        for i in range(n):
            result[i] = self.buffer[self.head]
            self.head = (self.head + 1) % self.capacity
            self.size -= 1
        return bytes(result)
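
Copying one byte per loop iteration is easy to follow but slow in Python. A ring buffer write can instead be done with at most two slice copies: one up to the end of the storage and one wrapped around to the start. The function below is a sketch of that idea; ring_write is a hypothetical helper, and it assumes the caller has already checked that enough free space exists.

```python
def ring_write(buffer: bytearray, tail: int, data: bytes) -> int:
    """Copy data into buffer starting at tail, wrapping once if needed.

    Returns the new tail. The caller must already have checked free space.
    """
    capacity = len(buffer)
    first = min(len(data), capacity - tail)  # bytes that fit before the end
    buffer[tail:tail + first] = data[:first]
    rest = len(data) - first                 # bytes that wrap to the start
    buffer[:rest] = data[first:]
    return (tail + len(data)) % capacity

buf = bytearray(8)
tail = ring_write(buf, 6, b"ABCD")  # 2 bytes at the end, 2 at the start
print(buf, tail)
```

Both slice assignments replace a range with data of the same length, so the underlying bytearray never changes size.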

Adaptive Buffer

import io

import msgpack

class AdaptiveBuffer:
    """Buffer whose capacity budget grows and shrinks with the data volume.

    io.BytesIO manages its own memory; _capacity is the logical budget
    that this class tracks and enforces against MAX_SIZE.
    """

    MIN_SIZE = 1024               # minimum 1 KB
    MAX_SIZE = 16 * 1024 * 1024   # maximum 16 MB
    GROW_FACTOR = 1.5             # growth factor
    SHRINK_THRESHOLD = 0.3        # shrink when usage falls below 30%

    def __init__(self):
        self._buf = io.BytesIO()
        self._capacity = self.MIN_SIZE

    def write_message(self, data: dict):
        """Append one length-prefixed message."""
        body = msgpack.packb(data, use_bin_type=True)
        header = len(body).to_bytes(4, "big")

        new_size = self._buf.tell() + 4 + len(body)

        # Grow the budget if needed.
        if new_size > self._capacity:
            self._grow(new_size)

        self._buf.write(header)
        self._buf.write(body)

    def read_messages(self) -> list:
        """Decode all complete messages and keep any trailing partial frame."""
        data = self._buf.getvalue()

        messages = []
        offset = 0

        while offset + 4 <= len(data):
            length = int.from_bytes(data[offset:offset + 4], "big")
            if offset + 4 + length > len(data):
                break

            body = data[offset + 4:offset + 4 + length]
            messages.append(msgpack.unpackb(body, raw=False))
            offset += 4 + length

        # Reset the buffer, keeping only the unconsumed bytes.
        remaining = data[offset:]
        self._buf = io.BytesIO()
        self._buf.write(remaining)

        # Shrink the budget if usage is low.
        self._maybe_shrink()

        return messages

    def _grow(self, needed: int):
        # Refuse writes that would exceed the hard limit instead of
        # silently clamping the budget and writing anyway.
        if needed > self.MAX_SIZE:
            raise BufferError(f"buffer would exceed {self.MAX_SIZE} bytes")
        while self._capacity < needed:
            self._capacity = int(self._capacity * self.GROW_FACTOR)
        self._capacity = min(self._capacity, self.MAX_SIZE)

    def _maybe_shrink(self):
        usage = self._buf.tell() / self._capacity
        if usage < self.SHRINK_THRESHOLD and self._capacity > self.MIN_SIZE:
            self._capacity = max(self.MIN_SIZE, self._capacity // 2)

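💻 Large Data Streaming

Chunked Transfer of Large Arrays

import struct

import msgpack

# Framing helpers for blocking file-like objects.
def _write_frame(writer, body: bytes) -> None:
    """Write one length-prefixed frame."""
    writer.write(struct.pack(">I", len(body)) + body)

def _read_frame(reader):
    """Read and decode one frame; return None at end of stream."""
    header = reader.read(4)
    if len(header) < 4:
        return None
    (length,) = struct.unpack(">I", header)
    body = reader.read(length)
    if len(body) < length:
        return None
    return msgpack.unpackb(body, raw=False)

class ChunkedTransfer:
    """Transfers a large array in chunks."""

    CHUNK_SIZE = 1000  # items per chunk

    @staticmethod
    def send_chunked(writer, items: list):
        """Send a large array as a start message, chunks, and an end marker."""
        total_chunks = (len(items) + ChunkedTransfer.CHUNK_SIZE - 1) // ChunkedTransfer.CHUNK_SIZE

        # Send the start message.
        header = msgpack.packb({
            "type": "chunked_start",
            "total_items": len(items),
            "total_chunks": total_chunks,
        }, use_bin_type=True)
        _write_frame(writer, header)

        # Send the chunks.
        for i in range(0, len(items), ChunkedTransfer.CHUNK_SIZE):
            chunk = items[i:i + ChunkedTransfer.CHUNK_SIZE]
            chunk_data = msgpack.packb({
                "type": "chunk",
                "index": i // ChunkedTransfer.CHUNK_SIZE,
                "data": chunk,
            }, use_bin_type=True)
            _write_frame(writer, chunk_data)

        # Send the end marker.
        end = msgpack.packb({"type": "chunked_end"}, use_bin_type=True)
        _write_frame(writer, end)

    @staticmethod
    def receive_chunked(reader) -> list:
        """Receive chunked data up to and including the end marker."""
        items = []
        expected_chunks = None
        chunks_received = 0

        while True:
            msg = _read_frame(reader)
            if msg is None:
                raise EOFError("stream ended before chunked_end")

            if msg["type"] == "chunked_start":
                expected_chunks = msg["total_chunks"]
            elif msg["type"] == "chunk":
                items.extend(msg["data"])
                chunks_received += 1
            elif msg["type"] == "chunked_end":
                # Always consume the end marker so the stream stays aligned
                # for whatever message follows.
                break

        if chunks_received != expected_chunks:
            raise ValueError(
                f"expected {expected_chunks} chunks, received {chunks_received}"
            )
        return items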
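
Slicing a list requires the whole data set to be in memory on the sending side. When the items come from a generator or a database cursor, the chunks can be produced lazily instead. The helper below is a minimal sketch of that idea using only itertools; iter_chunks is a hypothetical name, and each yielded list would be sent as one "chunk" message.

```python
from itertools import islice
from typing import Iterable, Iterator

def iter_chunks(items: Iterable, chunk_size: int) -> Iterator[list]:
    """Yield successive lists of up to chunk_size items from any iterable."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return  # source exhausted
        yield chunk

for index, chunk in enumerate(iter_chunks(range(7), 3)):
    print(index, chunk)
```

With a lazy source the total number of chunks is not known up front, so a protocol built this way would rely on the end marker rather than on a chunk count in the start message.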

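⚠️ Pitfalls

1. Lost message boundaries

❌ Wrong: calling msgpack.unpackb directly on data read from a TCP stream
Problem: a single read may hold several coalesced messages or only part of one; unpackb decodes exactly one object and raises an error (ExtraData for trailing bytes, ValueError for truncated input) instead of handing back the remaining messages

✅ Right: use a length prefix or another framing protocol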

2. Buffer overflow

# ⚠️ Enforce a maximum message length before reading the body
MAX_MSG_SIZE = 16 * 1024 * 1024  # 16 MB

def safe_read(reader, length):
    if length > MAX_MSG_SIZE:
        raise ValueError(f"message too large: {length} bytes")
    return reader.read(length)
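
A size check alone does not guarantee that a read returns the requested number of bytes: sockets and pipes may return fewer. The sketch below combines the cap with a loop that keeps reading until the frame is complete. read_exact and TrickleReader are hypothetical names; the latter only exists to simulate short reads.

```python
import io

MAX_MSG_SIZE = 16 * 1024 * 1024  # same 16 MB cap as above

def read_exact(reader, length: int) -> bytes:
    """Read exactly length bytes, looping over short reads."""
    if length > MAX_MSG_SIZE:
        raise ValueError(f"message too large: {length} bytes")
    chunks = []
    remaining = length
    while remaining > 0:
        chunk = reader.read(remaining)
        if not chunk:
            raise EOFError(f"stream ended with {remaining} bytes missing")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)

class TrickleReader:
    """Hypothetical test reader that returns at most 2 bytes per read."""

    def __init__(self, data: bytes) -> None:
        self._stream = io.BytesIO(data)

    def read(self, n: int) -> bytes:
        return self._stream.read(min(n, 2))

print(read_exact(TrickleReader(b"abcdefg"), 5))  # b'abcde'
```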

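3. Byte-order consistency

⚠️ Make sure every client and server uses the same byte order for the length header.
MessagePack encodes multi-byte values in big-endian order, so big-endian is the natural choice for the length header as well.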
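
A quick way to see what a byte-order mismatch does is to decode the same header both ways. The value 256 below is an arbitrary example length.

```python
import struct

header = struct.pack(">I", 256)         # big-endian bytes: 00 00 01 00
(right,) = struct.unpack(">I", header)
(wrong,) = struct.unpack("<I", header)  # same bytes read as little-endian

print(header.hex(" "))  # 00 00 01 00
print(right, wrong)     # 256 65536
```

A receiver with the wrong byte order would wait for 65536 bytes instead of 256, which typically shows up as a connection that hangs or as a "message too large" error.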

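4. Error recovery

# Recovery strategy when one message in the stream is corrupted.
# Assumes length-prefixed frames: a bad body does not disturb framing,
# so the decoder can drop that frame and continue with the next one.
import struct
import msgpack

def resilient_unpacker(stream):
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return  # end of stream
        body = stream.read(struct.unpack(">I", header)[0])
        try:
            yield msgpack.unpackb(body, raw=False)
        except (ValueError, msgpack.UnpackException):
            continue  # skip the corrupted frame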
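
Detecting that a message is corrupted in the first place is easier when each frame carries a checksum, because a damaged body can still happen to be valid MessagePack. The sketch below extends the header with a CRC-32 of the body. This layout (length, then CRC-32, then body) and the helper names are assumptions of the example, not part of MessagePack or of the protocol described in this chapter.

```python
import struct
import zlib

def pack_checked(body: bytes) -> bytes:
    """Frame layout assumed here: 4-byte length, 4-byte CRC-32, body."""
    return struct.pack(">II", len(body), zlib.crc32(body)) + body

def unpack_checked(frame: bytes) -> bytes:
    """Return the body, or raise ValueError if the checksum does not match."""
    length, expected = struct.unpack_from(">II", frame, 0)
    body = frame[8:8 + length]
    if len(body) != length or zlib.crc32(body) != expected:
        raise ValueError("corrupted frame")
    return body

frame = pack_checked(b"\x81\xa1a\x01")  # MessagePack encoding of {"a": 1}
print(unpack_checked(frame))

damaged = bytearray(frame)
damaged[-1] ^= 0xFF  # flip the bits of the last body byte
try:
    unpack_checked(bytes(damaged))
except ValueError as exc:
    print("rejected:", exc)
```

A checksum protects the body only; if the length field itself is damaged, the receiver generally has to drop the connection or search for a known marker to resynchronize.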

🔗 Further Reading

| Resource | Link |
|---|---|
| TCP message coalescing ("sticky packets") | https://en.wikipedia.org/wiki/Transmission_Control_Protocol |
| MessagePack-RPC specification | https://github.com/msgpack-rpc/msgpack-rpc |
| gRPC message framing | https://grpc.io/docs/what-is-grpc/core-concepts/ |
| WebSocket protocol | https://tools.ietf.org/html/rfc6455 |

📝 Next: Chapter 9 - Docker Usage: using MessagePack in containerized environments.