TCP/UDP 网络协议教程 / 10-Socket API 基础
10 - Socket API 基础
10.1 Socket 概述
Socket 是网络编程的 API 接口,提供端到端的通信抽象。
Socket 类型:
• SOCK_STREAM (TCP) - 可靠、有序、字节流
• SOCK_DGRAM (UDP) - 不可靠、无序、数据报
• SOCK_RAW (RAW) - 直接访问 IP 层
地址族:
• AF_INET - IPv4
• AF_INET6 - IPv6
• AF_UNIX - 本地通信
10.2 Socket 创建与基本操作
import socket
# 创建 TCP Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建 UDP Socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 常用选项
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 地址重用
sock.setblocking(False) # 非阻塞模式
sock.settimeout(10) # 超时时间
# 关闭
sock.close()
# 推荐使用 with 语句
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(('example.com', 80))
sock.sendall(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
data = sock.recv(4096)
10.3 bind 绑定地址
"""绑定到指定地址和端口"""
import socket
# 创建服务器 Socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址
# 格式:(host, port)
server_sock.bind(('0.0.0.0', 8080)) # 所有网卡,端口 8080
server_sock.bind(('127.0.0.1', 8080)) # 仅本地
server_sock.bind(('192.168.1.100', 8080)) # 特定网卡
# 绑定到随机端口
server_sock.bind(('0.0.0.0', 0))
print(f"绑定到: {server_sock.getsockname()}") # ('0.0.0.0', 随机端口)
特殊地址:
• '0.0.0.0' - 监听所有接口
• '127.0.0.1' - 仅本地
• '' - 等同于 '0.0.0.0'
端口 0:由系统分配随机端口
10.4 listen 监听连接
"""TCP 服务器监听"""
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(('0.0.0.0', 8080))
# 开始监听
# backlog: 等待连接的队列长度
server_sock.listen(128)
print("服务器启动,等待连接...")
backlog 参数:
┌─────────────────┐
SYN ───→ │ 半连接队列 │ (SYN Queue)
└────────┬────────┘
│ 三次握手完成
▼
┌─────────────────┐
accept()← │ 全连接队列 │ (Accept Queue)
└─────────────────┘
• 半连接队列:SYN_RECV 状态的连接
• 全连接队列:ESTABLISHED 等待 accept()
backlog 参数控制全连接队列大小
实际值 = min(backlog, somaxconn)
10.5 accept 接受连接
"""阻塞式接受连接"""
import socket
import threading
def handle_client(client_sock, addr):
"""处理客户端连接"""
print(f"新连接: {addr}")
try:
while True:
data = client_sock.recv(1024)
if not data:
break
client_sock.sendall(data) # 回显
except ConnectionResetError:
pass
finally:
client_sock.close()
print(f"连接关闭: {addr}")
def tcp_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(128)
while True:
client_sock, addr = server.accept() # 阻塞等待
# 多线程处理
t = threading.Thread(target=handle_client, args=(client_sock, addr))
t.daemon = True
t.start()
if __name__ == '__main__':
tcp_server()
10.6 connect 连接服务器
"""TCP 客户端连接"""
import socket
def tcp_client():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(10)
# 连接服务器
try:
sock.connect(('127.0.0.1', 8080))
except ConnectionRefusedError:
print("连接被拒绝")
return
except socket.timeout:
print("连接超时")
return
# 发送数据
sock.sendall(b"Hello Server")
# 接收响应
data = sock.recv(1024)
print(f"收到: {data.decode()}")
tcp_client()
10.7 send 和 recv
"""
TCP 发送注意事项:
• send() 不保证发送所有数据
• sendall() 保证发送所有数据
• 返回实际发送的字节数
TCP 接收注意事项:
• recv() 返回实际接收到的数据
• 返回空 bytes 表示连接关闭
• 需要循环接收直到收到完整消息
"""
def send_all(sock, data):
"""确保发送所有数据"""
total_sent = 0
while total_sent < len(data):
sent = sock.send(data[total_sent:])
if sent == 0:
raise ConnectionError("发送失败")
total_sent += sent
return total_sent
def recv_all(sock, size):
"""确保接收所有数据"""
data = bytearray()
while len(data) < size:
chunk = sock.recv(size - len(data))
if not chunk:
raise ConnectionError("连接关闭")
data.extend(chunk)
return bytes(data)
10.8 UDP 收发
"""UDP 发送和接收"""
import socket
def udp_sender():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 发送数据报
sock.sendto(b"Hello UDP", ('127.0.0.1', 9999))
# 接收响应
data, addr = sock.recvfrom(1024)
print(f"来自 {addr}: {data.decode()}")
sock.close()
def udp_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', 9999))
while True:
data, addr = sock.recvfrom(1024) # 接收数据报和来源地址
print(f"来自 {addr}: {data.decode()}")
# 回复
sock.sendto(b"ACK", addr)
# sendto(data, address) - 发送到指定地址
# recvfrom(bufsize) - 返回 (data, address)
10.9 IO 多路复用
select(全平台)
"""select 模型"""
import socket
import select
def select_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(128)
server.setblocking(False)
inputs = [server] # 监听读事件
outputs = [] # 监听写事件
while True:
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for sock in readable:
if sock is server:
# 新连接
client, addr = server.accept()
client.setblocking(False)
inputs.append(client)
else:
# 数据到达
data = sock.recv(1024)
if data:
print(f"收到: {data}")
if sock not in outputs:
outputs.append(sock)
else:
# 连接关闭
if sock in outputs:
outputs.remove(sock)
inputs.remove(sock)
sock.close()
for sock in writable:
# 可以发送数据
sock.sendall(b"ACK")
outputs.remove(sock)
for sock in exceptional:
# 异常
inputs.remove(sock)
if sock in outputs:
outputs.remove(sock)
sock.close()
poll(Linux/Unix)
"""poll 模型"""
import socket
import select
def poll_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(128)
poll = select.poll()
poll.register(server, select.POLLIN)
fd_to_socket = {server.fileno(): server}
while True:
events = poll.poll()
for fd, event in events:
sock = fd_to_socket[fd]
if sock is server:
client, addr = server.accept()
poll.register(client, select.POLLIN)
fd_to_socket[client.fileno()] = client
elif event & select.POLLIN:
data = sock.recv(1024)
if data:
sock.sendall(data)
else:
poll.unregister(sock)
sock.close()
del fd_to_socket[fd]
epoll(Linux 推荐)
"""epoll 模型"""
import socket
import select
def epoll_server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(128)
server.setblocking(False)
epoll = select.epoll()
epoll.register(server.fileno(), select.EPOLLIN)
fd_to_socket = {server.fileno(): server}
try:
while True:
events = epoll.poll()
for fd, event in events:
sock = fd_to_socket[fd]
if sock is server:
client, addr = server.accept()
client.setblocking(False)
epoll.register(client.fileno(), select.EPOLLIN)
fd_to_socket[client.fileno()] = client
elif event & select.EPOLLIN:
data = sock.recv(1024)
if data:
# 修改为监听写事件
epoll.modify(fd, select.EPOLLOUT)
else:
epoll.unregister(fd)
sock.close()
del fd_to_socket[fd]
elif event & select.EPOLLOUT:
sock.sendall(b"ACK")
epoll.modify(fd, select.EPOLLIN)
finally:
epoll.unregister(server.fileno())
epoll.close()
server.close()
IO 多路复用对比
| 方法 | 最大连接数 | 时间复杂度 | 平台 | 特点 |
|---|---|---|---|---|
| select | 1024 | O(n) | 全平台 | 简单,有数量限制 |
| poll | 无限 | O(n) | Unix | 无数量限制 |
| epoll | 无限 | O(1) | Linux | 高性能,边缘触发 |
| kqueue | 无限 | O(1) | BSD/macOS | 类似 epoll |
10.10 非阻塞 IO
"""非阻塞 Socket"""
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
pass # 连接正在进行中
# 使用 select 等待连接完成
import select
_, writable, _ = select.select([], [sock], [], 5)
if writable:
# 连接已建立
sock.sendall(b"GET / HTTP/1.0\r\n\r\n")
10.11 注意事项
⚠️ SO_REUSEADDR:服务器必须设置,否则重启时可能绑定失败
⚠️ 阻塞 vs 非阻塞:阻塞模式简单但不支持并发,需要多线程或多路复用
⚠️ epoll 事件处理:边缘触发必须一次性读完所有数据,否则可能丢失事件
⚠️ Socket 关闭顺序:先 shutdown 再 close,确保数据发送完毕
10.12 扩展阅读
下一章:11 - TCP 编程实战 - 客户端/服务器、并发模型、粘包处理