MySQL 传输协议精讲 / 09 - 结果集详解
第 09 章:结果集详解
9.1 结果集的完整结构
无论使用文本协议还是二进制协议,结果集的基本结构是相同的:
1. Column Count Packet ← 告知列数
2. Column Definition Packets × N ← 每列的元数据
3. EOF / OK Packet ← 列定义结束
4. Row Data Packets × M ← 每行的数据
5. EOF / OK Packet ← 行数据结束
包的顺序与类型判定
接收第一个包 →
若首字节为 0xFF → ERR 包
若首字节为 0x00 且长度 < 9 → OK 包(DML 语句)
若首字节为 0xFB → LOCAL INFILE 请求
其他 → Column Count(结果集开始)
后续包 →
根据当前阶段判定类型:
- 列定义阶段: Column Definition
- 行数据阶段: Row Data 或 EOF/OK
9.2 字段描述详解
Column Definition 全字段
catalog (长度编码字符串)
→ 固定为 "def"
→ 不携带实际信息
schema (长度编码字符串)
→ 数据库名称
→ 示例: "mydb"
table_alias (长度编码字符串)
→ 表别名(AS 之后的名字)
→ 示例: SELECT t.name AS t_name 中的 "t"
table_name (长度编码字符串)
→ 真实表名
→ 示例: "users"
column_alias (长度编码字符串)
→ 列别名
→ 示例: SELECT name AS user_name 中的 "user_name"
column_name (长度编码字符串)
→ 真实列名
→ 示例: "name"
固定字段标记 (1 字节)
→ 固定值 0x0C(表示后续固定字段长度为 12 字节)
character_set (2 字节, 小端序)
→ 字符集 ID
→ 0x21 = latin1 (33), 0x2D = utf8mb4 (45), 0x3F = binary (63)
column_length (4 字节, 小端序)
→ 列最大长度(字节数)
→ VARCHAR(255) 在 utf8mb4 下 = 255 × 4 = 1020
column_type (1 字节)
→ 字段类型 ID
→ 0x03 = INT, 0xFD = VAR_STRING, ...
flags (2 字节, 小端序)
→ 列标志位
→ 0x0001 = NOT NULL, 0x0020 = UNSIGNED, ...
decimals (1 字节)
→ 小数位数
→ INT = 0, DECIMAL(10,2) = 2
filler (2 字节)
→ 保留,固定 0x00 0x00
字段类型与 SQL 类型的对应
| SQL 类型 | 类型 ID | column_length (utf8mb4) | flags | decimals |
|---|---|---|---|---|
| INT | 0x03 | 11 | 视情况 | 0 |
| BIGINT | 0x08 | 20 | 视情况 | 0 |
| DECIMAL(10,2) | 0xF6 | 12 | - | 2 |
| VARCHAR(100) | 0xFD | 400 | - | 0 |
| CHAR(50) | 0xFE | 200 | - | 0 |
| TEXT | 0xFC | 262140 | 0x0010 | 0 |
| BLOB | 0xFC | 262140 | 0x0010 | 0 |
| DATE | 0x0A | 10 | - | 0 |
| DATETIME | 0x0C | 19 | - | 0 |
| TIMESTAMP | 0x07 | 19 | 0x0400 | 0 |
| FLOAT | 0x04 | 12 | - | 31 |
| DOUBLE | 0x05 | 22 | - | 31 |
| ENUM | 0xF7 | 视最长值 | 0x0100 | 0 |
| SET | 0xF8 | 视最长值 | 0x0800 | 0 |
| JSON | 0xF5 | 4294967295 | 0x0010 | 0 |
9.3 行数据的类型转换
文本协议中的值转换
在文本协议中,所有值以字符串传输。客户端驱动需要根据 column_type 转换:
"""
mysql_type_converter.py
将文本结果集的字符串值转换为 Python 类型
"""
from datetime import datetime, date, time, timedelta
from decimal import Decimal
def convert_text_value(value: str, column_type: int, flags: int = 0,
decimals: int = 0, charset: int = 45):
"""
将文本结果集的字符串值转换为 Python 类型
参数:
value: 文本值 (可能为 None)
column_type: 列类型 ID
flags: 列标志
decimals: 小数位数
charset: 字符集 ID
返回:
转换后的 Python 值
"""
if value is None:
return None
# 数值类型
if column_type == 0x01: # TINYINT
return int(value)
elif column_type == 0x02: # SMALLINT
return int(value)
elif column_type == 0x03: # INT
return int(value)
elif column_type == 0x04: # FLOAT
return float(value)
elif column_type == 0x05: # DOUBLE
return float(value)
elif column_type == 0x08: # BIGINT
return int(value)
elif column_type == 0x09: # MEDIUMINT
return int(value)
# 定点数
elif column_type == 0x00: # DECIMAL
return Decimal(value)
elif column_type == 0xF6: # NEWDECIMAL
return Decimal(value)
# 日期时间
elif column_type == 0x0A: # DATE
return datetime.strptime(value, '%Y-%m-%d').date()
elif column_type == 0x0B: # TIME
return parse_mysql_time(value)
elif column_type == 0x0C: # DATETIME
return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
elif column_type == 0x07: # TIMESTAMP
return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
elif column_type == 0x0D: # YEAR
return int(value)
# 字符串
elif column_type in (0x0F, 0xF5, 0xF7, 0xF8, 0xFD, 0xFE):
return value
# BLOB
elif column_type in (0xF9, 0xFA, 0xFB, 0xFC):
if charset == 63: # binary
return value.encode('latin1')
return value
# BIT
elif column_type == 0x10:
return int.from_bytes(value.encode('latin1'), 'big')
else:
return value
def parse_mysql_time(value: str) -> timedelta:
"""解析 MySQL TIME 值为 timedelta"""
negative = value.startswith('-')
if negative:
value = value[1:]
parts = value.split(':')
hours = int(parts[0])
minutes = int(parts[1])
seconds = float(parts[2]) if len(parts) > 2 else 0
total_seconds = hours * 3600 + minutes * 60 + seconds
td = timedelta(seconds=total_seconds)
return -td if negative else td
def demo():
"""演示类型转换"""
print("MySQL 文本值类型转换演示")
print()
test_cases = [
("42", 0x03, 0, 0, "INT → Python int"),
("3.14", 0x05, 0, 0, "DOUBLE → Python float"),
("12345.67", 0xF6, 0, 2, "DECIMAL(10,2) → Python Decimal"),
("hello", 0xFD, 0, 0, "VARCHAR → Python str"),
("2024-01-15", 0x0A, 0, 0, "DATE → Python date"),
("2024-01-15 10:30:00", 0x0C, 0, 0, "DATETIME → Python datetime"),
("-838:59:59", 0x0B, 0, 0, "TIME → Python timedelta"),
("1", 0x01, 0, 0, "TINYINT → Python int"),
]
for value, type_id, flags, decimals, desc in test_cases:
result = convert_text_value(value, type_id, flags, decimals)
print(f" {desc}")
print(f" 输入: {value!r}")
print(f" 输出: {result!r} ({type(result).__name__})")
print()
if __name__ == '__main__':
demo()
9.4 流式读取
对于大结果集,不应将所有行加载到内存,而应逐行读取和处理。
为什么需要流式读取?
| 方式 | 内存使用 | 适用场景 |
|---|---|---|
| 缓冲读取 | O(n) — 全部结果集 | 小结果集 |
| 流式读取 | O(1) — 每次只处理一行 | 大结果集、ETL、数据迁移 |
Python 流式读取实现
"""
mysql_streaming.py
流式读取 MySQL 结果集
"""
import socket
import struct
from typing import Generator, List, Any
class StreamingResultSet:
"""流式结果集读取器"""
def __init__(self, sock: socket.socket):
self.sock = sock
self.columns = []
self.status_flags = 0
self.warnings = 0
self._read_column_count()
self._read_column_definitions()
def _read_packet(self) -> bytes:
"""读取一个数据包"""
header = b''
while len(header) < 4:
header += self.sock.recv(4 - len(header))
pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
seq = header[3]
payload = b''
while len(payload) < pkt_len:
payload += self.sock.recv(pkt_len - len(payload))
return payload
def _read_column_count(self):
"""读取列数"""
payload = self._read_packet()
self.column_count = payload[0]
def _read_column_definitions(self):
"""读取列定义"""
for _ in range(self.column_count):
payload = self._read_packet()
# 简化解析,只提取列名
self.columns.append(self._parse_column_name(payload))
# EOF
eof = self._read_packet()
def _parse_column_name(self, payload: bytes) -> str:
"""从列定义中提取列名(简化版)"""
offset = 0
for _ in range(5): # 跳过 catalog, schema, table_alias, table_name
length = payload[offset]
if length >= 0xFB:
offset += 2
else:
offset += 1 + length
# 现在 offset 指向 column_name
length = payload[offset]
return payload[offset+1:offset+1+length].decode('utf-8', errors='replace')
def rows(self) -> Generator[List[Any], None, None]:
"""生成器:逐行返回结果"""
while True:
payload = self._read_packet()
# 检查 EOF
if payload[0] == 0xFE and len(payload) < 9:
if len(payload) >= 5:
self.status_flags = struct.unpack('<H', payload[3:5])[0]
break
# 解析行数据
row = self._parse_row(payload)
yield row
def _parse_row(self, payload: bytes) -> List[Any]:
"""解析行数据(文本协议)"""
row = []
offset = 0
for _ in range(self.column_count):
if payload[offset] == 0xFB: # NULL
row.append(None)
offset += 1
else:
# 长度编码字符串
length, offset = self._read_le_int(payload, offset)
value = payload[offset:offset+length].decode('utf-8', errors='replace')
row.append(value)
offset += length
return row
def _read_le_int(self, data, offset):
"""读取长度编码整数"""
first = data[offset]
if first < 0xFB:
return first, offset + 1
elif first == 0xFC:
return struct.unpack('<H', data[offset+1:offset+3])[0], offset + 3
elif first == 0xFD:
return struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0], offset + 4
elif first == 0xFE:
return struct.unpack('<Q', data[offset+1:offset+9])[0], offset + 9
def process_large_table(sock, sql):
"""流式处理大表"""
# 发送 COM_QUERY
payload = b'\x03' + sql.encode('utf-8')
packet = struct.pack('<I', len(payload))[:3] + struct.pack('B', 0) + payload
sock.send(packet)
# 创建流式读取器
reader = StreamingResultSet(sock)
print(f"列名: {reader.columns}")
print("逐行处理中...")
total = 0
for row in reader.rows():
# 处理每一行
total += 1
if total % 10000 == 0:
print(f" 已处理 {total:,} 行")
print(f"处理完成: {total:,} 行")
9.5 大结果集处理策略
策略一:设置 fetch size
# 使用游标 + FETCH 分批获取
stmt = prepare("SELECT * FROM huge_table")
execute_with_cursor(stmt, cursor_type=READ_ONLY)
while True:
rows = fetch(stmt, num_rows=1000)
if not rows:
break
process(rows)
策略二:LIMIT + OFFSET 分页
def paginate_query(conn, sql, page_size=1000):
"""分页查询"""
offset = 0
while True:
page_sql = f"{sql} LIMIT {page_size} OFFSET {offset}"
rows = execute(conn, page_sql)
if not rows:
break
yield from rows
offset += page_size
注意:
LIMIT OFFSET在大偏移量时性能较差(需要扫描前 OFFSET 行)。更好的方式是使用游标分页(基于上一页最后一行的 ID)。
策略三:游标分页(Keyset Pagination)
def keyset_pagination(conn, table, last_id=0, page_size=1000):
"""基于键的游标分页"""
while True:
sql = f"SELECT * FROM {table} WHERE id > %s ORDER BY id LIMIT %s"
rows = execute(conn, sql, [last_id, page_size])
if not rows:
break
last_id = rows[-1]['id']
yield from rows
9.6 SERVER_MORE_RESULTS_EXISTS
当 SQL 语句返回多个结果集时(存储过程、多语句),服务器通过状态标志通知客户端。
多结果集处理
def read_all_result_sets(sock):
"""读取所有结果集"""
results = []
while True:
result = read_result_set(sock)
results.append(result)
if not (result.status_flags & 0x0008): # SERVER_MORE_RESULTS_EXISTS
break
return results
# 示例:存储过程返回多个结果集
# CALL get_user_stats()
# 结果集1: 用户总数
# 结果集2: 活跃用户
# 结果集3: 最近注册
9.7 结果集元数据(CLIENT_OPTIONAL_RESULTSET_METADATA)
MySQL 8.0.3 引入 CLIENT_OPTIONAL_RESULTSET_METADATA,允许客户端选择是否接收完整元数据。
元数据选项
| 选项 | 说明 |
|---|---|
RESULTSET_METADATA_FULL (0) | 返回完整元数据(默认) |
RESULTSET_METADATA_SKIP_RESULTSET_METADATA (1) | 跳过元数据 |
当跳过元数据时,客户端必须已经通过 COM_STMT_PREPARE 获取了列定义信息。
9.8 本地文件加载(LOCAL INFILE)
当服务器请求客户端上传本地文件时,返回特殊的 LOCAL INFILE 包。
流程
Client: COM_QUERY "LOAD DATA LOCAL INFILE '/tmp/data.csv' INTO TABLE t"
Server: LOCAL_INFILE_Request (首字节 0xFB + 文件名)
Client: 文件数据 (一个或多个包)
Client: 空包 (文件结束标志)
Server: OK 或 ERR
def handle_local_infile(sock, filename):
"""处理 LOCAL INFILE 请求"""
with open(filename, 'rb') as f:
while True:
chunk = f.read(65535)
if not chunk:
break
packet = encode_packet(chunk, 0)
sock.send(packet)
# 发送空包表示文件结束
sock.send(encode_packet(b'', 0))
安全提示:
LOAD DATA LOCAL INFILE存在安全风险,恶意服务器可以请求客户端的任意文件。生产环境建议禁用。
9.9 会话状态追踪
MySQL 5.7 引入会话状态追踪,当会话状态改变时在 OK 包中返回变化信息。
状态变化类型
| 类型 | 说明 |
|---|---|
| 0 | SESSION_TRACK_SYSTEM_VARIABLES |
| 1 | SESSION_TRACK_SCHEMA |
| 2 | SESSION_TRACK_STATE_CHANGE |
| 3 | SESSION_TRACK_GTIDS |
| 4 | SESSION_TRACK_TRANSACTION_CHARACTERISTICS |
| 5 | SESSION_TRACK_TRANSACTION_STATE |
9.10 注意事项
重要提醒
流式 vs 缓冲:驱动默认可能使用缓冲模式读取整个结果集。处理大表时应显式启用流式模式。
文本 vs 二进制:文本结果集的数值以字符串传输,二进制结果集使用原生格式。选择合适的协议影响性能和精度。
NULL 处理:文本协议中 NULL 是
0xFB,二进制协议中是 bitmap 中的 bit。两种方式都需要正确处理。字符集转换:服务器根据连接字符集转换字符串值。如果表和连接字符集不一致,可能出现乱码。
BLOB 大小:大 BLOB 值可能导致超过
max_allowed_packet,需要使用游标或分块读取。结果集过大的风险:未加 LIMIT 的全表查询可能导致内存溢出或长时间阻塞。
9.11 业务场景
场景一:数据导出工具
def export_to_csv(sock, table, output_file, batch_size=10000):
"""将表数据导出为 CSV"""
import csv
sql = f"SELECT * FROM {table}"
send_query(sock, sql)
reader = StreamingResultSet(sock)
with open(output_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(reader.columns)
count = 0
for row in reader.rows():
writer.writerow(row)
count += 1
print(f"导出 {count} 行到 {output_file}")
场景二:数据复制/同步
def replicate_table(source_sock, dest_sock, table):
"""复制表数据到目标数据库"""
send_query(source_sock, f"SELECT * FROM {table}")
reader = StreamingResultSet(source_sock)
# 在目标端预处理 INSERT
placeholders = ", ".join(["?" for _ in reader.columns])
stmt = send_prepare(dest_sock, f"INSERT INTO {table} VALUES ({placeholders})")
for row in reader.rows():
send_execute(dest_sock, stmt, row)
send_close(dest_sock, stmt.statement_id)
9.12 扩展阅读
- MySQL Internals: COM_QUERY Response
- MySQL Internals: Binary Protocol Resultset
- MySQL Reference: LOAD DATA INFILE
上一章:08 - 预处理语句详解 下一章:10 - 复制协议 —— 深入了解 MySQL 主从复制的协议细节。