MessagePack 序列化完全指南 / 03 - Python 实践 / Python Implementation
Python 实践 / Python Implementation
本章介绍如何在 Python 中使用 msgpack 库进行序列化和反序列化,包括自定义类型、流式处理和性能优化。
This chapter covers using the msgpack library in Python for serialization and deserialization, including custom types, streaming, and performance optimization.
📖 库概览 / Library Overview
Python 的官方 MessagePack 库为 msgpack(PyPI 包名),底层由 C 扩展实现,性能优异。
| 属性 | 值 |
|---|---|
| 包名 | msgpack |
| PyPI | https://pypi.org/project/msgpack/ |
| GitHub | https://github.com/msgpack/msgpack-python |
| 最低 Python | 3.8+ |
| 核心特性 | C 扩展加速、流式解码、自定义类型钩子 |
安装
# Recommended
pip install msgpack
# Require a minimum version (quoted so the shell does not treat ">" as an output redirect)
pip install "msgpack>=1.0.0"
# Verify the installation
python -c "import msgpack; print(msgpack.version)"
💻 基础序列化 / Basic Serialization
packb / unpackb
import msgpack
# ========== Serialization ==========
data = {
"id": 1001,
"name": "Alice",
"scores": [95, 87, 92],
"active": True,
"address": None
}
# packb: serialize a Python object to bytes
packed = msgpack.packb(data)
print(type(packed)) # <class 'bytes'>
print(len(packed)) # 46 bytes (vs ~75 bytes as compact JSON)
print(packed.hex()) # inspect the payload as hexadecimal
# ========== Deserialization ==========
# unpackb: deserialize bytes back into a Python object
unpacked = msgpack.unpackb(packed)
print(unpacked)
# msgpack >= 1.0 (raw=False by default): {'id': 1001, 'name': 'Alice', ...}
# msgpack <  1.0 (raw=True by default):  {b'id': 1001, b'name': b'Alice', ...}
⚠️ 关键陷阱: 在 msgpack < 1.0 中(或显式传入 raw=True 时),字符串和映射键都会被解码为 bytes,这是旧版本中最常见的问题。自 msgpack 1.0 起默认 raw=False,字符串会直接解码为 str。
正确处理字符串 (raw=False)
import msgpack
data = {"name": "Alice", "city": "北京"}
packed = msgpack.packb(data)

# ❌ raw=True (the default before msgpack 1.0): strings come back as bytes
bad = msgpack.unpackb(packed, raw=True)
print(bad)  # {b'name': b'Alice', b'city': b'\xe5\x8c\x97\xe4\xba\xac'}

# ✅ raw=False (the default since msgpack 1.0; pass it explicitly to stay
#    correct on older versions as well)
good = msgpack.unpackb(packed, raw=False)
print(good)  # {'name': 'Alice', 'city': '北京'}
📝 建议: 如果代码需要兼容 msgpack < 1.0,请在所有 unpackb 调用中显式加上 raw=False;在 msgpack ≥ 1.0 中这已经是默认值,显式写出也无害。
pack / unpack(流式)
import msgpack
import io
data = {"hello": "world"}
# pack: serialize the object and write it to a file-like object
buf = io.BytesIO()
msgpack.pack(data, buf)
# unpack: rewind to the start, then read one object back from the file-like object
buf.seek(0)
result = msgpack.unpack(buf, raw=False)
print(result) # {'hello': 'world'}
💻 类型映射 / Type Mapping
Python → MessagePack
| Python 类型 | MessagePack 类型 | 说明 |
|---|---|---|
| None | nil | 空值 |
| bool | boolean | true/false |
| int | int/uint | 自动选择最小编码 |
| float | float64 | 双精度浮点 |
| str | str | UTF-8 字符串 |
| bytes | bin | 原始字节 |
| bytearray | bin | 原始字节 |
| list | array | 数组 |
| tuple | array | 元组也序列化为数组 |
| dict | map | 映射 |
MessagePack → Python (raw=False)
| MessagePack 类型 | Python 类型 |
|---|---|
| nil | None |
| boolean | bool |
| int/uint | int |
| float32/64 | float |
| str | str |
| bin | bytes |
| array | list |
| map | dict |
| ext | msgpack.ExtType |
处理 bytes vs str
import msgpack
# When the payload mixes text and binary values
data = {
"name": "Alice", # expected to stay str
"avatar": b"\x89PNG...", # expected to stay bytes
}
packed = msgpack.packb(data)
# With raw=False the str value decodes to str and the bin value decodes to bytes
result = msgpack.unpackb(packed, raw=False)
assert isinstance(result["name"], str)
assert isinstance(result["avatar"], bytes)
💻 自定义类型序列化 / Custom Type Serialization
使用 default 钩子
当 MessagePack 不认识某个类型时,会调用 default 函数:
import msgpack
from datetime import datetime, date
from decimal import Decimal
from uuid import UUID
def custom_encoder(obj):
    """Wrap a value msgpack cannot pack natively in a tagged dict.

    Intended for use as the ``default=`` callback of ``msgpack.packb``.
    Supported values become ``{"__type__": <tag>, ...}``; any other type
    raises ``TypeError``.
    """
    # Checked in order. datetime must come before date because datetime is a
    # subclass of date and would otherwise be tagged as a plain date.
    single_value_rules = (
        (datetime, "datetime", lambda value: value.isoformat()),
        (date, "date", lambda value: value.isoformat()),
        (Decimal, "decimal", str),
        (UUID, "uuid", str),
        (set, "set", list),
    )
    for kind, tag, convert in single_value_rules:
        if isinstance(obj, kind):
            return {"__type__": tag, "value": convert(obj)}

    # complex is the only type stored under two fields instead of "value".
    if isinstance(obj, complex):
        return {"__type__": "complex", "real": obj.real, "imag": obj.imag}

    raise TypeError(f"Unknown type: {type(obj)}")
# Usage
data = {
"created": datetime(2024, 1, 15, 10, 30, 0),
"price": Decimal("99.99"),
"tags": {"python", "msgpack"},
}
packed = msgpack.packb(data, default=custom_encoder, use_bin_type=True)
result = msgpack.unpackb(packed, raw=False)
print(result)
# No object_hook is passed, so the tagged dicts come back as plain dicts
# (the order of the set's elements may vary):
# {'created': {'__type__': 'datetime', 'value': '2024-01-15T10:30:00'},
# 'price': {'__type__': 'decimal', 'value': '99.99'},
# 'tags': {'__type__': 'set', 'value': ['python', 'msgpack']}}
使用 object_hook 钩子
import msgpack
from datetime import datetime
from decimal import Decimal
from uuid import UUID
def custom_decoder(obj):
    """Rebuild Python objects from ``{"__type__": ...}`` tagged dicts.

    Intended for use as the ``object_hook=`` callback of ``msgpack.unpackb``.
    Non-dict values, dicts without a ``__type__`` key, and dicts whose tag is
    not recognized are returned unchanged.
    """
    if not isinstance(obj, dict) or "__type__" not in obj:
        return obj

    tag = obj["__type__"]
    if tag == "datetime":
        return datetime.fromisoformat(obj["value"])
    if tag == "date":
        # Parse as a datetime, then keep only the calendar date.
        return datetime.fromisoformat(obj["value"]).date()
    if tag == "decimal":
        return Decimal(obj["value"])
    if tag == "uuid":
        return UUID(obj["value"])
    if tag == "set":
        return set(obj["value"])
    if tag == "complex":
        return complex(obj["real"], obj["imag"])
    return obj
# Usage: decode the bytes in `packed`, letting custom_decoder rebuild tagged values
result = msgpack.unpackb(packed, raw=False, object_hook=custom_decoder)
print(type(result["created"])) # <class 'datetime.datetime'>
print(type(result["price"])) # <class 'decimal.Decimal'>
print(type(result["tags"])) # <class 'set'>
使用 ExtType(推荐用于性能敏感场景)
ExtType 是 MessagePack 原生的扩展类型,比 JSON 风格的 __type__ 更紧凑:
import msgpack
from datetime import datetime
import struct
# Application-defined ext type code for datetime payloads.
# msgpack-python's ExtType only accepts codes 0-127. Code -1 is reserved for
# the built-in Timestamp type, which the unpacker decodes itself without ever
# calling ext_hook, so it cannot be used for a hand-written hook pair.
DATETIME_EXT_CODE = 42

# Payload layout: signed 64-bit seconds since the epoch, then unsigned 32-bit
# nanoseconds, both big-endian (12 bytes in total).
_DATETIME_PAYLOAD = struct.Struct(">qI")


# ========== Encoding ==========
def encode_datetime(obj):
    """Pack a ``datetime`` as a compact ExtType; use as ``default=`` for packb.

    Args:
        obj: The object msgpack could not serialize on its own.

    Returns:
        ``msgpack.ExtType(DATETIME_EXT_CODE, payload)`` for datetime values.

    Raises:
        TypeError: If *obj* is not a ``datetime``.

    Timezone information is not stored: only the POSIX timestamp is written,
    and ``decode_datetime`` returns a naive local-time datetime.
    """
    if isinstance(obj, datetime):
        # Take whole seconds from a microsecond-free copy so the split into
        # seconds + nanoseconds is exact, including for instants before 1970.
        seconds = int(obj.replace(microsecond=0).timestamp())
        nanoseconds = obj.microsecond * 1000
        payload = _DATETIME_PAYLOAD.pack(seconds, nanoseconds)
        return msgpack.ExtType(DATETIME_EXT_CODE, payload)
    raise TypeError(f"Unknown type: {type(obj)}")


# ========== Decoding ==========
def decode_datetime(code, data):
    """Turn a ``DATETIME_EXT_CODE`` payload back into a ``datetime``.

    Use as ``ext_hook=`` for unpackb.

    Args:
        code: Ext type code read from the stream.
        data: Raw ext payload bytes.

    Returns:
        A naive local-time ``datetime`` for ``DATETIME_EXT_CODE``; for any
        other code, the untouched ``msgpack.ExtType(code, data)``.

    Raises:
        ValueError: If a datetime payload does not have the expected length.
    """
    if code == DATETIME_EXT_CODE:
        if len(data) != _DATETIME_PAYLOAD.size:
            raise ValueError(
                f"datetime ext payload must be {_DATETIME_PAYLOAD.size} bytes, got {len(data)}"
            )
        seconds, nanoseconds = _DATETIME_PAYLOAD.unpack(data)
        return datetime.fromtimestamp(seconds).replace(microsecond=nanoseconds // 1000)
    return msgpack.ExtType(code, data)
# ========== Usage ==========
data = {"event": "login", "time": datetime(2024, 6, 15, 14, 30, 0)}
packed = msgpack.packb(data, default=encode_datetime, use_bin_type=True)
result = msgpack.unpackb(packed, raw=False, ext_hook=decode_datetime)
print(result)
# {'event': 'login', 'time': datetime.datetime(2024, 6, 15, 14, 30)}
print(f"ExtType 编码大小: {len(packed)} bytes")
# More compact than the __type__ wrapper-dict approach
💻 流式处理 / Streaming
批量打包多个消息
import msgpack
import io
def pack_multiple(messages):
    """Serialize each message and return them concatenated as one bytes object."""
    return b"".join(msgpack.packb(message) for message in messages)
def unpack_multiple(data, raw=False):
    """Decode every msgpack object stored back-to-back in *data* into a list.

    *raw* is forwarded to ``msgpack.Unpacker``.
    """
    stream = io.BytesIO(data)
    return [message for message in msgpack.Unpacker(stream, raw=raw)]
# Usage: pack three messages into one buffer, then decode them all again
messages = [
{"type": "greeting", "data": "hello"},
{"type": "data", "values": [1, 2, 3]},
{"type": "farewell", "data": "bye"},
]
packed = pack_multiple(messages)
results = unpack_multiple(packed)
print(f"打包了 {len(results)} 条消息")
for msg in results:
    print(f" {msg}")
使用 Unpacker 处理流数据
import msgpack
import socket
def handle_connection(conn):
    """Decode msgpack messages arriving on *conn* and pass each to ``process_message``.

    ``msgpack.Unpacker`` can only pull data from an object that has a
    ``read()`` method, which plain sockets do not provide. Sockets are
    therefore drained with ``recv()`` and the chunks pushed into the Unpacker
    with ``feed()``. Objects without ``recv()`` are treated as file-like and
    handed to the Unpacker directly.

    Args:
        conn: A connected socket, or a binary file-like object.
    """
    max_buffer = 1024 * 1024

    if not hasattr(conn, "recv"):
        # File-like source: the Unpacker reads from it on demand.
        for msg in msgpack.Unpacker(conn, raw=False, max_buffer_size=max_buffer):
            process_message(msg)
        return

    unpacker = msgpack.Unpacker(raw=False, max_buffer_size=max_buffer)
    while True:
        chunk = conn.recv(16 * 1024)
        if not chunk:
            # An empty read means the peer closed the connection.
            break
        unpacker.feed(chunk)
        # Yields every message completed so far; a trailing partial message
        # stays buffered until the next feed().
        for msg in unpacker:
            process_message(msg)
def process_message(msg):
    """Print a received message to stdout."""
    print("收到: {}".format(msg))
# 模拟网络场景
# handle_connection(socket_connection)
自定义流式处理(带长度前缀)
import msgpack
import struct
import io
def pack_with_length(data):
    """Serialize *data* and prepend its size as a 4-byte big-endian integer."""
    body = msgpack.packb(data, use_bin_type=True)
    return struct.pack(">I", len(body)) + body
def unpack_stream(stream):
    """Yield each length-prefixed msgpack message read from *stream*.

    A frame is a 4-byte big-endian length followed by that many payload
    bytes. Iteration ends quietly at end of stream, or as soon as a header or
    payload comes back shorter than expected.
    """
    header_format = struct.Struct(">I")
    # iter() with a sentinel stops on the empty read that marks end of stream.
    for header in iter(lambda: stream.read(header_format.size), b""):
        if len(header) < header_format.size:
            return
        (body_size,) = header_format.unpack(header)
        body = stream.read(body_size)
        if len(body) < body_size:
            return
        yield msgpack.unpackb(body, raw=False)
# Usage: write two framed messages into a buffer, rewind, and read them back
buf = io.BytesIO()
buf.write(pack_with_length({"msg": "hello"}))
buf.write(pack_with_length({"msg": "world"}))
buf.seek(0)
for msg in unpack_stream(buf):
    print(msg)
# {'msg': 'hello'}
# {'msg': 'world'}
💻 性能优化 / Performance Optimization
选择合适的参数
import msgpack
data = {"count": 42, "name": "test"}
# Micro-benchmarks
import timeit
# 1. use_bin_type=True (recommended: keeps str and bytes distinct on the wire)
t1 = timeit.timeit(
lambda: msgpack.unpackb(msgpack.packb(data, use_bin_type=True), raw=False),
number=100000
)
print(f"use_bin_type=True: {t1:.3f}s")
# 2. use_single_float=True (floats are packed as float32: smaller, but lower precision)
float_data = {"pi": 3.14159, "e": 2.71828}
t2 = timeit.timeit(
lambda: msgpack.unpackb(msgpack.packb(float_data, use_single_float=True), raw=False),
number=100000
)
print(f"use_single_float: {t2:.3f}s")
# 3. Baseline: packing a single small integer with default options
t3 = timeit.timeit(
lambda: msgpack.packb(42),
number=1000000
)
print(f"简单整数序列化: {t3:.3f}s")
大数据处理优化
import msgpack
import io
def efficient_batch_process(data_list, batch_size=1000):
    """Round-trip *data_list* through msgpack in batches and return the decoded items.

    Items are packed into an in-memory buffer. After every *batch_size*
    items the buffer is decoded and replaced with a fresh one, and whatever
    is left over is decoded at the end.
    """
    decoded = []

    def drain(stream):
        # Rewind and decode everything written to the stream so far.
        stream.seek(0)
        decoded.extend(msgpack.Unpacker(stream, raw=False))

    pending = io.BytesIO()
    for count, item in enumerate(data_list, start=1):
        msgpack.pack(item, pending)
        if count % batch_size == 0:
            drain(pending)
            pending = io.BytesIO()

    # Decode the final, possibly partial (or empty) batch.
    drain(pending)
    return decoded
# Generate 10,000 test records and run them through the batch processor
data = [{"id": i, "value": f"item_{i}"} for i in range(10000)]
results = efficient_batch_process(data)
print(f"处理了 {len(results)} 条记录")
内存优化:使用 Unpacker 避免一次性加载
import msgpack
# ❌ Not ideal: decodes the whole payload in a single call
def bad_approach(huge_data):
    """Decode *huge_data* with one ``unpackb`` call and return the result."""
    decoded = msgpack.unpackb(huge_data, raw=False)
    return decoded
# ✅ Better: stream with Unpacker and handle one object at a time
def good_approach(huge_data):
    """Pass each top-level msgpack object in *huge_data* to ``process_item``.

    Args:
        huge_data: Either a bytes-like object or a binary file-like object
            (anything with ``read()``). Passing an open file is what actually
            avoids holding the whole payload in memory, because the Unpacker
            then reads it incrementally.

    Returns:
        An empty list. Items are consumed one by one rather than accumulated,
        and the list is returned only to keep the original return type.
    """
    import io  # local import: this snippet's import list does not include io

    results = []
    source = huge_data if hasattr(huge_data, "read") else io.BytesIO(huge_data)
    unpacker = msgpack.Unpacker(source, raw=False)
    for item in unpacker:
        process_item(item)  # handled immediately, never stored
    return results
💻 与常见框架集成 / Framework Integration
与 Redis 集成
import msgpack
import redis
class MsgPackRedis:
    """Small wrapper that stores values in Redis as MessagePack bytes."""

    def __init__(self, redis_client):
        """Keep a reference to the client used for all reads and writes."""
        self.redis = redis_client

    def set(self, key, value, ex=None):
        """Serialize *value* with msgpack and store it under *key*.

        Types msgpack cannot pack are converted with ``str`` (``default=str``).
        *ex* is forwarded to the client's ``set`` call.
        """
        payload = msgpack.packb(value, use_bin_type=True, default=str)
        self.redis.set(key, payload, ex=ex)

    def get(self, key):
        """Return the decoded value stored under *key*, or ``None`` if absent."""
        stored = self.redis.get(key)
        return None if stored is None else msgpack.unpackb(stored, raw=False)
# Usage (redis.Redis() is called with no arguments, i.e. the client's default connection settings)
r = redis.Redis()
cache = MsgPackRedis(r)
cache.set("user:1001", {
"name": "Alice",
"roles": ["admin", "editor"],
"login_count": 42
})
user = cache.get("user:1001")
print(user) # {'name': 'Alice', 'roles': ['admin', 'editor'], 'login_count': 42}
与 FastAPI 集成
from fastapi import FastAPI, Request, Response
import msgpack
app = FastAPI()
class MsgPackResponse(Response):
    """Response subclass whose body is the MessagePack encoding of ``content``."""

    media_type = "application/x-msgpack"

    def render(self, content) -> bytes:
        """Serialize *content* to MessagePack bytes for the response body."""
        encoded = msgpack.packb(content, use_bin_type=True)
        return encoded
@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    """Return a fixed demo user record for *user_id* as a MessagePack response."""
    payload = {
        "id": user_id,
        "name": "Alice",
        "scores": [95, 87, 92],
    }
    return MsgPackResponse(content=payload)
@app.post("/api/users")
async def create_user(request: Request):
    """Decode a MessagePack request body and echo it back.

    Returns:
        A 200 MessagePack response ``{"status": "ok", "user": <decoded body>}``,
        or a 400 MessagePack response when the body is not valid MessagePack.
    """
    body = await request.body()
    try:
        data = msgpack.unpackb(body, raw=False)
    except (ValueError, msgpack.UnpackException) as exc:
        # The body comes from the client, so malformed input is a client
        # error (400) and must not escape as an unhandled exception (500).
        return MsgPackResponse(
            content={"status": "error", "detail": f"invalid MessagePack body: {exc}"},
            status_code=400,
        )
    # Process data...
    return MsgPackResponse(content={"status": "ok", "user": data})
⚠️ 注意事项 / Pitfalls
1. 始终设置 raw=False
# ❌ Common mistake on msgpack < 1.0, where raw defaulted to True
data = msgpack.unpackb(packed) # strings become bytes on msgpack < 1.0; msgpack >= 1.0 already defaults to raw=False
data = msgpack.unpackb(packed, raw=False) # ✅ explicit, so it behaves the same on every version
2. use_bin_type=True
# ❌ use_bin_type=False (the default before msgpack 1.0): str and bytes are
#    both written with the str family, so the reader cannot tell them apart
msgpack.packb("hello", use_bin_type=False)   # str → fixstr
msgpack.packb(b"hello", use_bin_type=False)  # bytes → fixstr (ambiguous!)
# ✅ use_bin_type=True (the default since msgpack 1.0): the two stay distinct
msgpack.packb("hello", use_bin_type=True)    # str → str
msgpack.packb(b"hello", use_bin_type=True)   # bytes → bin
3. max_buffer_size 限制
# When reading large files, keep the Unpacker's buffer limit in mind.
# NOTE(review): file_obj is not defined in this snippet — presumably an open binary file; confirm.
unpacker = msgpack.Unpacker(
file_obj,
raw=False,
max_buffer_size=1024 * 1024 * 100 # 100 MiB
)
4. 元组会变成列表
import msgpack
original = (1, 2, 3)
packed = msgpack.packb(original)
result = msgpack.unpackb(packed)
print(type(result)) # <class 'list'> — the tuple came back as a list
5. dict 键类型限制
import msgpack
# ❌ Non-string keys: they pack fine, but since msgpack 1.0 unpackb rejects
#    them by default (strict_map_key=True only allows str/bytes keys), so
#    strict_map_key=False is required to read them back
data = {1: "one", 2: "two"}
packed = msgpack.packb(data)
result = msgpack.unpackb(packed, raw=False, strict_map_key=False)
print(result)  # {1: 'one', 2: 'two'} — integer keys survive, but are not recommended
# ✅ Recommended: use string keys
data = {"1": "one", "2": "two"}
🔗 扩展阅读 / Further Reading
📝 下一章 / Next: 第 4 章 - JavaScript 实践 / JavaScript Implementation — 在浏览器和 Node.js 中使用 MessagePack。