OpenAI API 接口对接完全教程 / 04 - 流式响应处理
第 04 章 · 流式响应处理 (Streaming)
流式响应让 AI 回复像"打字机"一样逐字输出,极大提升用户体验。本章详解 SSE 协议、前后端实现和错误处理。
4.1 为什么需要流式响应
对比:普通 vs 流式
| 特性 | 普通请求 | 流式请求 |
|---|---|---|
| 用户体验 | 等待 2-10s 后一次性显示 | 逐字输出,即时反馈 |
| 首字延迟 | 与总响应时间相同 | 极低(通常 < 1s) |
| 适用场景 | 后台任务、批量处理 | 对话、实时交互 |
| 实现复杂度 | 简单 | 中等 |
| 错误处理 | 简单 | 需处理中断 |
4.2 Python 流式调用
基础流式输出
from openai import OpenAI

client = OpenAI()

# Request a streaming completion: tokens arrive incrementally
# instead of as one final payload.
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "user", "content": "写一首关于编程的五言绝句"}
    ],
    stream=True,  # enable streaming
)

# Consume the stream and echo each delta as soon as it arrives.
for chunk in stream:
    delta_text = chunk.choices[0].delta.content
    if delta_text is not None:
        print(delta_text, end="", flush=True)
print()  # trailing newline after the stream ends
流式 Chunk 结构
Chunk 1: {"choices": [{"delta": {"role": "assistant"}, "index": 0}]}
Chunk 2: {"choices": [{"delta": {"content": "代"}, "index": 0}]}
Chunk 3: {"choices": [{"delta": {"content": "码"}, "index": 0}]}
Chunk 4: {"choices": [{"delta": {"content": "如"}, "index": 0}]}
...
Chunk N: {"choices": [{"delta": {}, "finish_reason": "stop"}]}
注意:第一个 chunk 的 `delta.content` 通常为 None,只包含 role 信息。
收集完整响应
def stream_chat(messages: list, model: str = "gpt-4o-mini") -> tuple[str, dict]:
    """Stream a chat completion, echoing tokens live.

    Returns:
        (full_text, usage) — usage holds the token counts carried by the
        final chunk (enabled via stream_options include_usage).
    """
    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},  # final chunk carries token usage
    )

    pieces: list[str] = []
    usage: dict = {}
    for chunk in stream:
        # Text deltas; the usage-only chunk has an empty choices list,
        # so guard before indexing.
        if chunk.choices and chunk.choices[0].delta.content:
            piece = chunk.choices[0].delta.content
            pieces.append(piece)
            print(piece, end="", flush=True)
        # Usage statistics arrive on the last chunk only.
        if chunk.usage:
            usage = {
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
                "total_tokens": chunk.usage.total_tokens,
            }
    print()
    return "".join(pieces), usage


# Example usage
response, usage = stream_chat(
    [{"role": "user", "content": "解释什么是微服务架构"}]
)
print(f"Token 用量: {usage}")
4.3 流式多轮对话
from openai import OpenAI

client = OpenAI()


def stream_multi_turn():
    """Interactive REPL: multi-turn chat with streamed replies and history."""
    messages = [
        {"role": "system", "content": "你是一个友好的AI助手。"}
    ]
    while True:
        user_input = input("\n你: ")
        if user_input.lower() in ("exit", "quit"):
            break
        messages.append({"role": "user", "content": user_input})

        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=True,
        )

        print("AI: ", end="")
        parts = []
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                print(token, end="", flush=True)
                parts.append(token)
        print()

        # Keep the assistant reply in history so the next turn has context.
        messages.append({"role": "assistant", "content": "".join(parts)})


stream_multi_turn()
4.4 SSE (Server-Sent Events) 协议
流式响应基于 SSE 协议,每条消息格式:
data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"你"},"index":0}]}\n\n
data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"好"},"index":0}]}\n\n
data: [DONE]\n\n
关键特征
- 每条消息以 `data: ` 前缀开头
- 消息之间以 `\n\n` 分隔
- 最后一条消息为 `data: [DONE]`
- Content-Type 为 `text/event-stream`
4.5 Node.js 流式处理
// streaming.mjs
import OpenAI from 'openai';

const client = new OpenAI();

/**
 * Stream a completion and write each token to stdout as it arrives.
 */
async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-4o-mini',
    messages: [
      { role: 'user', content: '用三句话解释人工智能' }
    ],
    stream: true,
  });

  for await (const chunk of stream) {
    const token = chunk.choices[0]?.delta?.content;
    if (token) process.stdout.write(token);
  }
  console.log();
}

streamChat();
4.6 FastAPI 后端流式接口
# server.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import json

app = FastAPI()
# Use the async client: iterating the synchronous OpenAI stream inside an
# `async def` generator would block the event loop for the entire duration
# of the stream, stalling every other request on this worker.
client = AsyncOpenAI()


class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"


@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """SSE 流式聊天接口

    Emits one `data: {"content": ...}` event per content delta, then a
    final `data: {"done": true}` terminator.
    """
    async def generate():
        stream = await client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.message}],
            stream=True,
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                # One SSE event per content delta.
                yield f"data: {json.dumps({'content': content})}\n\n"
        # End-of-stream marker so clients know when to stop reading.
        yield f"data: {json.dumps({'done': True})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable Nginx proxy buffering for SSE
        },
    )
4.7 前端流式处理
JavaScript (Fetch API)
/**
 * POST a message to the streaming endpoint and append each token
 * to the #output element as it arrives.
 */
async function streamChat(message) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  // Fail fast on HTTP errors: parsing a 4xx/5xx body as SSE would
  // otherwise surface as an opaque JSON.parse exception.
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are separated by a blank line; the last split element
    // may be an incomplete event, so keep it in the buffer.
    const events = buffer.split('\n\n');
    buffer = events.pop();

    for (const line of events) {
      if (!line.startsWith('data: ')) continue;
      const data = JSON.parse(line.slice(6));
      if (data.done) {
        console.log('流式输出完成');
        return;
      }
      // Append the delta to the page.
      document.getElementById('output').textContent += data.content;
    }
  }
}
React 组件示例
function StreamChat() {
const [output, setOutput] = useState('');
const [loading, setLoading] = useState(false);
const handleSend = async (message) => {
setOutput('');
setLoading(true);
try {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
const lines = text.split('\n\n').filter(l => l.startsWith('data: '));
for (const line of lines) {
const data = JSON.parse(line.slice(6));
if (!data.done) {
setOutput(prev => prev + data.content);
}
}
}
} finally {
setLoading(false);
}
};
return (
<div>
<button onClick={() => handleSend('你好')} disabled={loading}>
发送
</button>
<div>{output}</div>
</div>
);
}
4.8 流式 Function Calling
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "北京今天天气怎么样?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取天气",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "城市名"}
                },
                "required": ["city"]
            }
        }
    }],
    stream=True,
)

# Accumulate tool-call fragments keyed by index: the name and id arrive
# once, while the JSON `arguments` string is streamed in pieces that must
# be concatenated before parsing.
tool_calls = {}
for chunk in stream:
    # Some chunks can carry an empty choices list (e.g. terminal chunks);
    # skip them instead of crashing on choices[0].
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    if not delta.tool_calls:
        continue
    for tc in delta.tool_calls:
        idx = tc.index
        if idx not in tool_calls:
            tool_calls[idx] = {
                "id": tc.id or "",
                "function": {"name": "", "arguments": ""}
            }
        # The call id may arrive on a later fragment than the first one.
        if tc.id and not tool_calls[idx]["id"]:
            tool_calls[idx]["id"] = tc.id
        if tc.function:
            if tc.function.name:
                tool_calls[idx]["function"]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[idx]["function"]["arguments"] += tc.function.arguments

import json
for idx, tc in tool_calls.items():
    args = json.loads(tc["function"]["arguments"])
    print(f"调用函数: {tc['function']['name']}, 参数: {args}")
4.9 错误处理
流式中断处理
from openai import OpenAI, APIError, APIConnectionError, RateLimitError

client = OpenAI()


def safe_stream_chat(messages: list, model: str = "gpt-4o-mini") -> str:
    """Stream a completion with error handling; return whatever arrived.

    Partial output collected before a failure is still returned, so the
    caller never loses text that was already printed.
    """
    collected: list = []
    try:
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
            timeout=60.0,  # streams stay open longer than plain requests
        )
        for chunk in stream:
            choice = chunk.choices[0] if chunk.choices else None
            if choice and choice.delta.content:
                collected.append(choice.delta.content)
                print(choice.delta.content, end="", flush=True)
            # Warn when generation stopped because max_tokens was reached.
            if choice and choice.finish_reason == "length":
                print("\n[警告: 输出被 max_tokens 截断]")
    except APIConnectionError:
        print("\n[错误: 网络连接失败]")
    except RateLimitError:
        print("\n[错误: 请求频率超限,请稍后重试]")
    except APIError as e:
        print(f"\n[API 错误: {e}]")
    except Exception as e:
        print(f"\n[未知错误: {e}]")
    print()
    return "".join(collected)
流式超时控制
import signal


class StreamTimeout(Exception):
    """Raised when no stream chunk arrives within the time budget."""
    pass


def timeout_handler(signum, frame):
    # SIGALRM fired: abort the blocking stream iteration.
    raise StreamTimeout("流式响应超时")


def stream_with_timeout(messages, timeout_seconds=30):
    """Stream a completion, aborting if chunks stop arriving.

    The alarm is reset on every chunk, so the limit applies to the gap
    between consecutive chunks rather than to the whole response.

    NOTE: signal.SIGALRM is Unix-only and only works on the main thread;
    for portable code prefer the client's own `timeout` option or asyncio.
    """
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)
    try:
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            stream=True,
        )
        for chunk in stream:
            signal.alarm(timeout_seconds)  # reset the clock on each chunk
            # Guard against chunks with an empty choices list before
            # indexing into choices[0].
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
    except StreamTimeout:
        print("\n[超时: 响应中断]")
    finally:
        # Always cancel the pending alarm and restore the previous handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)
4.10 业务场景
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 聊天机器人 | 流式 | 即时反馈,体验好 |
| 代码生成 | 流式 | 用户可以边看边思考 |
| 文档翻译 | 流式 | 长文本等待感强 |
| 数据分析报告 | 非流式 | 一次性渲染更方便 |
| 批量 API 调用 | 非流式 | 简化处理逻辑 |
| Agent 工具调用 | 流式 | 实时展示工具调用过程 |
4.11 注意事项
- 不要在流式中打断:避免在流式输出过程中关闭连接
- Nginx 代理配置:需添加 `proxy_buffering off;`
- CDN/负载均衡:确保不会缓冲 SSE 消息
- 内存管理:长时间对话注意累积的响应文本
- 并发处理:同一用户的多个流式请求需要区分
4.12 扩展阅读
下一章:05 - 视觉理解 API — 图片输入、多模态对话、OCR 功能。