强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

MySQL 传输协议精讲 / 05 - 命令与请求

第 05 章:命令与请求

5.1 命令概述

握手认证完成后,客户端与服务器进入命令阶段(Command Phase)。客户端发送命令包,服务器返回响应包。

命令包格式

┌────────── 3 字节包长度 ──────────┐
│         1 字节序列号              │
├──────────────────────────────────┤
│   1 字节命令类型 (COM_xxx)       │
│   变长命令参数                    │
└──────────────────────────────────┘

第一个字节标识命令类型,后续字节是命令的参数。

完整命令列表

命令说明引入版本
COM_SLEEP0x00无操作(内部使用)3.x
COM_QUIT0x01关闭连接3.x
COM_INIT_DB0x02切换数据库3.x
COM_QUERY0x03执行 SQL 查询3.x
COM_FIELD_LIST0x04获取字段信息(已废弃)3.x
COM_REFRESH0x07刷新操作3.x
COM_STATISTICS0x08获取服务器统计信息3.x
COM_PROCESS_INFO0x0A获取进程列表(已废弃)3.x
COM_PROCESS_KILL0x0C终止连接(已废弃,用 KILL SQL)3.x
COM_DEBUG0x0D触发服务器 dump3.x
COM_PING0x0E心跳检测4.0
COM_CHANGE_USER0x11更改用户4.1
COM_RESET_CONNECTION0x1F重置连接状态5.7
COM_SET_OPTION0x1B设置选项4.1
COM_STMT_PREPARE0x16预处理 SQL 语句4.1
COM_STMT_EXECUTE0x17执行预处理语句4.1
COM_STMT_SEND_LONG_DATA0x18发送长参数数据4.1
COM_STMT_CLOSE0x19关闭预处理语句4.1
COM_STMT_RESET0x1A重置预处理语句4.1
COM_STMT_FETCH0x1C获取预处理语句游标数据5.0

5.2 COM_QUERY(0x03)

COM_QUERY 是最常用的命令,用于执行任意 SQL 语句。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x03 (命令类型)
1          变长      SQL 语句 (UTF-8 编码)

Python 实现

"""
mysql_com_query.py
发送 COM_QUERY 命令并解析响应
"""
import socket
import struct


def encode_packet(payload: bytes, seq: int) -> bytes:
    return struct.pack('<I', len(payload))[:3] + struct.pack('B', seq) + payload


def decode_packet(data: bytes):
    if len(data) < 4:
        return None, None, data
    pkt_len = struct.unpack('<I', data[0:3] + b'\x00')[0]
    seq = data[3]
    if len(data) < 4 + pkt_len:
        return None, None, data
    payload = data[4:4 + pkt_len]
    return payload, seq, data[4 + pkt_len:]


def send_com_query(sock: socket.socket, sql: str, seq: int = 0):
    """发送 COM_QUERY 命令"""
    payload = b'\x03' + sql.encode('utf-8')
    packet = encode_packet(payload, seq)
    sock.send(packet)


def read_response(sock: socket.socket) -> dict:
    """读取并解析服务器响应"""
    data = b''
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            break
        data += chunk

        # 尝试解析第一个包来确定类型
        if len(data) >= 4:
            pkt_len = struct.unpack('<I', data[0:3] + b'\x00')[0]
            if len(data) >= 4 + pkt_len:
                break

    payload, seq, remaining = decode_packet(data)
    if payload is None:
        return {'type': 'INCOMPLETE'}

    first_byte = payload[0]

    if first_byte == 0x00:
        return parse_ok_packet(payload)
    elif first_byte == 0xFF:
        return parse_err_packet(payload)
    elif first_byte == 0xFB:
        return {'type': 'LOCAL_INFILE'}
    else:
        return parse_result_set(data)


def parse_ok_packet(payload: bytes) -> dict:
    """解析 OK 包"""
    offset = 1  # 跳过 0x00 标识

    affected_rows, offset = read_length_encoded_int(payload, offset)
    last_insert_id, offset = read_length_encoded_int(payload, offset)
    status_flags = struct.unpack('<H', payload[offset:offset+2])[0]
    offset += 2
    warnings = struct.unpack('<H', payload[offset:offset+2])[0]

    return {
        'type': 'OK',
        'affected_rows': affected_rows,
        'last_insert_id': last_insert_id,
        'status_flags': status_flags,
        'warnings': warnings,
    }


def parse_err_packet(payload: bytes) -> dict:
    """解析 ERR 包"""
    error_code = struct.unpack('<H', payload[1:3])[0]
    # 检查 SQL state 标记
    sql_state_marker = ''
    sql_state = ''
    message = ''
    if len(payload) > 3:
        if payload[3:4] == b'#':
            sql_state = payload[4:9].decode('ascii', errors='replace')
            message = payload[9:].decode('utf-8', errors='replace')
        else:
            message = payload[3:].decode('utf-8', errors='replace')

    return {
        'type': 'ERR',
        'error_code': error_code,
        'sql_state': sql_state,
        'message': message,
    }


def parse_result_set(data: bytes) -> dict:
    """解析结果集(简化版)"""
    # 后续章节详解
    return {'type': 'RESULT_SET', 'raw': data[:100].hex()}


def read_length_encoded_int(data: bytes, offset: int) -> tuple:
    first = data[offset]
    if first < 0xFB:
        return first, offset + 1
    elif first == 0xFC:
        return struct.unpack('<H', data[offset+1:offset+3])[0], offset + 3
    elif first == 0xFD:
        return struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0], offset + 4
    elif first == 0xFE:
        return struct.unpack('<Q', data[offset+1:offset+9])[0], offset + 9


# 完整示例:连接、认证、执行查询
def full_example():
    """完整的连接-认证-查询流程"""
    # 注意:此处省略了完整的认证流程
    # 实际使用请参考 PyMySQL 或其他驱动
    print("COM_QUERY 命令格式演示:")
    print()

    # 模拟 COM_QUERY 数据包
    sql = "SELECT id, name FROM users WHERE age > 18"
    payload = b'\x03' + sql.encode('utf-8')
    packet = encode_packet(payload, seq=0)

    print(f"SQL: {sql}")
    print(f"命令字节: 0x{payload[0]:02X}")
    print(f"完整 payload: {payload.hex()}")
    print(f"数据包大小: {len(packet)} 字节")
    print(f"数据包头: {packet[:4].hex()}")
    print()

    # 解析
    print("解析 COM_QUERY 数据包:")
    pkt_payload, pkt_seq, _ = decode_packet(packet)
    command = pkt_payload[0]
    sql_text = pkt_payload[1:].decode('utf-8')
    print(f"  命令类型: COM_QUERY (0x{command:02X})")
    print(f"  序列号: {pkt_seq}")
    print(f"  SQL: {sql_text}")


if __name__ == '__main__':
    full_example()

COM_QUERY 的响应类型

响应首字节说明
OK0x00DDL/DML 成功(INSERT、UPDATE、DELETE 等)
ERR0xFF执行出错
Result Set列数SELECT、SHOW、DESCRIBE 等返回结果集的语句
LOCAL_INFILE0xFBLOAD DATA LOCAL 请求

5.3 COM_INIT_DB(0x02)

切换当前数据库,等价于 SQL 语句 USE database_name

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x02
1          变长      数据库名 (不带 null 终止符)

Python 实现

def send_com_init_db(sock, database_name, seq=0):
    """发送 COM_INIT_DB 命令"""
    payload = b'\x02' + database_name.encode('utf-8')
    packet = encode_packet(payload, seq)
    sock.send(packet)
    return read_response(sock)

5.4 COM_PING(0x0E)

心跳检测,不带任何参数。服务器返回 OK 包。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x0E

Python 实现

def send_com_ping(sock, seq=0):
    """发送 COM_PING 命令"""
    payload = b'\x0E'
    packet = encode_packet(payload, seq)
    sock.send(packet)
    return read_response(sock)


# 连接池心跳检测示例
def check_connection_alive(sock, timeout=2):
    """检查连接是否存活"""
    import select
    sock.setblocking(False)
    try:
        send_com_ping(sock)
        ready = select.select([sock], [], [], timeout)
        if ready[0]:
            response = read_response(sock)
            return response.get('type') == 'OK'
        return False
    except Exception:
        return False
    finally:
        sock.setblocking(True)

5.5 COM_QUIT(0x01)

关闭连接,服务器收到后关闭连接。不返回响应。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x01
def send_com_quit(sock, seq=0):
    """发送 COM_QUIT 命令"""
    payload = b'\x01'
    packet = encode_packet(payload, seq)
    sock.send(packet)
    # COM_QUIT 不需要等待响应

5.6 COM_STMT_PREPARE(0x16)

预处理 SQL 语句,返回语句句柄和参数/列元数据。详细内容见第 08 章。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x16
1          变长      SQL 语句 (含 ? 占位符)

响应格式

成功时返回 COM_STMT_PREPARE Response

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x00 (OK 标识)
1          4 字节    statement_id (语句句柄)
5          2 字节    num_columns (列数)
7          2 字节    num_params (参数数)
9          1 字节    0x00 (填充)
10         2 字节    warnings (如果有)
后续       变长      参数定义 + 列定义
def send_com_stmt_prepare(sock, sql, seq=0):
    """发送 COM_STMT_PREPARE 命令"""
    payload = b'\x16' + sql.encode('utf-8')
    packet = encode_packet(payload, seq)
    sock.send(packet)
    return read_response(sock)


# 示例
# response = send_com_stmt_prepare(sock, "SELECT * FROM users WHERE id = ? AND name = ?")
# statement_id = response['statement_id']  # 4 字节句柄
# num_params = 2                            # 2 个参数
# num_columns = ...                         # 返回的列数

5.7 COM_STMT_EXECUTE(0x17)

执行已预处理的语句,绑定参数值。详细内容见第 07、08 章。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x17
1          4 字节    statement_id
5          1 字节    flags (cursor type 等)
6          4 字节    iteration_count (固定为 1)
10         变长      null_bitmap
          变长      new_params_bound_flag
          变长      参数类型 + 参数值
def send_com_stmt_execute(sock, statement_id, params=None, seq=0):
    """发送 COM_STMT_EXECUTE 命令"""
    payload = bytearray()
    payload.append(0x17)                            # 命令类型
    payload.extend(struct.pack('<I', statement_id))  # 语句 ID
    payload.append(0x00)                            # flags: CURSOR_TYPE_NO_CURSOR
    payload.extend(struct.pack('<I', 1))             # iteration_count

    if params:
        num_params = len(params)
        # null_bitmap: 每个参数占 1 bit
        null_bitmap_size = (num_params + 7) // 8
        null_bitmap = bytearray(null_bitmap_size)
        for i, param in enumerate(params):
            if param is None:
                null_bitmap[i // 8] |= (1 << (i % 8))
        payload.extend(null_bitmap)
        payload.append(0x01)  # new_params_bound_flag: 发送新参数

        # 参数类型和值
        for param in params:
            if param is None:
                continue
            if isinstance(param, int):
                payload.extend(struct.pack('<B', 0x08))  # MYSQL_TYPE_LONGLONG
                payload.extend(struct.pack('<q', param))
            elif isinstance(param, str):
                encoded = param.encode('utf-8')
                payload.extend(struct.pack('<B', 0x0F))  # MYSQL_TYPE_VAR_STRING
                payload.extend(write_length_encoded_int(len(encoded)))
                payload.extend(encoded)
            elif isinstance(param, float):
                payload.extend(struct.pack('<B', 0x05))  # MYSQL_TYPE_DOUBLE
                payload.extend(struct.pack('<d', param))
    else:
        payload.extend(b'\x00' * 1)  # 空 null_bitmap
        payload.append(0x00)         # new_params_bound_flag: 0

    packet = encode_packet(bytes(payload), seq)
    sock.send(packet)
    return read_response(sock)

5.8 COM_STMT_FETCH(0x1C)

从预处理语句的游标中获取指定数量的行。需要在 COM_STMT_EXECUTE 时启用游标。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x1C
1          4 字节    statement_id
5          4 字节    num_rows (要获取的行数)

5.9 COM_STMT_CLOSE(0x19)

关闭预处理语句,释放服务器资源。不返回响应

def send_com_stmt_close(sock, statement_id, seq=0):
    """关闭预处理语句"""
    payload = b'\x19' + struct.pack('<I', statement_id)
    packet = encode_packet(payload, seq)
    sock.send(packet)
    # COM_STMT_CLOSE 不返回响应

5.10 COM_RESET_CONNECTION(0x1F)

MySQL 5.7.3 引入,重置连接状态但不关闭连接:

def send_com_reset_connection(sock, seq=0):
    """重置连接状态"""
    payload = b'\x1F'
    packet = encode_packet(payload, seq)
    sock.send(packet)
    return read_response(sock)

重置内容

重置项说明
事务状态回滚未提交的事务
自动提交恢复为服务器默认值
临时表删除所有临时表
会话变量恢复为全局默认值
预处理语句关闭所有预处理语句
用户变量清除所有用户变量

5.11 COM_CHANGE_USER(0x11)

在已建立的连接上切换用户:

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x11
1          变长      username (null 结尾)
          变长      auth_response (长度编码)
          变长      database (null 结尾)
          1 字节    character_set
          变长      auth_plugin_name (null 结尾)

5.12 COM_STATISTICS(0x08)

获取服务器运行统计信息,返回一个文本字符串:

def send_com_statistics(sock, seq=0):
    """获取服务器统计"""
    payload = b'\x08'
    packet = encode_packet(payload, seq)
    sock.send(packet)

    data = sock.recv(4096)
    payload, _, _ = decode_packet(data)
    return payload.decode('utf-8')
    # 返回类似:
    # Uptime: 12345  Threads: 2  Questions: 100  Slow queries: 0
    # Opens: 50  Flush tables: 1  Open tables: 20  Queries per second avg: 0.008

5.13 完整的命令交互示例

"""
mysql_command_session.py
完整的 MySQL 命令会话示例
"""
import socket
import struct


class MySQLSession:
    """简化的 MySQL 会话(需要已认证的连接)"""

    def __init__(self, sock):
        self.sock = sock
        self.seq = 0

    def _send(self, payload: bytes):
        """发送一个数据包"""
        header = struct.pack('<I', len(payload))[:3] + struct.pack('B', self.seq & 0xFF)
        self.sock.send(header + payload)
        self.seq = (self.seq + 1) & 0xFF

    def _recv(self) -> bytes:
        """接收一个响应"""
        data = b''
        while True:
            chunk = self.sock.recv(65536)
            if not chunk:
                break
            data += chunk
            if len(data) >= 4:
                pkt_len = struct.unpack('<I', data[0:3] + b'\x00')[0]
                if len(data) >= 4 + pkt_len:
                    break
        self.seq = (self.seq + 1) & 0xFF
        return data[4:]  # 去掉包头

    def query(self, sql: str) -> dict:
        """执行 SQL 查询"""
        self.seq = 0
        self._send(b'\x03' + sql.encode('utf-8'))
        response = self._recv()

        first_byte = response[0]
        if first_byte == 0x00:
            return {'type': 'OK', 'data': response}
        elif first_byte == 0xFF:
            error_code = struct.unpack('<H', response[1:3])[0]
            message = response[9:].decode('utf-8', errors='replace')
            return {'type': 'ERR', 'code': error_code, 'message': message}
        else:
            return {'type': 'RESULT', 'data': response}

    def init_db(self, database: str):
        """切换数据库"""
        self.seq = 0
        self._send(b'\x02' + database.encode('utf-8'))
        return self._recv()

    def ping(self):
        """心跳检测"""
        self.seq = 0
        self._send(b'\x0E')
        response = self._recv()
        return response[0] == 0x00

    def quit(self):
        """关闭连接"""
        self.seq = 0
        self._send(b'\x01')
        self.sock.close()

    def statistics(self):
        """获取统计信息"""
        self.seq = 0
        self._send(b'\x08')
        return self._recv().decode('utf-8')


# 使用示例
def demo():
    print("MySQL 命令会话演示")
    print("请确保 MySQL 服务器已启动并完成认证")
    print()
    print("典型命令交互:")
    print("  COM_QUERY   → SELECT * FROM users")
    print("  COM_INIT_DB → USE mydb")
    print("  COM_PING    → 心跳检测")
    print("  COM_STAT    → 获取统计信息")
    print("  COM_QUIT    → 关闭连接")


if __name__ == '__main__':
    demo()

5.14 注意事项

重要提醒

  1. COM_QUERY 的字符集:SQL 语句使用连接握手时协商的字符集编码。推荐 UTF-8。

  2. COM_QUERY 不能多条语句:除非客户端在握手时设置了 CLIENT_MULTI_STATEMENTS 能力标志,否则一次只能发送一条 SQL。

  3. COM_PING 的用途:连接池通常使用 COM_PING 检测连接存活,比 SELECT 1 更轻量。

  4. COM_RESET_CONNECTION vs COM_CHANGE_USER:前者重置所有状态;后者只切换用户,保留数据库等。

  5. 废弃命令COM_FIELD_LISTCOM_PROCESS_INFOCOM_PROCESS_KILL 在 MySQL 8.0 中已废弃,应使用对应的 SQL 语句替代。

  6. 预处理语句资源泄漏:使用 COM_STMT_PREPARE 后必须用 COM_STMT_CLOSE 释放,否则会导致服务器资源泄漏。


5.15 业务场景

场景一:连接池健康检查

连接池(如 HikariCP)在借出连接前先执行 COM_PING 检测:

def borrow_connection(pool):
    conn = pool.get()
    if not conn.ping():
        conn = pool.replace(conn)
    return conn

场景二:批量操作优化

使用 CLIENT_MULTI_STATEMENTS 在一次请求中执行多条语句:

# 启用多语句模式后
sql = "INSERT INTO t1 VALUES(1); INSERT INTO t1 VALUES(2); INSERT INTO t1 VALUES(3)"
sock.send(b'\x03' + sql.encode('utf-8'))
# 接收多个结果集...

场景三:连接状态重置

Web 应用的请求处理完成后,使用 COM_RESET_CONNECTION 将连接归还到干净状态,避免会话变量泄漏。


5.16 扩展阅读


上一章04 - 数据包格式 下一章06 - 文本协议 —— 深入理解 COM_QUERY 的文本结果集格式。