RTMP 协议精讲 / 06 - 流操作
流操作(Stream Operations)
6.1 流的概念模型
RTMP 中的 流(Stream) 是一个逻辑通道,用于承载音视频数据和相关的控制信息。一个 RTMP 连接(NetConnection)可以包含多个流。
NetConnection (一个 TCP 连接)
├── Stream 0 (默认控制流,用于 NetConnection 级别的命令)
├── Stream 1 (流 "camera_feed")
│ ├── 视频数据 (Type 9)
│ ├── 音频数据 (Type 8)
│ └── 元数据 (Type 18)
├── Stream 2 (流 "screen_share")
│ ├── 视频数据 (Type 9)
│ └── 音频数据 (Type 8)
└── Stream 3 (流 "slides")
└── 视频数据 (Type 9)
流的生命周期
createStream()
┌──────────┐ ────────────────→ ┌──────────┐
│ Uninit │ │ Created │
│ │ │ │
└──────────┘ └──────────┘
│ │
play() │ │ publish()
▼ ▼
┌──────────┐ ┌──────────┐
│ Playing │ │Publishing│
│ │ │ │
└──────────┘ └──────────┘
│ │
deleteStream()
▼
┌──────────┐
│ Closed │
│ │
└──────────┘
6.2 createStream
createStream 创建一个逻辑流,是所有流操作的前提。
请求
def create_create_stream_command(transaction_id: float, stream_name: str = None) -> list:
    """Build an AMF createStream command.

    Note: createStream carries only a transaction ID and an optional
    command object. *stream_name* is accepted for call-site symmetry but
    is NOT part of the wire command -- the stream name is supplied later
    by play/publish.
    """
    command = ["createStream"]       # command name
    command.append(transaction_id)   # transaction ID (1.0, 2.0, ...)
    command.append(None)             # command object (always null here)
    return command
响应
def create_create_stream_response(
    transaction_id: float,
    stream_id: float = 1.0
) -> list:
    """Build the _result response answering a createStream request."""
    response = ["_result", transaction_id, None]
    response.append(stream_id)  # server-assigned stream id
    return response
完整交互
Client Server
│ │
│── createStream (txid=4) ──────────────→│ AMF Command
│ │
│←── _result (txid=4, stream_id=1) ─────│ AMF Response
│ │
│ 现在可以使用 stream_id=1 进行 │
│ play/publish 操作 │
6.3 publish(推流)
publish 命令开始向服务端发送音视频数据。
命令格式
class PublishType:
    """Publish-mode constants for the RTMP publish command."""
    LIVE = "live"      # live broadcast, not recorded
    RECORD = "record"  # record to a file on the server
    APPEND = "append"  # append to an existing recording


def create_publish_command(
    stream_name: str,
    publish_type: str = PublishType.LIVE,
    transaction_id: float = 0
) -> list:
    """Build an AMF publish command.

    Args:
        stream_name: stream key, e.g. "camera_feed".
        publish_type: one of "live", "record", "append".
        transaction_id: conventionally 0 for publish.
    """
    command = ["publish", transaction_id, None]
    command.append(stream_name)
    command.append(publish_type)
    return command
服务端响应
def create_publish_start(stream_id: int) -> list:
    """Build the onStatus message for NetStream.Publish.Start."""
    info = {
        "level": "status",
        "code": "NetStream.Publish.Start",
        "description": "Publishing stream.",
    }
    return ["onStatus", 0, None, info]
def create_publish_bad_name(stream_id: int) -> list:
    """Build the onStatus error message for NetStream.Publish.BadName."""
    info = {
        "level": "error",
        "code": "NetStream.Publish.BadName",
        "description": "Stream name already in use.",
    }
    return ["onStatus", 0, None, info]
推流完整时序
Client Server
│ │
│════ connect 成功 ═══════════════════════│
│ │
│── createStream (txid=4) ──────────────→│
│←── _result (stream_id=1) ─────────────│
│ │
│── publish("mystream", "live") ────────→│
│ │
│←── onStatus (Publish.Start) ──────────│
│ │
│── @setDataFrame (onMetaData) ─────────→│ 含比特率、分辨率等
│ │
│── AAC Sequence Header ────────────────→│ 音频解码器配置
│── AVC Sequence Header ────────────────→│ 视频解码器配置
│ │
│── Video (Keyframe) ───────────────────→│ 第一个关键帧
│── Audio ──────────────────────────────→│
│── Video (Inter) ──────────────────────→│
│── Audio ──────────────────────────────→│
│── Video (Inter) ──────────────────────→│
│── Audio ──────────────────────────────→│
│ ... 持续发送 ... │
onMetaData 消息
推流前通常先发送一个 onMetaData 数据消息,包含流的媒体信息:
def create_on_meta_data(
    width: int = 1920,
    height: int = 1080,
    video_bitrate: int = 2000,
    audio_bitrate: int = 128,
    fps: float = 30.0,
    audio_sample_rate: int = 44100,
    audio_channels: int = 2,
    video_codec: str = "avc1",
    audio_codec: str = "mp4a",
    duration: float = 0,
) -> list:
    """
    Build the onMetaData data message sent before publishing.

    Note: this travels as Type 18 (AMF0 Data), not Type 20 (Command).

    Fix: video_codec / audio_codec were previously ignored and the codec
    ids hard-coded; they are now mapped to FLV codec ids, with unknown
    names falling back to the former defaults (AVC = 7, AAC = 10).
    """
    # FLV-spec codec ids; extend as needed.
    video_codec_ids = {"avc1": 7, "vp6": 4}
    audio_codec_ids = {"mp4a": 10, "mp3": 2}
    return [
        "@setDataFrame",  # AMF0 data-frame marker
        "onMetaData",     # payload name
        {
            "width": width,
            "height": height,
            "videodatarate": video_bitrate,
            "framerate": fps,
            "videocodecid": video_codec_ids.get(video_codec, 7),
            "audiodatarate": audio_bitrate,
            "audiosamplerate": audio_sample_rate,
            "audiosamplesize": 16,  # 16-bit samples
            "stereo": audio_channels == 2,
            "audiocodecid": audio_codec_ids.get(audio_codec, 10),
            "duration": duration,
            "encoder": "obs-rtmp/1.0",
        }
    ]
6.4 play(播放)
play 命令请求服务端发送指定流的音视频数据。
命令格式
def create_play_command(
    stream_name: str,
    start: float = -2,
    duration: float = -1,
    reset: bool = True,
    transaction_id: float = 0
) -> list:
    """Build an AMF play command.

    Args:
        stream_name: stream to play, e.g. "mystream".
        start: -2 = look for a live stream (wait if absent),
            -1 = play from the beginning, >= 0 = start at that second.
        duration: -1 = play until the stream ends, -2 = play to end
            of file, > 0 = play for that many seconds.
        reset: True clears any previous play command on the stream.
        transaction_id: conventionally 0.
    """
    command = ["play", transaction_id, None]
    command += [stream_name, start, duration, reset]
    return command
服务端响应
def create_play_reset() -> list:
    """Build the onStatus message for NetStream.Play.Reset."""
    info = {
        "level": "status",
        "code": "NetStream.Play.Reset",
        "description": "Resetting stream.",
    }
    return ["onStatus", 0, None, info]
def create_play_start() -> list:
    """Build the onStatus message for NetStream.Play.Start."""
    info = {
        "level": "status",
        "code": "NetStream.Play.Start",
        "description": "Started playing.",
    }
    return ["onStatus", 0, None, info]
def create_play_stream_not_found() -> list:
    """Build the onStatus error message for NetStream.Play.StreamNotFound."""
    info = {
        "level": "error",
        "code": "NetStream.Play.StreamNotFound",
        "description": "Stream not found.",
    }
    return ["onStatus", 0, None, info]
def create_play_stop() -> list:
    """Build the onStatus message for NetStream.Play.Stop."""
    info = {
        "level": "status",
        "code": "NetStream.Play.Stop",
        "description": "Stopped playing.",
    }
    return ["onStatus", 0, None, info]
播放完整时序
Client Server
│ │
│── createStream (txid=4) ──────────────→│
│←── _result (stream_id=1) ─────────────│
│ │
│── play("mystream") ───────────────────→│ stream_id=1
│ │
│←── Set Chunk Size ────────────────────│
│←── User Control (Stream Begin, 1) ────│
│←── onStatus (Play.Reset) ─────────────│
│←── onStatus (Play.Start) ─────────────│
│ │
│←── RtmpSampleAccess ─────────────────│ Type 18
│←── onMetaData ────────────────────────│ Type 18
│ │
│←── AAC Sequence Header ───────────────│ Type 8
│←── AVC Sequence Header ───────────────│ Type 9
│ │
│←── Video (Keyframe) ──────────────────│
│←── Audio ─────────────────────────────│
│←── Video (Inter) ─────────────────────│
│←── Audio ─────────────────────────────│
│ ... 持续接收 ... │
6.5 pause / pauseResponse
暂停/恢复流播放。
命令格式
def create_pause_command(pause: bool, position: float) -> list:
    """Build an AMF pause/unpause command.

    Args:
        pause: True to pause, False to resume.
        position: stream timestamp at the pause point, in milliseconds.
    """
    command = ["pause", 0, None]  # name, transaction id (0), command object
    command += [pause, position]
    return command
def create_pause_response() -> list:
    """Build the onStatus message for NetStream.Pause.Notify."""
    info = {
        "level": "status",
        "code": "NetStream.Pause.Notify",
        "description": "Stream paused.",
    }
    return ["onStatus", 0, None, info]
def create_unpause_response() -> list:
    """Build the onStatus message for NetStream.Unpause.Notify."""
    info = {
        "level": "status",
        "code": "NetStream.Unpause.Notify",
        "description": "Stream unpaused.",
    }
    return ["onStatus", 0, None, info]
暂停/恢复流程
Client Server
│ │
│ 正在接收音视频数据 │
│←── Video ─────────────────────────────│
│←── Audio ─────────────────────────────│
│ │
│── pause(true, 5000) ─────────────────→│ 暂停在 5 秒处
│ │
│←── onStatus (Pause.Notify) ───────────│
│ 服务端停止发送音视频数据 │
│ │
│ ... 用户点击继续 ... │
│ │
│── pause(false, 5000) ────────────────→│ 从 5 秒处恢复
│ │
│←── onStatus (Unpause.Notify) ─────────│
│←── Video ─────────────────────────────│
│←── Audio ─────────────────────────────│
6.6 seek
跳转到流中的指定位置。
命令格式
def create_seek_command(position_ms: float) -> list:
    """Build an AMF seek command.

    Args:
        position_ms: target position in milliseconds.
    """
    command = ["seek", 0, None]  # name, transaction id (0), command object
    command.append(position_ms)
    return command
def create_seek_notify(position_ms: float) -> list:
    """Build the onStatus message for NetStream.Seek.Notify."""
    info = {
        "level": "status",
        "code": "NetStream.Seek.Notify",
        "description": f"Seeking to {position_ms}ms.",
    }
    return ["onStatus", 0, None, info]
Seek 流程
Client Server
│ │
│ 正在播放 (时间戳 5000ms) │
│ │
│── seek(10000) ────────────────────────→│ 跳到 10 秒
│ │
│←── onStatus (Seek.Notify) ────────────│
│ │
│←── Video (Keyframe @10s) ─────────────│ 从最近的关键帧开始
│←── Audio ─────────────────────────────│
│←── Video (Inter) ─────────────────────│
│ ... 从 10 秒处继续播放 ... │
6.7 closeStream / deleteStream
关闭流和删除流。
def create_close_stream() -> list:
    """Build the closeStream command (sent by the client)."""
    # closeStream carries no arguments beyond the standard header fields.
    return list(("closeStream", 0, None))
def create_delete_stream(stream_id: float) -> list:
    """Build the deleteStream command (sent by the client) for *stream_id*."""
    command = ["deleteStream", 0, None]
    command.append(stream_id)  # id returned by createStream
    return command
流关闭流程
Client Server
│ │
│── closeStream ────────────────────────→│ 通知服务端停止
│ │
│── deleteStream (stream_id=1) ─────────→│ 删除流
│ │
│ 服务端释放流资源 │
6.8 receiveAudio / receiveVideo
控制是否接收音视频数据(用于选择性接收):
def create_receive_audio(flag: bool) -> list:
    """Build the receiveAudio command.

    flag=False stops audio delivery (saves bandwidth);
    flag=True resumes it.
    """
    command = ["receiveAudio", 0, None]
    command.append(flag)
    return command
def create_receive_video(flag: bool) -> list:
    """Build the receiveVideo command (flag=False stops video delivery)."""
    command = ["receiveVideo", 0, None]
    command.append(flag)
    return command
使用场景:
- 画中画模式:小窗只接收视频,不接收音频
- 纯语音模式:只接收音频,不接收视频以节省带宽
- 低带宽客户端:动态切换音视频接收
6.9 完整的流操作客户端实现
#!/usr/bin/env python3
"""
RTMP Stream Operations Demo
演示 RTMP 流操作的完整流程
"""
import socket
import struct
class RTMPStreamClient:
    """Demo RTMP client covering the stream operations in this chapter.

    One instance wraps one TCP connection (NetConnection); createStream /
    publish / play / pause / seek operate on the logical stream it creates.
    Chunking and AMF handling are intentionally simplified for teaching.

    Fixes over the original demo:
    - _encode_amf_command silently dropped plain ints (e.g. the integer
      transaction id 0 used by publish/play/pause/seek) and could not
      encode dicts; both are now encoded as AMF0 number / object.
    - _encode_chunk_header produced a corrupt basic header for
      cs_id >= 64 and truncated timestamps >= 0xFFFFFF.
    - connect() called self._send_connect(), which was never defined.
    """

    def __init__(self, host: str, port: int = 1935):
        self.host = host
        self.port = port
        self.sock = None            # TCP socket, set in connect()
        self.stream_id = 0          # server-assigned id from createStream
        self.transaction_id = 0     # monotonically increasing AMF txid
        self.chunk_size = 128       # RTMP default outgoing chunk size

    def connect(self, app: str, tc_url: str):
        """Open the TCP connection, handshake, and send connect()."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))
        self._handshake()
        self._send_connect(app, tc_url)
        self._wait_for_result()

    def create_stream(self) -> int:
        """Create a logical stream; returns the server-assigned stream id."""
        self.transaction_id += 1
        msg = self._encode_amf_command(
            "createStream", self.transaction_id, None
        )
        self._send_amf_message(20, 3, msg)  # cs_id 3: command channel
        result = self._wait_for_result()
        self.stream_id = int(result)
        return self.stream_id

    def publish(self, stream_name: str, publish_type: str = "live"):
        """Start publishing ("live" / "record" / "append")."""
        msg = self._encode_amf_command(
            "publish", 0, None, stream_name, publish_type
        )
        self._send_amf_message(20, self.stream_id + 6, msg)

    def play(self, stream_name: str, start: float = -2, duration: float = -1):
        """Request playback (start=-2: live; -1: from beginning; >=0: seek)."""
        msg = self._encode_amf_command(
            "play", 0, None, stream_name, start, duration, True
        )
        self._send_amf_message(20, self.stream_id + 6, msg)

    def pause(self, pause: bool, position_ms: float):
        """Pause (True) or resume (False) at position_ms milliseconds."""
        msg = self._encode_amf_command(
            "pause", 0, None, pause, position_ms
        )
        self._send_amf_message(20, self.stream_id + 6, msg)

    def seek(self, position_ms: float):
        """Jump to position_ms milliseconds in the stream."""
        msg = self._encode_amf_command(
            "seek", 0, None, position_ms
        )
        self._send_amf_message(20, self.stream_id + 6, msg)

    def send_video(self, data: bytes, timestamp: int, is_keyframe: bool = False):
        """Send one video frame (FLV tag body: frame type | codec id 7 = AVC)."""
        frame_type = 0x10 if is_keyframe else 0x20  # keyframe / inter frame
        video_body = bytes([frame_type | 0x07]) + data
        self._send_media_message(9, self.stream_id + 5, video_body, timestamp)

    def send_audio(self, data: bytes, timestamp: int):
        """Send one audio frame (0xAF 0x01 = AAC raw, per FLV audio tag)."""
        audio_body = bytes([0xAF, 0x01]) + data
        self._send_media_message(8, self.stream_id + 4, audio_body, timestamp)

    def close_stream(self):
        """Send closeStream then deleteStream so the server frees resources."""
        msg = self._encode_amf_command("closeStream", 0, None)
        self._send_amf_message(20, self.stream_id + 6, msg)
        msg = self._encode_amf_command(
            "deleteStream", 0, None, float(self.stream_id)
        )
        self._send_amf_message(20, 3, msg)

    # === internal helpers ===

    def _send_connect(self, app: str, tc_url: str):
        """Send the NetConnection connect command (was missing entirely)."""
        self.transaction_id += 1
        msg = self._encode_amf_command(
            "connect", self.transaction_id,
            {"app": app, "tcUrl": tc_url},
        )
        self._send_amf_message(20, 3, msg)

    def _handshake(self):
        """Perform the simple (non-digest) C0/C1/C2 handshake."""
        import os, time
        c0 = struct.pack('B', 3)  # RTMP protocol version 3
        ts = int(time.time()) & 0xFFFFFFFF
        c1 = struct.pack('>II', ts, 0) + os.urandom(1528)
        self.sock.sendall(c0 + c1)
        s0s1s2 = self._recv_exact(3073)  # S0(1) + S1(1536) + S2(1536)
        s1 = s0s1s2[1:1537]
        # C2 echoes S1, with our read time in bytes 4..7.
        c2 = s1[:4] + struct.pack('>I', int(time.time()) & 0xFFFFFFFF) + s1[8:]
        self.sock.sendall(c2)

    def _recv_exact(self, size: int) -> bytes:
        """Receive exactly *size* bytes or raise ConnectionError."""
        data = b''
        while len(data) < size:
            chunk = self.sock.recv(size - len(data))
            if not chunk:
                raise ConnectionError("Connection closed")
            data += chunk
        return data

    def _send_amf_message(self, msg_type: int, cs_id: int, data: bytes):
        """Send an AMF message on stream 0."""
        header = self._encode_chunk_header(0, cs_id, 0, len(data), msg_type, 0)
        # Simplified: sent in one piece (a real client splits at chunk_size).
        self.sock.sendall(header + data)

    def _send_media_message(self, msg_type: int, cs_id: int, data: bytes, timestamp: int):
        """Send a media message, split into chunk_size-sized chunks."""
        header = self._encode_chunk_header(0, cs_id, timestamp, len(data), msg_type, self.stream_id)
        chunks = header + data[:self.chunk_size]
        offset = self.chunk_size
        while offset < len(data):
            # Continuation chunks carry a Type-3 (fmt=3) header only.
            chunk_hdr = self._encode_chunk_header(3, cs_id, 0, 0, 0, 0)
            chunk_body = data[offset:offset + self.chunk_size]
            chunks += chunk_hdr + chunk_body
            offset += self.chunk_size
        self.sock.sendall(chunks)

    def _encode_chunk_header(self, fmt, cs_id, ts, msg_len, msg_type, msg_stream):
        """Encode an RTMP chunk header (basic header + fmt-0 message header).

        Fix: cs_ids >= 64 now use the 2-/3-byte basic-header forms and
        timestamps >= 0xFFFFFF emit the extended-timestamp field --
        previously both cases produced a corrupt header.
        """
        if 2 <= cs_id <= 63:
            hdr = bytes([(fmt << 6) | cs_id])
        elif cs_id <= 319:
            hdr = bytes([(fmt << 6), cs_id - 64])          # 2-byte form
        else:
            hdr = bytes([(fmt << 6) | 1]) + struct.pack('<H', cs_id - 64)  # 3-byte form
        if fmt == 0:
            hdr += struct.pack('>I', min(ts, 0xFFFFFF))[1:]  # 3-byte timestamp
            hdr += struct.pack('>I', msg_len)[1:]            # 3-byte length
            hdr += struct.pack('B', msg_type)
            hdr += struct.pack('<I', msg_stream)             # stream id: little-endian
            if ts >= 0xFFFFFF:
                hdr += struct.pack('>I', ts)                 # extended timestamp
        return hdr

    def _encode_amf_command(self, *values):
        """Minimal AMF0 encoder (demo use only).

        Fix: booleans are tested before numbers (bool subclasses int) and
        plain ints are encoded as AMF0 numbers -- previously integer
        transaction ids (e.g. 0) were silently dropped, yielding a
        malformed command. Dicts encode as AMF0 objects for connect().
        """
        result = b''
        for v in values:
            if v is None:
                result += bytes([0x05])                          # AMF0 null
            elif isinstance(v, bool):
                result += bytes([0x01, 0x01 if v else 0x00])     # AMF0 boolean
            elif isinstance(v, str):
                encoded = v.encode('utf-8')
                result += bytes([0x02]) + struct.pack('>H', len(encoded)) + encoded
            elif isinstance(v, (int, float)):
                result += bytes([0x00]) + struct.pack('>d', float(v))  # AMF0 number
            elif isinstance(v, dict):
                result += bytes([0x03])                          # AMF0 object
                for key, val in v.items():
                    kb = key.encode('utf-8')
                    result += struct.pack('>H', len(kb)) + kb
                    result += self._encode_amf_command(val)
                result += b'\x00\x00\x09'                        # object end marker
        return result

    def _wait_for_result(self):
        """Wait for a _result response (simplified stub).

        A real implementation parses the received chunk stream; here we
        just drain one read and return a fixed stream id.
        """
        self.sock.recv(4096)
        return 1.0  # simplified
注意事项
- Stream ID 分配:createStream 返回的 stream_id 用于后续 play/publish 的消息流 ID
- 块流 ID 映射:消息流 ID 和块流 ID 是不同的概念,通常音频使用 cs_id=8,视频使用 cs_id=6
- Sequence Header 顺序:publish 时必须先发 AAC/AVC Sequence Header,再发音视频数据
- 直播 vs 点播:play 的 start 参数为 -2 时表示直播模式,-1 表示从头播放点播
- deleteStream 时机:客户端停止播放/推流后应发送 deleteStream,否则服务端可能不释放资源
扩展阅读
上一章:05 - AMF 编码与命令 下一章:07 - 视频编解码 — 了解 RTMP 中的视频编码与 FLV 封装