MySQL 传输协议精讲 / 10 - 复制协议
第 10 章:复制协议
10.1 复制协议概述
MySQL 复制协议(Replication Protocol)是主从复制(Master-Slave Replication)的底层通信协议。从库(Slave/Replica)作为特殊的"客户端"连接主库(Master/Source),请求并接收二进制日志事件。
复制架构
┌──────────────┐ 复制协议 ┌──────────────┐
│ Master │ ───────────────────→ │ Slave │
│ (Source) │ Binlog Events │ (Replica) │
│ │ │ │
│ - Binlog │ COM_BINLOG_DUMP │ - SQL Thread │
│ - IO Thread │ ←───────────────── │ - IO Thread │
└──────────────┘ └──────────────┘
复制流程
Slave Master
│ │
│ ──── TCP 连接 (端口 3306) ─────────────→ │
│ │
│ ←─── HandshakeV10 ──────────────────── │
│ ──── HandshakeResponse (特殊用户) ───→ │
│ ←─── OK ───────────────────────────── │
│ │
│ ──── COM_REGISTER_SLAVE ─────────────→ │ 注册从库
│ ←─── OK ───────────────────────────── │
│ │
│ ──── COM_BINLOG_DUMP ────────────────→ │ 请求 Binlog
│ binlog_file = "mysql-bin.000001" │
│ binlog_pos = 4 │
│ │
│ ←─── Binlog Event ─────────────────── │ 事件流
│ ←─── Binlog Event ─────────────────── │
│ ←─── Binlog Event ─────────────────── │
│ ... (持续接收) ... │
10.2 COM_REGISTER_SLAVE(0x15)
从库向主库注册自己。
命令格式
偏移量 大小 字段 说明
────────────────────────────────────────────────
0 1 字节 0x15 命令类型
1 4 字节 server_id 从库的唯一服务器 ID
5 变长 slaves_hostname 从库主机名 (长度编码)
变长 slaves_user 用户名 (长度编码)
变长 slaves_password 密码 (长度编码)
2 字节 slaves_port 从库端口
4 字节 replication_rank 复制优先级
4 字节 master_id 主库 ID (通常为 0)
Python 实现
"""
mysql_replication.py
MySQL 复制协议实现
"""
import socket
import struct
import hashlib
def encode_packet(payload: bytes, seq: int) -> bytes:
return struct.pack('<I', len(payload))[:3] + struct.pack('B', seq) + payload
def encode_length_string(s: str) -> bytes:
encoded = s.encode('utf-8')
if len(encoded) < 0xFB:
return struct.pack('B', len(encoded)) + encoded
elif len(encoded) < 0x10000:
return b'\xFC' + struct.pack('<H', len(encoded)) + encoded
def send_register_slave(sock, server_id, hostname='', user='',
password='', port=3306, rank=0, master_id=0):
"""发送 COM_REGISTER_SLAVE"""
payload = bytearray()
payload.append(0x15)
payload.extend(struct.pack('<I', server_id))
payload.extend(encode_length_string(hostname))
payload.extend(encode_length_string(user))
payload.extend(encode_length_string(password))
payload.extend(struct.pack('<H', port))
payload.extend(struct.pack('<I', rank))
payload.extend(struct.pack('<I', master_id))
sock.send(encode_packet(bytes(payload), 0))
response = read_response(sock)
return response
def read_response(sock):
"""读取简单响应"""
header = b''
while len(header) < 4:
header += sock.recv(4 - len(header))
pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
payload = sock.recv(pkt_len)
return payload
10.3 COM_BINLOG_DUMP(0x12)
从库请求主库发送二进制日志事件。
命令格式
偏移量 大小 字段 说明
────────────────────────────────────────────────
0 1 字节 0x12 命令类型
1 4 字节 binlog_pos 起始位置
5 2 字节 flags 标志 (0x0001=非阻塞)
7 4 字节 server_id 从库服务器 ID
11 变长 binlog_filename Binlog 文件名
Python 实现
def send_binlog_dump(sock, binlog_pos=4, server_id=2,
binlog_filename='mysql-bin.000001', flags=0):
"""发送 COM_BINLOG_DUMP"""
payload = bytearray()
payload.append(0x12)
payload.extend(struct.pack('<I', binlog_pos))
payload.extend(struct.pack('<H', flags))
payload.extend(struct.pack('<I', server_id))
payload.extend(binlog_filename.encode('utf-8'))
sock.send(encode_packet(bytes(payload), 0))
print(f"[+] 发送 COM_BINLOG_DUMP: file={binlog_filename}, pos={binlog_pos}")
COM_BINLOG_DUMP_GTID(0x1E)
MySQL 5.6 引入基于 GTID 的复制:
偏移量 大小 字段
────────────────────────────────────────
0 1 字节 0x1E
1 2 字节 flags
3 4 字节 server_id
7 4 字节 binlog_name_len
11 变长 binlog_filename
8 字节 binlog_pos
4 字节 data_size
变长 UUID_SET (GTID 集合)
10.4 二进制日志事件格式
主库通过复制连接发送的每个**二进制日志事件(Binlog Event)**都有统一的格式。
事件头格式(19 字节,MySQL 5.0+)
偏移量 大小 字段 说明
──────────────────────────────────────────────────────
0 4 字节 timestamp 事件时间戳 (Unix 时间)
4 1 字节 type_code 事件类型代码
5 4 字节 server_id 产生事件的服务器 ID
9 4 字节 event_length 事件总长度(含头和数据)
13 4 字节 next_position 下一个事件的位置
17 2 字节 flags 事件标志
常见事件类型
| 类型码 | 事件名称 | 说明 |
|---|---|---|
| 0 | UNKNOWN | 未知事件 |
| 1 | START_EVENT_V3 | Binlog 开始 (旧版本) |
| 2 | QUERY_EVENT | SQL 语句事件 |
| 3 | STOP_EVENT | Binlog 停止 |
| 4 | ROTATE_EVENT | Binlog 文件切换 |
| 5 | INTVAR_EVENT | 自增值 |
| 7 | APPEND_BLOCK_EVENT | 数据块追加 |
| 8 | DELETE_FILE_EVENT | 文件删除 |
| 9 | RAND_EVENT | RAND() 种子 |
| 10 | XID_EVENT | 事务提交 |
| 11 | TABLE_MAP_EVENT | 表定义映射 |
| 13 | WRITE_ROWS_EVENTv1 | 行写入 (v1) |
| 14 | UPDATE_ROWS_EVENTv1 | 行更新 (v1) |
| 15 | DELETE_ROWS_EVENTv1 | 行删除 (v1) |
| 16 | INCIDENT_EVENT | 异常事件 |
| 17 | HEARTBEAT_EVENT | 心跳事件 |
| 19 | WRITE_ROWS_EVENTv2 | 行写入 (v2) |
| 20 | UPDATE_ROWS_EVENTv2 | 行更新 (v2) |
| 21 | DELETE_ROWS_EVENTv2 | 行删除 (v2) |
| 33 | GTID_LOG_EVENT | GTID 事件 |
| 34 | ANONYMOUS_GTID_LOG_EVENT | 匿名 GTID |
| 35 | PREVIOUS_GTIDS_LOG_EVENT | 前序 GTID 集合 |
Python 事件解析
from dataclasses import dataclass
from typing import Optional
from enum import IntEnum
class BinlogEventType(IntEnum):
UNKNOWN = 0
START_EVENT_V3 = 1
QUERY_EVENT = 2
STOP_EVENT = 3
ROTATE_EVENT = 4
INTVAR_EVENT = 5
TABLE_MAP_EVENT = 19 # 实际为 11, 此处用 v2
WRITE_ROWS_EVENT = 23 # 实际为 19
UPDATE_ROWS_EVENT = 24 # 实际为 20
DELETE_ROWS_EVENT = 25 # 实际为 21
XID_EVENT = 16 # 实际为 10
GTID_LOG_EVENT = 33
HEARTBEAT_EVENT = 27 # 实际为 17
@dataclass
class BinlogEventHeader:
timestamp: int
type_code: int
server_id: int
event_length: int
next_position: int
flags: int
@dataclass
class BinlogEvent:
header: BinlogEventHeader
data: bytes
def parse_event_header(data: bytes) -> BinlogEventHeader:
"""解析事件头(19 字节)"""
if len(data) < 19:
raise ValueError(f"事件头数据不足: {len(data)} < 19")
return BinlogEventHeader(
timestamp=struct.unpack('<I', data[0:4])[0],
type_code=data[4],
server_id=struct.unpack('<I', data[5:9])[0],
event_length=struct.unpack('<I', data[9:13])[0],
next_position=struct.unpack('<I', data[13:17])[0],
flags=struct.unpack('<H', data[17:19])[0],
)
def read_binlog_event(sock) -> Optional[BinlogEvent]:
"""从复制连接读取一个 Binlog 事件"""
# 复制事件以普通 MySQL 数据包格式传输
header = b''
while len(header) < 4:
chunk = sock.recv(4 - len(header))
if not chunk:
return None
header += chunk
pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
seq = header[3]
payload = b''
while len(payload) < pkt_len:
chunk = sock.recv(pkt_len - len(payload))
if not chunk:
return None
payload += chunk
# 检查是否是 ERR 包
if payload[0] == 0xFF:
error_code = struct.unpack('<H', payload[1:3])[0]
message = payload[9:].decode('utf-8', errors='replace')
print(f"[!] 错误: [{error_code}] {message}")
return None
# 解析事件头
event_header = parse_event_header(payload)
return BinlogEvent(
header=event_header,
data=payload,
)
def parse_query_event(data: bytes) -> dict:
"""
解析 QUERY_EVENT
QUERY_EVENT 包含执行的 SQL 语句
格式:
4 bytes: thread_id
4 bytes: exec_time
1 byte: db_name_length
2 bytes: error_code
2 bytes: status_vars_length
变长: status_vars
变长: db_name (null terminated)
变长: sql_statement
"""
offset = 19 # 跳过事件头
thread_id = struct.unpack('<I', data[offset:offset+4])[0]
offset += 4
exec_time = struct.unpack('<I', data[offset:offset+4])[0]
offset += 4
db_name_length = data[offset]
offset += 1
error_code = struct.unpack('<H', data[offset:offset+2])[0]
offset += 2
status_vars_length = struct.unpack('<H', data[offset:offset+2])[0]
offset += 2
# 跳过状态变量
offset += status_vars_length
# 数据库名 (null 结尾)
db_name_end = data.index(b'\x00', offset)
db_name = data[offset:db_name_end].decode('utf-8', errors='replace')
offset = db_name_end + 1
# SQL 语句 (剩余部分)
sql = data[offset:].decode('utf-8', errors='replace')
return {
'thread_id': thread_id,
'exec_time': exec_time,
'db_name': db_name,
'error_code': error_code,
'sql': sql,
}
def parse_rotate_event(data: bytes) -> dict:
"""
解析 ROTATE_EVENT
格式:
8 bytes: position
变长: new_binlog_filename
"""
offset = 19 # 跳过事件头
position = struct.unpack('<Q', data[offset:offset+8])[0]
offset += 8
filename = data[offset:].decode('utf-8', errors='replace').rstrip('\x00')
return {
'position': position,
'filename': filename,
}
def parse_xid_event(data: bytes) -> dict:
"""
解析 XID_EVENT (事务提交)
格式:
8 bytes: xid (事务 ID)
"""
offset = 19
xid = struct.unpack('<Q', data[offset:offset+8])[0]
return {'xid': xid}
def parse_heartbeat_event(data: bytes) -> dict:
"""
解析 HEARTBEAT_EVENT
格式:
4 bytes: timestamp
8 bytes: current_binlog_pos
2 bytes: flags
变长: server_uuid (16 bytes)
变长: binlog_filename
"""
offset = 19
return {'type': 'heartbeat'}
# 事件名称映射
EVENT_NAMES = {
0: "UNKNOWN", 1: "START_V3", 2: "QUERY", 3: "STOP",
4: "ROTATE", 5: "INTVAR", 7: "APPEND_BLOCK", 8: "DELETE_FILE",
9: "RAND", 10: "XID", 11: "TABLE_MAP", 13: "WRITE_ROWS",
14: "UPDATE_ROWS", 15: "DELETE_ROWS", 16: "INCIDENT",
17: "HEARTBEAT", 19: "WRITE_ROWS_V2", 20: "UPDATE_ROWS_V2",
21: "DELETE_ROWS_V2", 33: "GTID", 34: "ANONYMOUS_GTID",
35: "PREVIOUS_GTIDS",
}
10.5 复制过滤
主库可以通过 binlog-do-db 和 binlog-ignore-db 过滤事件,但这在事件产生时生效,而非在复制传输时过滤。
10.6 半同步复制
MySQL 5.5 引入半同步复制(Semi-Synchronous Replication),确保至少一个从库确认收到事件后才返回客户端。
协议差异
普通异步复制:
Client → Master → OK (立即返回)
Master → Slave (异步发送 Binlog)
半同步复制:
Client → Master → 执行事务
Master → Slave → 发送 Binlog
Slave → Master → ACK (确认收到)
Master → Client → OK (收到 ACK 后返回)
半同步复制使用额外的插件协议交互:
Master → Slave: Binlog Event
Slave → Master: Semi-sync ACK
包格式: [1 字节标识 (0xE0)] + [8 字节 binlog_pos] + [binlog_filename]
10.7 GTID 复制
MySQL 5.6 引入全局事务标识符(GTID),每个事务有全局唯一标识。
GTID 格式
source_id:transaction_id
示例: 3E11FA47-71CA-11E1-9E33-C80AA9429562:23
GTID 事件
GTID_LOG_EVENT(类型码 33)格式:
偏移量 大小 字段
────────────────────────────────────────
19 1 字节 flags (commit flag)
20 16 字节 UUID (source_id)
36 8 字节 transaction_id (小端序)
44 4 字节 logical_clock (组提交)
10.8 心跳机制
主库定期发送 HEARTBEAT_EVENT(类型码 17)以保持连接活跃并通知从库当前位置。
def handle_heartbeat(event: BinlogEvent):
"""处理心跳事件"""
offset = 19
timestamp = struct.unpack('<I', event.data[offset:offset+4])[0]
binlog_pos = struct.unpack('<Q', event.data[offset+4:offset+12])[0]
print(f"[心跳] timestamp={timestamp}, pos={binlog_pos}")
10.9 完整的从库复制客户端
"""
mysql_replica_client.py
简化版 MySQL 从库客户端
"""
def connect_as_replica(host, port, user, password, server_id,
binlog_file, binlog_pos):
"""连接到主库并接收 Binlog 事件"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
# 握手认证
handshake = read_response(sock)
print(f"[+] 收到 HandshakeV10")
# 发送认证(需要有 REPLICATION SLAVE 权限的用户)
auth_response = build_auth_response(handshake, user, password)
sock.send(encode_packet(auth_response, 1))
auth_result = read_response(sock)
if auth_result[0] != 0x00:
print("[!] 认证失败")
return
print("[+] 认证成功")
# 注册从库
send_register_slave(sock, server_id)
print("[+] 注册从库成功")
# 请求 Binlog
send_binlog_dump(sock, binlog_pos, server_id, binlog_file)
print(f"[+] 请求 Binlog: {binlog_file}:{binlog_pos}")
# 接收事件
event_count = 0
while True:
event = read_binlog_event(sock)
if event is None:
print("[!] 连接断开")
break
event_count += 1
event_name = EVENT_NAMES.get(event.header.type_code,
f"UNKNOWN({event.header.type_code})")
print(f" 事件 #{event_count}: {event_name} "
f"ts={event.header.timestamp} "
f"len={event.header.event_length} "
f"pos={event.header.next_position}")
# 解析特定事件类型
if event.header.type_code == 2: # QUERY_EVENT
info = parse_query_event(event.data)
print(f" SQL: {info['sql'][:80]}")
elif event.header.type_code == 4: # ROTATE_EVENT
info = parse_rotate_event(event.data)
print(f" 轮转到: {info['filename']}:{info['position']}")
elif event.header.type_code == 10: # XID_EVENT
info = parse_xid_event(event.data)
print(f" 事务提交: XID={info['xid']}")
elif event.header.type_code == 17: # HEARTBEAT_EVENT
print(" [心跳]")
sock.close()
if __name__ == '__main__':
connect_as_replica(
host='127.0.0.1',
port=3306,
user='repl_user',
password='repl_pass',
server_id=100,
binlog_file='mysql-bin.000001',
binlog_pos=4
)
10.10 注意事项
重要提醒
复制用户权限:从库连接需要
REPLICATION SLAVE和REPLICATION CLIENT权限。server_id 唯一性:复制拓扑中的每个服务器必须有唯一的
server_id。Binlog 格式:推荐使用
ROW格式(binlog_format=ROW),避免STATEMENT格式的不确定性。网络稳定性:复制对网络稳定性敏感,短暂的网络中断可能导致复制延迟或中断。
GTID 限制:GTID 不支持
CREATE TABLE ... SELECT和CREATE TEMPORARY TABLE在事务中的操作。半同步降级:半同步复制在超时后会自动降级为异步复制,可能导致数据丢失。
10.11 业务场景
场景一:数据库迁移
使用复制协议将数据从旧主库迁移到新主库,实现零停机迁移。
场景二:数据变更捕获(CDC)
通过解析 Binlog 事件实现实时数据变更捕获,常见的 CDC 工具包括:
- Debezium:基于 Kafka Connect 的 CDC 平台
- Canal:阿里巴巴开源的 Binlog 增量订阅组件
- Maxwell:将 Binlog 转为 JSON 的守护进程
场景二的代码示例
# 使用复制协议实现简单的 CDC
def capture_changes(host, port, user, password, tables):
"""捕获指定表的数据变更"""
connect_as_replica(host, port, user, password,
server_id=999,
binlog_file='mysql-bin.000001',
binlog_pos=4)
# 在事件循环中过滤 TABLE_MAP_EVENT 匹配目标表
# 解析 WRITE/UPDATE/DELETE_ROWS_EVENT 获取变更数据
场景三:延迟从库
通过设置 relay_log_recovery 和延迟 SQL 线程,实现数据回滚的安全网。
10.12 扩展阅读
- MySQL Internals: Replication Protocol
- MySQL Internals: Binlog Event Format
- MySQL Reference: Replication
- Debezium Documentation
- Canal Wiki
上一章:09 - 结果集详解 下一章:11 - 代理与中间件 —— 深入了解如何实现 MySQL 代理和中间件。