HTTP 协议详解教程 / 第 13 章:WebSocket 协议
第 13 章:WebSocket 协议
WebSocket 提供全双工通信能力,是实现实时应用(聊天、游戏、协同编辑)的首选协议。
13.1 WebSocket 概述
HTTP vs WebSocket
| 特性 | HTTP | WebSocket |
|---|
| 通信模式 | 请求-响应 | 全双工 |
| 连接方式 | 短连接/长轮询 | 持久连接 |
| 服务端推送 | 不支持(需要 SSE) | 原生支持 |
| 数据格式 | 文本 | 文本 + 二进制 |
| 协议开销 | 每次请求头部 | 首次握手后极小 |
适用场景
| 场景 | 说明 |
|---|
| 即时通讯 | 聊天室、私信 |
| 实时协作 | 多人编辑、白板 |
| 在线游戏 | 实时状态同步 |
| 金融行情 | 实时股价推送 |
| 通知系统 | 实时通知推送 |
| 监控面板 | 实时数据展示 |
13.2 WebSocket 握手
握手流程
WebSocket 使用 HTTP 升级机制建立连接:
# 客户端请求(HTTP Upgrade)
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: https://example.com
# 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
握手关键头部
| 头部 | 说明 |
|---|
Upgrade: websocket | 请求升级到 WebSocket |
Connection: Upgrade | 表示连接升级 |
Sec-WebSocket-Key | 客户端随机 Base64 key |
Sec-WebSocket-Accept | 服务器确认(key + 魔法字符串的 SHA1) |
Sec-WebSocket-Version | 协议版本(始终为 13) |
Sec-WebSocket-Protocol | 子协议协商 |
Sec-WebSocket-Extensions | 扩展协商 |
Node.js 手动实现握手
const http = require('http');
const crypto = require('crypto');
const MAGIC_STRING = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
const server = http.createServer();
server.on('upgrade', (req, socket, head) => {
const key = req.headers['sec-websocket-key'];
const acceptKey = crypto
.createHash('sha1')
.update(key + MAGIC_STRING)
.digest('base64');
socket.write(
'HTTP/1.1 101 Switching Protocols\r\n' +
'Upgrade: websocket\r\n' +
'Connection: Upgrade\r\n' +
`Sec-WebSocket-Accept: ${acceptKey}\r\n` +
'\r\n'
);
// 握手完成,可以开始 WebSocket 通信
console.log('WebSocket 连接已建立');
});
server.listen(3000);
13.3 WebSocket 帧格式
帧结构
0 1 2 3
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+-------------------------------+
| Extended payload length continued, if payload len == 127 |
+-------------------------------+-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------+-------------------------------+
操作码(Opcode)
| 值 | 含义 | 说明 |
|---|
| 0x0 | Continuation | 延续帧 |
| 0x1 | Text | 文本帧(UTF-8) |
| 0x2 | Binary | 二进制帧 |
| 0x8 | Close | 关闭连接 |
| 0x9 | Ping | 心跳请求 |
| 0xA | Pong | 心跳响应 |
13.4 使用 ws 库
服务端
const WebSocket = require('ws');
const http = require('http');
const server = http.createServer();
const wss = new WebSocket.Server({ server });
wss.on('connection', (ws, req) => {
console.log('新连接:', req.socket.remoteAddress);
// 接收消息
ws.on('message', (data, isBinary) => {
if (isBinary) {
console.log('收到二进制数据:', data.length, '字节');
} else {
const message = data.toString();
console.log('收到消息:', message);
// 广播给所有客户端
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
});
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'welcome',
message: '连接成功',
timestamp: Date.now()
}));
// 连接关闭
ws.on('close', (code, reason) => {
console.log('连接关闭:', code, reason.toString());
});
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
});
server.listen(3000, () => {
console.log('WebSocket 服务器运行在 ws://localhost:3000');
});
客户端
// 浏览器端
const ws = new WebSocket('ws://localhost:3000');
// 连接打开
ws.onopen = () => {
console.log('连接已建立');
ws.send(JSON.stringify({ type: 'greeting', data: '你好' }));
};
// 接收消息
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('收到:', data);
};
// 连接关闭
ws.onclose = (event) => {
console.log('连接关闭:', event.code, event.reason);
};
// 错误处理
ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
};
// 发送消息
ws.send('Hello WebSocket');
ws.send(JSON.stringify({ type: 'message', content: '你好' }));
ws.send(arrayBuffer); // 发送二进制数据
13.5 心跳保活
Ping/Pong 机制
// 服务端心跳
class HeartbeatManager {
constructor(wss, interval = 30000) {
this.wss = wss;
this.interval = interval;
this.start();
}
start() {
this.timer = setInterval(() => {
this.wss.clients.forEach(ws => {
if (ws.isAlive === false) {
console.log('连接超时,断开');
return ws.terminate();
}
ws.isAlive = false;
ws.ping(); // 发送 Ping
});
}, this.interval);
}
stop() {
clearInterval(this.timer);
}
}
// 使用
const wss = new WebSocket.Server({ server });
wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true; // 收到 Pong,连接正常
});
});
const heartbeat = new HeartbeatManager(wss, 30000);
应用层心跳
// 应用层心跳(兼容性更好)
class AppHeartbeat {
constructor(ws, interval = 25000) {
this.ws = ws;
this.interval = interval;
this.timeout = null;
this.start();
}
start() {
// 定期发送 ping
this.pingTimer = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'ping', ts: Date.now() }));
}
}, this.interval);
// 等待 pong 响应
this.ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'pong') {
clearTimeout(this.timeout);
}
});
}
stop() {
clearInterval(this.pingTimer);
clearTimeout(this.timeout);
}
}
13.6 断线重连策略
指数退避重连
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.maxRetries = options.maxRetries || 10;
this.baseDelay = options.baseDelay || 1000;
this.maxDelay = options.maxDelay || 30000;
this.retryCount = 0;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('连接成功');
this.retryCount = 0; // 重置重试计数
this.onopen?.();
};
this.ws.onmessage = (event) => {
this.onmessage?.(event);
};
this.ws.onclose = (event) => {
console.log('连接关闭:', event.code);
this.onclose?.(event);
if (!event.wasClean && this.retryCount < this.maxRetries) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('连接错误:', error);
this.onerror?.(error);
};
}
scheduleReconnect() {
this.retryCount++;
// 指数退避 + 随机抖动
const delay = Math.min(
this.baseDelay * Math.pow(2, this.retryCount - 1) + Math.random() * 1000,
this.maxDelay
);
console.log(`将在 ${delay}ms 后重连 (第 ${this.retryCount} 次)`);
setTimeout(() => this.connect(), delay);
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
} else {
console.warn('连接未就绪,消息未发送');
}
}
close() {
this.maxRetries = 0; // 禁止自动重连
this.ws.close();
}
}
// 使用
const ws = new ReconnectingWebSocket('ws://localhost:3000');
ws.onmessage = (event) => console.log('收到:', event.data);
ws.send('Hello');
13.7 房间/频道管理
// 多房间聊天室
const WebSocket = require('ws');
class ChatServer {
constructor(server) {
this.wss = new WebSocket.Server({ server });
this.rooms = new Map(); // room -> Set<ws>
this.clients = new Map(); // ws -> { rooms: Set, username: string }
this.wss.on('connection', (ws) => this.handleConnection(ws));
}
handleConnection(ws) {
this.clients.set(ws, { rooms: new Set(), username: null });
ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
this.handleMessage(ws, msg);
});
ws.on('close', () => {
const client = this.clients.get(ws);
client.rooms.forEach(room => this.leaveRoom(ws, room));
this.clients.delete(ws);
});
}
handleMessage(ws, msg) {
switch (msg.type) {
case 'join':
this.joinRoom(ws, msg.room);
break;
case 'leave':
this.leaveRoom(ws, msg.room);
break;
case 'message':
this.broadcastToRoom(msg.room, {
type: 'message',
from: this.clients.get(ws).username,
content: msg.content,
timestamp: Date.now()
}, ws);
break;
case 'set_username':
this.clients.get(ws).username = msg.username;
break;
}
}
joinRoom(ws, room) {
if (!this.rooms.has(room)) {
this.rooms.set(room, new Set());
}
this.rooms.get(room).add(ws);
this.clients.get(ws).rooms.add(room);
this.broadcastToRoom(room, {
type: 'system',
message: `${this.clients.get(ws).username} 加入了房间`
});
}
leaveRoom(ws, room) {
this.rooms.get(room)?.delete(ws);
this.clients.get(ws).rooms.delete(room);
this.broadcastToRoom(room, {
type: 'system',
message: `${this.clients.get(ws).username} 离开了房间`
});
}
broadcastToRoom(room, message, excludeWs = null) {
const clients = this.rooms.get(room);
if (!clients) return;
const data = JSON.stringify(message);
clients.forEach(client => {
if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
}
module.exports = ChatServer;
13.8 Nginx WebSocket 代理
# WebSocket 代理配置
location /ws/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# 超时设置
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
13.9 安全考虑
| 安全措施 | 说明 |
|---|
| WSS 协议 | 生产环境必须使用 wss://(加密) |
| Origin 验证 | 检查 Origin 头防止跨站 WebSocket 劫持 |
| 认证 | 握手时验证 Token 或 Cookie |
| 限流 | 限制消息频率和连接数 |
| 输入验证 | 验证所有消息格式和内容 |
// 认证中间件
wss.on('connection', (ws, req) => {
// 验证 Origin
const origin = req.headers.origin;
if (!allowedOrigins.includes(origin)) {
ws.close(1008, '不允许的来源');
return;
}
// 验证 Token
const token = new URL(req.url, 'http://localhost').searchParams.get('token');
try {
const payload = jwt.verify(token, SECRET);
ws.user = payload;
} catch (err) {
ws.close(1008, '认证失败');
return;
}
// 限流
ws.messageCount = 0;
ws.lastMessageTime = Date.now();
});
⚠️ 注意事项
- 使用 WSS:生产环境必须加密
- 心跳保活:防止连接被中间设备断开
- 指数退避重连:避免重连风暴
- 消息大小限制:限制消息最大大小防止内存溢出
- 连接数限制:限制每个用户的连接数
🔗 扩展阅读
下一章:第 14 章:HTTP/2 — 多路复用、头部压缩、服务器推送、帧结构