强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

MySQL 传输协议精讲 / 09 - 结果集详解

第 09 章:结果集详解

9.1 结果集的完整结构

无论使用文本协议还是二进制协议,结果集的基本结构是相同的:

1. Column Count Packet          ← 告知列数
2. Column Definition Packets × N ← 每列的元数据
3. EOF / OK Packet              ← 列定义结束
4. Row Data Packets × M         ← 每行的数据
5. EOF / OK Packet              ← 行数据结束

包的顺序与类型判定

接收第一个包 →
  若首字节为 0xFF → ERR 包
  若首字节为 0x00 且长度 < 9 → OK 包(DML 语句)
  若首字节为 0xFB → LOCAL INFILE 请求
  其他 → Column Count(结果集开始)

后续包 →
  根据当前阶段判定类型:
  - 列定义阶段: Column Definition
  - 行数据阶段: Row Data 或 EOF/OK

9.2 字段描述详解

Column Definition 全字段

catalog (长度编码字符串)
  → 固定为 "def"
  → 不携带实际信息

schema (长度编码字符串)
  → 数据库名称
  → 示例: "mydb"

table_alias (长度编码字符串)
  → 表别名(AS 之后的名字)
  → 示例: SELECT t.name AS t_name 中的 "t"

table_name (长度编码字符串)
  → 真实表名
  → 示例: "users"

column_alias (长度编码字符串)
  → 列别名
  → 示例: SELECT name AS user_name 中的 "user_name"

column_name (长度编码字符串)
  → 真实列名
  → 示例: "name"

固定字段标记 (1 字节)
  → 固定值 0x0C(表示后续固定字段长度为 12 字节)

character_set (2 字节, 小端序)
  → 字符集 ID
  → 0x21 = latin1 (33), 0x2D = utf8mb4 (45), 0x3F = binary (63)

column_length (4 字节, 小端序)
  → 列最大长度(字节数)
  → VARCHAR(255) 在 utf8mb4 下 = 255 × 4 = 1020

column_type (1 字节)
  → 字段类型 ID
  → 0x03 = INT, 0xFD = VAR_STRING, ...

flags (2 字节, 小端序)
  → 列标志位
  → 0x0001 = NOT NULL, 0x0020 = UNSIGNED, ...

decimals (1 字节)
  → 小数位数
  → INT = 0, DECIMAL(10,2) = 2

filler (2 字节)
  → 保留,固定 0x00 0x00

字段类型与 SQL 类型的对应

SQL 类型类型 IDcolumn_length (utf8mb4)flagsdecimals
INT0x0311视情况0
BIGINT0x0820视情况0
DECIMAL(10,2)0xF612-2
VARCHAR(100)0xFD400-0
CHAR(50)0xFE200-0
TEXT0xFC2621400x00100
BLOB0xFC2621400x00100
DATE0x0A10-0
DATETIME0x0C19-0
TIMESTAMP0x07190x04000
FLOAT0x0412-31
DOUBLE0x0522-31
ENUM0xF7视最长值0x01000
SET0xF8视最长值0x08000
JSON0xF542949672950x00100

9.3 行数据的类型转换

文本协议中的值转换

在文本协议中,所有值以字符串传输。客户端驱动需要根据 column_type 转换:

"""
mysql_type_converter.py
将文本结果集的字符串值转换为 Python 类型
"""
from datetime import datetime, date, time, timedelta
from decimal import Decimal


def convert_text_value(value: str, column_type: int, flags: int = 0,
                       decimals: int = 0, charset: int = 45):
    """
    将文本结果集的字符串值转换为 Python 类型

    参数:
        value: 文本值 (可能为 None)
        column_type: 列类型 ID
        flags: 列标志
        decimals: 小数位数
        charset: 字符集 ID

    返回:
        转换后的 Python 值
    """
    if value is None:
        return None

    # 数值类型
    if column_type == 0x01:  # TINYINT
        return int(value)
    elif column_type == 0x02:  # SMALLINT
        return int(value)
    elif column_type == 0x03:  # INT
        return int(value)
    elif column_type == 0x04:  # FLOAT
        return float(value)
    elif column_type == 0x05:  # DOUBLE
        return float(value)
    elif column_type == 0x08:  # BIGINT
        return int(value)
    elif column_type == 0x09:  # MEDIUMINT
        return int(value)

    # 定点数
    elif column_type == 0x00:  # DECIMAL
        return Decimal(value)
    elif column_type == 0xF6:  # NEWDECIMAL
        return Decimal(value)

    # 日期时间
    elif column_type == 0x0A:  # DATE
        return datetime.strptime(value, '%Y-%m-%d').date()
    elif column_type == 0x0B:  # TIME
        return parse_mysql_time(value)
    elif column_type == 0x0C:  # DATETIME
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
    elif column_type == 0x07:  # TIMESTAMP
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
    elif column_type == 0x0D:  # YEAR
        return int(value)

    # 字符串
    elif column_type in (0x0F, 0xF5, 0xF7, 0xF8, 0xFD, 0xFE):
        return value

    # BLOB
    elif column_type in (0xF9, 0xFA, 0xFB, 0xFC):
        if charset == 63:  # binary
            return value.encode('latin1')
        return value

    # BIT
    elif column_type == 0x10:
        return int.from_bytes(value.encode('latin1'), 'big')

    else:
        return value


def parse_mysql_time(value: str) -> timedelta:
    """解析 MySQL TIME 值为 timedelta"""
    negative = value.startswith('-')
    if negative:
        value = value[1:]

    parts = value.split(':')
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = float(parts[2]) if len(parts) > 2 else 0

    total_seconds = hours * 3600 + minutes * 60 + seconds
    td = timedelta(seconds=total_seconds)
    return -td if negative else td


def demo():
    """演示类型转换"""
    print("MySQL 文本值类型转换演示")
    print()

    test_cases = [
        ("42",        0x03, 0, 0, "INT → Python int"),
        ("3.14",      0x05, 0, 0, "DOUBLE → Python float"),
        ("12345.67",  0xF6, 0, 2, "DECIMAL(10,2) → Python Decimal"),
        ("hello",     0xFD, 0, 0, "VARCHAR → Python str"),
        ("2024-01-15", 0x0A, 0, 0, "DATE → Python date"),
        ("2024-01-15 10:30:00", 0x0C, 0, 0, "DATETIME → Python datetime"),
        ("-838:59:59", 0x0B, 0, 0, "TIME → Python timedelta"),
        ("1",         0x01, 0, 0, "TINYINT → Python int"),
    ]

    for value, type_id, flags, decimals, desc in test_cases:
        result = convert_text_value(value, type_id, flags, decimals)
        print(f"  {desc}")
        print(f"    输入: {value!r}")
        print(f"    输出: {result!r} ({type(result).__name__})")
        print()


if __name__ == '__main__':
    demo()

9.4 流式读取

对于大结果集,不应将所有行加载到内存,而应逐行读取和处理。

为什么需要流式读取?

方式内存使用适用场景
缓冲读取O(n) — 全部结果集小结果集
流式读取O(1) — 每次只处理一行大结果集、ETL、数据迁移

Python 流式读取实现

"""
mysql_streaming.py
流式读取 MySQL 结果集
"""
import socket
import struct
from typing import Generator, List, Any


class StreamingResultSet:
    """流式结果集读取器"""

    def __init__(self, sock: socket.socket):
        self.sock = sock
        self.columns = []
        self.status_flags = 0
        self.warnings = 0
        self._read_column_count()
        self._read_column_definitions()

    def _read_packet(self) -> bytes:
        """读取一个数据包"""
        header = b''
        while len(header) < 4:
            header += self.sock.recv(4 - len(header))
        pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
        seq = header[3]
        payload = b''
        while len(payload) < pkt_len:
            payload += self.sock.recv(pkt_len - len(payload))
        return payload

    def _read_column_count(self):
        """读取列数"""
        payload = self._read_packet()
        self.column_count = payload[0]

    def _read_column_definitions(self):
        """读取列定义"""
        for _ in range(self.column_count):
            payload = self._read_packet()
            # 简化解析,只提取列名
            self.columns.append(self._parse_column_name(payload))

        # EOF
        eof = self._read_packet()

    def _parse_column_name(self, payload: bytes) -> str:
        """从列定义中提取列名(简化版)"""
        offset = 0
        for _ in range(5):  # 跳过 catalog, schema, table_alias, table_name
            length = payload[offset]
            if length >= 0xFB:
                offset += 2
            else:
                offset += 1 + length
        # 现在 offset 指向 column_name
        length = payload[offset]
        return payload[offset+1:offset+1+length].decode('utf-8', errors='replace')

    def rows(self) -> Generator[List[Any], None, None]:
        """生成器:逐行返回结果"""
        while True:
            payload = self._read_packet()

            # 检查 EOF
            if payload[0] == 0xFE and len(payload) < 9:
                if len(payload) >= 5:
                    self.status_flags = struct.unpack('<H', payload[3:5])[0]
                break

            # 解析行数据
            row = self._parse_row(payload)
            yield row

    def _parse_row(self, payload: bytes) -> List[Any]:
        """解析行数据(文本协议)"""
        row = []
        offset = 0
        for _ in range(self.column_count):
            if payload[offset] == 0xFB:  # NULL
                row.append(None)
                offset += 1
            else:
                # 长度编码字符串
                length, offset = self._read_le_int(payload, offset)
                value = payload[offset:offset+length].decode('utf-8', errors='replace')
                row.append(value)
                offset += length
        return row

    def _read_le_int(self, data, offset):
        """读取长度编码整数"""
        first = data[offset]
        if first < 0xFB:
            return first, offset + 1
        elif first == 0xFC:
            return struct.unpack('<H', data[offset+1:offset+3])[0], offset + 3
        elif first == 0xFD:
            return struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0], offset + 4
        elif first == 0xFE:
            return struct.unpack('<Q', data[offset+1:offset+9])[0], offset + 9


def process_large_table(sock, sql):
    """流式处理大表"""
    # 发送 COM_QUERY
    payload = b'\x03' + sql.encode('utf-8')
    packet = struct.pack('<I', len(payload))[:3] + struct.pack('B', 0) + payload
    sock.send(packet)

    # 创建流式读取器
    reader = StreamingResultSet(sock)

    print(f"列名: {reader.columns}")
    print("逐行处理中...")

    total = 0
    for row in reader.rows():
        # 处理每一行
        total += 1
        if total % 10000 == 0:
            print(f"  已处理 {total:,} 行")

    print(f"处理完成: {total:,} 行")

9.5 大结果集处理策略

策略一:设置 fetch size

# 使用游标 + FETCH 分批获取
stmt = prepare("SELECT * FROM huge_table")
execute_with_cursor(stmt, cursor_type=READ_ONLY)

while True:
    rows = fetch(stmt, num_rows=1000)
    if not rows:
        break
    process(rows)

策略二:LIMIT + OFFSET 分页

def paginate_query(conn, sql, page_size=1000):
    """分页查询"""
    offset = 0
    while True:
        page_sql = f"{sql} LIMIT {page_size} OFFSET {offset}"
        rows = execute(conn, page_sql)
        if not rows:
            break
        yield from rows
        offset += page_size

注意LIMIT OFFSET 在大偏移量时性能较差(需要扫描前 OFFSET 行)。更好的方式是使用游标分页(基于上一页最后一行的 ID)。

策略三:游标分页(Keyset Pagination)

def keyset_pagination(conn, table, last_id=0, page_size=1000):
    """基于键的游标分页"""
    while True:
        sql = f"SELECT * FROM {table} WHERE id > %s ORDER BY id LIMIT %s"
        rows = execute(conn, sql, [last_id, page_size])
        if not rows:
            break
        last_id = rows[-1]['id']
        yield from rows

9.6 SERVER_MORE_RESULTS_EXISTS

当 SQL 语句返回多个结果集时(存储过程、多语句),服务器通过状态标志通知客户端。

多结果集处理

def read_all_result_sets(sock):
    """读取所有结果集"""
    results = []
    while True:
        result = read_result_set(sock)
        results.append(result)

        if not (result.status_flags & 0x0008):  # SERVER_MORE_RESULTS_EXISTS
            break

    return results


# 示例:存储过程返回多个结果集
# CALL get_user_stats()
# 结果集1: 用户总数
# 结果集2: 活跃用户
# 结果集3: 最近注册

9.7 结果集元数据(CLIENT_OPTIONAL_RESULTSET_METADATA)

MySQL 8.0.3 引入 CLIENT_OPTIONAL_RESULTSET_METADATA,允许客户端选择是否接收完整元数据。

元数据选项

选项说明
RESULTSET_METADATA_FULL (0)返回完整元数据(默认)
RESULTSET_METADATA_SKIP_RESULTSET_METADATA (1)跳过元数据

当跳过元数据时,客户端必须已经通过 COM_STMT_PREPARE 获取了列定义信息。


9.8 本地文件加载(LOCAL INFILE)

当服务器请求客户端上传本地文件时,返回特殊的 LOCAL INFILE 包。

流程

Client:  COM_QUERY "LOAD DATA LOCAL INFILE '/tmp/data.csv' INTO TABLE t"
Server:  LOCAL_INFILE_Request (首字节 0xFB + 文件名)
Client:  文件数据 (一个或多个包)
Client:  空包 (文件结束标志)
Server:  OK 或 ERR
def handle_local_infile(sock, filename):
    """处理 LOCAL INFILE 请求"""
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(65535)
            if not chunk:
                break
            packet = encode_packet(chunk, 0)
            sock.send(packet)

    # 发送空包表示文件结束
    sock.send(encode_packet(b'', 0))

安全提示LOAD DATA LOCAL INFILE 存在安全风险,恶意服务器可以请求客户端的任意文件。生产环境建议禁用。


9.9 会话状态追踪

MySQL 5.7 引入会话状态追踪,当会话状态改变时在 OK 包中返回变化信息。

状态变化类型

类型说明
0SESSION_TRACK_SYSTEM_VARIABLES
1SESSION_TRACK_SCHEMA
2SESSION_TRACK_STATE_CHANGE
3SESSION_TRACK_GTIDS
4SESSION_TRACK_TRANSACTION_CHARACTERISTICS
5SESSION_TRACK_TRANSACTION_STATE

9.10 注意事项

重要提醒

  1. 流式 vs 缓冲:驱动默认可能使用缓冲模式读取整个结果集。处理大表时应显式启用流式模式。

  2. 文本 vs 二进制:文本结果集的数值以字符串传输,二进制结果集使用原生格式。选择合适的协议影响性能和精度。

  3. NULL 处理:文本协议中 NULL 是 0xFB,二进制协议中是 bitmap 中的 bit。两种方式都需要正确处理。

  4. 字符集转换:服务器根据连接字符集转换字符串值。如果表和连接字符集不一致,可能出现乱码。

  5. BLOB 大小:大 BLOB 值可能导致超过 max_allowed_packet,需要使用游标或分块读取。

  6. 结果集过大的风险:未加 LIMIT 的全表查询可能导致内存溢出或长时间阻塞。


9.11 业务场景

场景一:数据导出工具

def export_to_csv(sock, table, output_file, batch_size=10000):
    """将表数据导出为 CSV"""
    import csv

    sql = f"SELECT * FROM {table}"
    send_query(sock, sql)

    reader = StreamingResultSet(sock)

    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(reader.columns)

        count = 0
        for row in reader.rows():
            writer.writerow(row)
            count += 1

    print(f"导出 {count} 行到 {output_file}")

场景二:数据复制/同步

def replicate_table(source_sock, dest_sock, table):
    """复制表数据到目标数据库"""
    send_query(source_sock, f"SELECT * FROM {table}")
    reader = StreamingResultSet(source_sock)

    # 在目标端预处理 INSERT
    placeholders = ", ".join(["?" for _ in reader.columns])
    stmt = send_prepare(dest_sock, f"INSERT INTO {table} VALUES ({placeholders})")

    for row in reader.rows():
        send_execute(dest_sock, stmt, row)

    send_close(dest_sock, stmt.statement_id)

9.12 扩展阅读


上一章08 - 预处理语句详解 下一章10 - 复制协议 —— 深入了解 MySQL 主从复制的协议细节。