MySQL 传输协议精讲 / 07 - 二进制协议
第 07 章:二进制协议
7.1 二进制协议概述
二进制协议(Binary Protocol)用于预处理语句(Prepared Statement)的参数绑定和结果集传输。与文本协议不同,二进制协议使用原生二进制格式表示数据值,无需字符串转换。
文本协议 vs 二进制协议
| 特性 | 文本协议 | 二进制协议 |
|---|---|---|
| 使用命令 | COM_QUERY | COM_STMT_EXECUTE |
| 值格式 | 字符串 | 原生二进制 |
| NULL 标识 | 0xFB 长度编码 | null_bitmap 位图 |
| 参数传递 | SQL 字符串拼接 | 占位符 + 绑定值 |
| SQL 注入 | 需要转义 | 天然免疫 |
| 类型转换 | 服务端转文本 | 客户端按类型解析 |
| 网络传输 | 较大(字符串) | 较小(二进制) |
| CPU 开销 | 较高 | 较低 |
7.2 二进制参数类型
MySQL 二进制协议中的参数类型 ID 与 Column Definition 中的类型 ID 相同:
参数类型编码
| 类型 ID | 名称 | 字节大小 | Python 对应 |
|---|---|---|---|
| 0x00 | DECIMAL | 变长 | Decimal |
| 0x01 | TINYINT | 1 | int |
| 0x02 | SMALLINT | 2 | int |
| 0x03 | INT | 4 | int |
| 0x04 | FLOAT | 4 | float |
| 0x05 | DOUBLE | 8 | float |
| 0x07 | TIMESTAMP | 变长 | datetime |
| 0x08 | BIGINT | 8 | int |
| 0x0A | DATE | 变长 | date |
| 0x0B | TIME | 变长 | time |
| 0x0C | DATETIME | 变长 | datetime |
| 0x0F | VARCHAR | 变长 | str |
| 0x10 | BIT | 变长 | bytes |
| 0xF5 | JSON | 变长 | str/dict |
| 0xF6 | NEWDECIMAL | 变长 | Decimal |
| 0xFC | BLOB | 变长 | bytes |
| 0xFD | VAR_STRING | 变长 | str |
| 0xFE | STRING | 变长 | str/bytes |
7.3 参数绑定格式
COM_STMT_EXECUTE 中的参数编码
1. 命令头 (10 字节)
- 0x17 (COM_STMT_EXECUTE)
- statement_id (4 字节)
- flags (1 字节)
- iteration_count (4 字节, 固定为 1)
2. null_bitmap (变长)
- 每个参数占 1 bit
- 大小 = (num_params + 7) / 8
- bit = 1 表示该参数为 NULL
3. new_params_bound_flag (1 字节)
- 0x00: 不发送新参数类型(使用上次的)
- 0x01: 发送新参数类型
4. 参数类型数组 (仅当 new_params_bound_flag = 0x01)
- 每个参数 2 字节: [type_id (1字节)] [unsigned_flag (1字节)]
5. 参数值数组
- 每个参数按其类型编码
null_bitmap 的计算
def build_null_bitmap(params: list, num_params: int) -> bytes:
"""构建 null bitmap"""
bitmap_size = (num_params + 7) // 8
bitmap = bytearray(bitmap_size)
for i, param in enumerate(params):
if param is None:
bitmap[i // 8] |= (1 << (i % 8))
return bytes(bitmap)
各类型参数值的编码
整数类型
import struct
def encode_tinyint(value: int) -> bytes:
"""TINYINT (1 字节)"""
return struct.pack('<b', value)
def encode_smallint(value: int) -> bytes:
"""SMALLINT (2 字节)"""
return struct.pack('<h', value)
def encode_int(value: int) -> bytes:
"""INT (4 字节)"""
return struct.pack('<i', value)
def encode_bigint(value: int) -> bytes:
"""BIGINT (8 字节)"""
return struct.pack('<q', value)
浮点类型
def encode_float(value: float) -> bytes:
"""FLOAT (4 字节)"""
return struct.pack('<f', value)
def encode_double(value: float) -> bytes:
"""DOUBLE (8 字节)"""
return struct.pack('<d', value)
字符串类型
def encode_string(value: str) -> bytes:
"""VARCHAR/TEXT: 长度编码 + 字节数据"""
encoded = value.encode('utf-8')
return encode_length(len(encoded)) + encoded
def encode_length(length: int) -> bytes:
"""长度编码"""
if length < 0xFB:
return struct.pack('B', length)
elif length < 0x10000:
return b'\xFC' + struct.pack('<H', length)
elif length < 0x1000000:
return b'\xFD' + struct.pack('<I', length)[:3]
else:
return b'\xFE' + struct.pack('<Q', length)
日期时间类型
def encode_date(year: int, month: int, day: int) -> bytes:
"""DATE: 长度 + 年(2) + 月(1) + 日(1)"""
return struct.pack('BBBB', 4, 0, 0, 0) + struct.pack('<HBB', year, month, day)
# 实际: length=4, year(2), month(1), day(1) = 总共 5 字节
def encode_datetime(year, month, day, hour=0, minute=0, second=0, microsecond=0) -> bytes:
"""DATETIME/TIMESTAMP"""
if microsecond:
length = 11 # 年月日时分秒微秒
return (struct.pack('B', length) +
struct.pack('<HBBBBI', year, month, day, hour, minute, second) +
struct.pack('<I', microsecond))
elif second:
length = 7 # 年月日时分秒
return (struct.pack('B', length) +
struct.pack('<HBBBBB', year, month, day, hour, minute, second))
else:
length = 4 # 年月日
return (struct.pack('B', length) +
struct.pack('<HBB', year, month, day))
def encode_time(negative=False, hours=0, minutes=0, seconds=0, microseconds=0) -> bytes:
"""TIME"""
if microseconds:
length = 12
return (struct.pack('B', length) +
struct.pack('<BiBBBBBI', 1 if negative else 0, 0, 0, 0, hours, minutes, seconds, microseconds))
elif seconds:
length = 8
return (struct.pack('B', length) +
struct.pack('<BiBBBBB', 1 if negative else 0, 0, 0, 0, hours, minutes, seconds))
else:
length = 0
return struct.pack('B', length)
7.4 完整的参数绑定示例
"""
mysql_binary_params.py
二进制协议参数绑定完整实现
"""
import struct
# 参数类型常量
MYSQL_TYPE_DECIMAL = 0x00
MYSQL_TYPE_TINY = 0x01
MYSQL_TYPE_SHORT = 0x02
MYSQL_TYPE_LONG = 0x03
MYSQL_TYPE_FLOAT = 0x04
MYSQL_TYPE_DOUBLE = 0x05
MYSQL_TYPE_NULL = 0x06
MYSQL_TYPE_TIMESTAMP = 0x07
MYSQL_TYPE_LONGLONG = 0x08
MYSQL_TYPE_DATE = 0x0A
MYSQL_TYPE_TIME = 0x0B
MYSQL_TYPE_DATETIME = 0x0C
MYSQL_TYPE_VARCHAR = 0x0F
MYSQL_TYPE_BIT = 0x10
MYSQL_TYPE_JSON = 0xF5
MYSQL_TYPE_NEWDECIMAL = 0xF6
MYSQL_TYPE_BLOB = 0xFC
MYSQL_TYPE_VAR_STRING = 0xFD
MYSQL_TYPE_STRING = 0xFE
# Python 类型到 MySQL 类型的映射
PYTHON_TYPE_MAP = {
type(None): (MYSQL_TYPE_NULL, 0),
bool: (MYSQL_TYPE_TINY, 0),
int: (MYSQL_TYPE_LONGLONG, 0),
float: (MYSQL_TYPE_DOUBLE, 0),
str: (MYSQL_TYPE_VAR_STRING, 0),
bytes: (MYSQL_TYPE_BLOB, 0),
bytearray: (MYSQL_TYPE_BLOB, 0),
}
def infer_param_type(value) -> tuple:
"""推断参数的 MySQL 类型"""
if value is None:
return (MYSQL_TYPE_NULL, 0)
for py_type, mysql_type in PYTHON_TYPE_MAP.items():
if isinstance(value, py_type):
return mysql_type
return (MYSQL_TYPE_VAR_STRING, 0)
def encode_param_value(value) -> bytes:
"""编码参数值为二进制格式"""
if value is None:
return b''
if isinstance(value, bool):
return struct.pack('<b', 1 if value else 0)
if isinstance(value, int):
if -128 <= value <= 127:
return struct.pack('<b', value)
elif -32768 <= value <= 32767:
return struct.pack('<h', value)
elif -2147483648 <= value <= 2147483647:
return struct.pack('<i', value)
else:
return struct.pack('<q', value)
if isinstance(value, float):
return struct.pack('<d', value)
if isinstance(value, str):
encoded = value.encode('utf-8')
return encode_length_coded(len(encoded)) + encoded
if isinstance(value, (bytes, bytearray)):
return encode_length_coded(len(value)) + bytes(value)
raise TypeError(f"不支持的参数类型: {type(value)}")
def encode_length_coded(length: int) -> bytes:
"""长度编码"""
if length < 0xFB:
return struct.pack('B', length)
elif length < 0x10000:
return b'\xFC' + struct.pack('<H', length)
elif length < 0x1000000:
return b'\xFD' + struct.pack('<I', length)[:3]
else:
return b'\xFE' + struct.pack('<Q', length)
def build_execute_packet(statement_id: int, params: list, cursor_type: int = 0) -> bytes:
"""
构建 COM_STMT_EXECUTE 数据包
参数:
statement_id: 预处理语句 ID
params: 参数值列表
cursor_type: 游标类型 (0=无游标, 1=READ_ONLY, 2=FOR_UPDATE, 4=SCROLLABLE)
返回:
完整的数据包(含包头)
"""
num_params = len(params)
# === 构建有效载荷 ===
payload = bytearray()
# 命令类型
payload.append(0x17) # COM_STMT_EXECUTE
# statement_id
payload.extend(struct.pack('<I', statement_id))
# flags (cursor type)
payload.append(cursor_type & 0xFF)
# iteration_count (固定为 1)
payload.extend(struct.pack('<I', 1))
# null_bitmap
bitmap_size = (num_params + 7) // 8
null_bitmap = bytearray(bitmap_size)
for i, param in enumerate(params):
if param is None:
null_bitmap[i // 8] |= (1 << (i % 8))
payload.extend(null_bitmap)
# new_params_bound_flag
if num_params > 0:
payload.append(0x01) # 发送新参数类型
# 参数类型数组
for param in params:
type_id, unsigned_flag = infer_param_type(param)
payload.append(type_id)
payload.append(unsigned_flag)
# 参数值数组
for param in params:
if param is not None:
payload.extend(encode_param_value(param))
else:
payload.append(0x00) # 无参数
# === 添加包头 ===
packet = struct.pack('<I', len(payload))[:3] + struct.pack('B', 0) + bytes(payload)
return packet
def demo():
"""演示二进制参数绑定"""
print("=" * 60)
print("MySQL 二进制协议参数绑定演示")
print("=" * 60)
# 模拟语句: INSERT INTO users (name, age, balance, active) VALUES (?, ?, ?, ?)
statement_id = 1
params = ["Alice", 28, 9999.50, True]
packet = build_execute_packet(statement_id, params)
print(f"\n语句 ID: {statement_id}")
print(f"参数: {params}")
print(f"数据包大小: {len(packet)} 字节")
print(f"数据包头: {packet[:4].hex()}")
print(f"命令: 0x{packet[4]:02X} (COM_STMT_EXECUTE)")
print(f"完整 payload: {packet[4:].hex()}")
print()
# NULL 参数
print("--- 含 NULL 参数 ---")
params_with_null = ["Bob", None, 100.00, None]
packet2 = build_execute_packet(statement_id, params_with_null)
print(f"参数: {params_with_null}")
print(f"数据包大小: {len(packet2)} 字节")
# 解析 null_bitmap
bitmap = packet2[14:14 + 1] # 4 参数 → 1 字节 bitmap
print(f"null_bitmap: 0x{bitmap[0]:02X}")
for i in range(4):
is_null = (bitmap[0] >> i) & 1
print(f" 参数 {i} ({params_with_null[i]}): {'NULL' if is_null else '非 NULL'}")
if __name__ == '__main__':
demo()
7.5 二进制结果集
当使用 COM_STMT_EXECUTE 执行预处理语句时,结果集使用二进制格式返回。
与文本结果集的区别
| 部分 | 文本结果集 | 二进制结果集 |
|---|---|---|
| 列定义 | 相同格式 | 相同格式 |
| 行数据 | 长度编码字符串 | null_bitmap + 二进制值 |
| NULL 表示 | 0xFB | null_bitmap 中的 bit |
| 整数值 | 字符串 “123” | 4 字节小端序整数 |
二进制行数据格式
1. 前导字节 (1 字节)
固定为 0x00
2. null_bitmap (变长)
大小 = (column_count + 7 + 2) / 8
前 2 bit 保留(始终为 0),后续每 bit 对应一列
bit = 1 表示该列为 NULL
3. 列值数据
每列按其类型以二进制格式编码
NULL 列不占用空间(由 bitmap 标识)
Python 二进制结果集解析
"""
mysql_binary_resultset.py
解析二进制结果集
"""
import struct
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from typing import List, Any
# Column type 定义 (同 Column Definition)
TYPE_DECIMAL = 0x00
TYPE_TINY = 0x01
TYPE_SHORT = 0x02
TYPE_LONG = 0x03
TYPE_FLOAT = 0x04
TYPE_DOUBLE = 0x05
TYPE_NULL = 0x06
TYPE_TIMESTAMP = 0x07
TYPE_LONGLONG = 0x08
TYPE_INT24 = 0x09
TYPE_DATE = 0x0A
TYPE_TIME = 0x0B
TYPE_DATETIME = 0x0C
TYPE_YEAR = 0x0D
TYPE_VARCHAR = 0x0F
TYPE_BIT = 0x10
TYPE_JSON = 0xF5
TYPE_NEWDECIMAL = 0xF6
TYPE_ENUM = 0xF7
TYPE_SET = 0xF8
TYPE_TINY_BLOB = 0xF9
TYPE_MEDIUM_BLOB= 0xFA
TYPE_LONG_BLOB = 0xFB
TYPE_BLOB = 0xFC
TYPE_VAR_STRING = 0xFD
TYPE_STRING = 0xFE
TYPE_GEOMETRY = 0xFF
def read_length_encoded_bytes(data: bytes, offset: int) -> tuple:
"""读取长度编码的字节序列"""
length, new_offset = read_length_encoded_int(data, offset)
if length is None:
return None, offset + 1
return data[new_offset:new_offset + length], new_offset + length
def read_length_encoded_int(data: bytes, offset: int) -> tuple:
first = data[offset]
if first < 0xFB:
return first, offset + 1
elif first == 0xFC:
return struct.unpack('<H', data[offset+1:offset+3])[0], offset + 3
elif first == 0xFD:
return struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0], offset + 4
elif first == 0xFE:
return struct.unpack('<Q', data[offset+1:offset+9])[0], offset + 9
def decode_binary_column(data: bytes, offset: int, column_type: int, flags: int = 0, decimals: int = 0) -> tuple:
"""
解码二进制结果集中的一列
返回: (值, 新偏移量)
"""
if column_type == TYPE_TINY:
if flags & 0x0020: # UNSIGNED
return data[offset], offset + 1
return struct.unpack('<b', data[offset:offset+1])[0], offset + 1
elif column_type == TYPE_SHORT:
if flags & 0x0020:
return struct.unpack('<H', data[offset:offset+2])[0], offset + 2
return struct.unpack('<h', data[offset:offset+2])[0], offset + 2
elif column_type in (TYPE_LONG, TYPE_INT24):
if flags & 0x0020:
return struct.unpack('<I', data[offset:offset+4])[0], offset + 4
return struct.unpack('<i', data[offset:offset+4])[0], offset + 4
elif column_type == TYPE_LONGLONG:
if flags & 0x0020:
return struct.unpack('<Q', data[offset:offset+8])[0], offset + 8
return struct.unpack('<q', data[offset:offset+8])[0], offset + 8
elif column_type == TYPE_FLOAT:
return struct.unpack('<f', data[offset:offset+4])[0], offset + 4
elif column_type == TYPE_DOUBLE:
return struct.unpack('<d', data[offset:offset+8])[0], offset + 8
elif column_type == TYPE_YEAR:
return struct.unpack('<H', data[offset:offset+2])[0], offset + 2
elif column_type in (TYPE_DATE, TYPE_DATETIME, TYPE_TIMESTAMP):
return decode_binary_temporal(data, offset)
elif column_type == TYPE_TIME:
return decode_binary_time(data, offset)
elif column_type == TYPE_NEWDECIMAL:
return decode_binary_decimal(data, offset, decimals)
elif column_type in (TYPE_VARCHAR, TYPE_STRING, TYPE_ENUM, TYPE_SET):
raw, new_offset = read_length_encoded_bytes(data, offset)
if raw is not None:
return raw.decode('utf-8'), new_offset
return None, new_offset
elif column_type in (TYPE_BLOB, TYPE_TINY_BLOB, TYPE_MEDIUM_BLOB,
TYPE_LONG_BLOB, TYPE_JSON, TYPE_BIT, TYPE_GEOMETRY):
raw, new_offset = read_length_encoded_bytes(data, offset)
return raw, new_offset
else:
# 默认当作长度编码字节
raw, new_offset = read_length_encoded_bytes(data, offset)
return raw, new_offset
def decode_binary_temporal(data: bytes, offset: int) -> tuple:
"""解码 DATE/DATETIME/TIMESTAMP 二进制值"""
length = data[offset]
offset += 1
if length == 0:
return None, offset
year = struct.unpack('<H', data[offset:offset+2])[0]
month = data[offset + 2]
day = data[offset + 3]
offset += 4
if length == 4:
return date(year, month, day), offset
hour = data[offset]
minute = data[offset + 1]
second = data[offset + 2]
offset += 3
if length == 7:
return datetime(year, month, day, hour, minute, second), offset
microsecond = struct.unpack('<I', data[offset:offset+4])[0]
offset += 4
return datetime(year, month, day, hour, minute, second, microsecond), offset
def decode_binary_time(data: bytes, offset: int) -> tuple:
"""解码 TIME 二进制值"""
length = data[offset]
offset += 1
if length == 0:
return timedelta(0), offset
is_negative = data[offset]
offset += 1
days = struct.unpack('<I', data[offset:offset+4])[0]
offset += 4
hours = data[offset]
minutes = data[offset + 1]
seconds = data[offset + 2]
offset += 3
total_seconds = days * 86400 + hours * 3600 + minutes * 60 + seconds
if length > 8:
microseconds = struct.unpack('<I', data[offset:offset+4])[0]
total_seconds += microseconds / 1000000
offset += 4
td = timedelta(seconds=total_seconds)
if is_negative:
td = -td
return td, offset
def decode_binary_decimal(data: bytes, offset: int, decimals: int) -> tuple:
"""解码 NEWDECIMAL 二进制值"""
raw, new_offset = read_length_encoded_bytes(data, offset)
if raw is None:
return None, new_offset
# 简化处理:直接转字符串
return Decimal(raw.decode('utf-8')), new_offset
def parse_binary_row(data: bytes, columns: list) -> List[Any]:
"""
解析二进制结果集中的一行
参数:
data: 行数据 payload
columns: 列定义列表 [(name, type_id, flags, decimals), ...]
返回:
值列表
"""
offset = 0
# 前导字节 (0x00)
offset += 1
# null_bitmap
num_cols = len(columns)
bitmap_size = (num_cols + 7 + 2) // 8 # +2 for reserved bits
null_bitmap = data[offset:offset + bitmap_size]
offset += bitmap_size
# 解析每列
row = []
for i, (name, type_id, flags, decimals) in enumerate(columns):
# 检查 null bit (注意: 前 2 bit 保留, 列从 bit 2 开始)
bit_index = i + 2
is_null = (null_bitmap[bit_index // 8] >> (bit_index % 8)) & 1
if is_null:
row.append(None)
else:
value, offset = decode_binary_column(data, offset, type_id, flags, decimals)
row.append(value)
return row
def demo():
"""演示二进制结果集解析"""
print("=" * 60)
print("MySQL 二进制结果集解析演示")
print("=" * 60)
# 模拟列定义
columns = [
("id", TYPE_LONGLONG, 0x0001 | 0x0002, 0), # NOT NULL, PRI
("name", TYPE_VAR_STRING, 0, 0),
("balance", TYPE_NEWDECIMAL, 0x0020, 2), # UNSIGNED
("active", TYPE_TINY, 0, 0),
]
# 构造模拟的二进制行数据
row_data = bytearray()
# 前导字节
row_data.append(0x00)
# null_bitmap: 4 列 → (4+2+7)/8 = 1 字节, 所有非 NULL → 0x00
row_data.append(0x00)
# id (BIGINT, 8 字节)
row_data.extend(struct.pack('<q', 42))
# name (VAR_STRING, 长度编码)
name = "Alice".encode('utf-8')
row_data.append(len(name))
row_data.extend(name)
# balance (NEWDECIMAL)
balance = "12345.67".encode('utf-8')
row_data.append(len(balance))
row_data.extend(balance)
# active (TINYINT)
row_data.append(1)
# 解析
row = parse_binary_row(bytes(row_data), columns)
print("\n列定义:")
for name, type_id, flags, dec in columns:
print(f" {name}: type=0x{type_id:02X}, flags=0x{flags:04X}")
print("\n原始数据:")
print(f" {bytes(row_data).hex()}")
print("\n解析结果:")
for i, (col_def, value) in enumerate(zip(columns, row)):
name = col_def[0]
print(f" {name} = {value!r} (type: {type(value).__name__})")
if __name__ == '__main__':
demo()
7.6 二进制与文本结果集的类型对照
| MySQL 类型 | 文本协议格式 | 二进制协议格式 |
|---|---|---|
| TINYINT | “127” | 1 字节: 0x7F |
| SMALLINT | “1234” | 2 字节: 0xD204 |
| INT | “123456” | 4 字节: 0x40E20100 |
| BIGINT | “123456789” | 8 字节 |
| FLOAT | “3.14” | 4 字节 IEEE 754 |
| DOUBLE | “3.14159265” | 8 字节 IEEE 754 |
| DECIMAL | “1234.56” | 长度编码字符串 |
| VARCHAR | “hello” | 长度编码字符串 |
| BLOB | hex 或 raw | 长度编码字节 |
| DATE | “2024-01-15” | 4+1 字节 (length + year/month/day) |
| DATETIME | “2024-01-15 10:30:00” | 8+1 字节 |
| NULL | 0xFB | null_bitmap 中的 bit |
| BIT | hex 字符串 | 长度编码字节 |
7.7 注意事项
重要提醒
null_bitmap 的偏移:二进制结果集行中的 null_bitmap 有 2 bit 保留位,列对应的 bit 从第 2 bit 开始(即
bit_index = column_index + 2)。有符号/无符号:二进制整数类型需要根据
UNSIGNED_FLAG标志选择正确的解码方式(有符号 vs 无符号)。DECIMAL 的精度:二进制 DECIMAL 使用长度编码字符串传输,而非原生二进制浮点,以保证精度。
BLOB/TEXT 大小:大 BLOB 值可能跨越多个数据包(使用 COM_STMT_SEND_LONG_DATA 预先发送)。
参数类型推断:驱动通常根据 Python/Java 值的类型自动推断 MySQL 参数类型,但显式指定类型更安全。
BIT 类型:BIT 类型以大端序二进制字节传输,需要特殊处理。
7.8 业务场景
场景一:高性能批量插入
使用二进制协议批量插入比文本协议更高效:
# 文本协议 (慢): 每条 INSERT 都要序列化为 SQL 字符串
# INSERT INTO t VALUES (1, 'Alice', 100.00)
# INSERT INTO t VALUES (2, 'Bob', 200.00)
# ...
# 二进制协议 (快): 预处理一次, 绑定执行多次
# PREPARE: INSERT INTO t VALUES (?, ?, ?)
# EXECUTE: [1, 'Alice', 100.00]
# EXECUTE: [2, 'Bob', 200.00]
# ...
场景二:精确的类型映射
当应用需要精确的类型映射时(如金融系统中的 DECIMAL),二进制协议可以避免文本转换的精度损失。
场景三:SQL 注入防护
二进制协议天然免疫 SQL 注入,因为参数值不会被拼接到 SQL 字符串中。
7.9 扩展阅读
- MySQL Internals: Binary Protocol Resultset
- MySQL Internals: COM_STMT_EXECUTE
- Go MySQL Driver: packets.go
上一章:06 - 文本协议 下一章:08 - 预处理语句详解 —— 深入了解预处理语句的完整生命周期。