异步与协程精讲 / 第16章:异步测试 —— 驯服不确定性
第16章:异步测试 —— 驯服不确定性
16.1 异步测试的挑战
异步代码比同步代码更难测试,主要因为:
| 挑战 | 描述 | 影响 |
|---|
| 不确定性 | 执行顺序不可预测 | 测试结果不稳定(flaky tests) |
| 超时 | 测试可能挂起不返回 | CI 卡住 |
| 竞态 | 并发操作的交错 | 难以复现 bug |
| 副作用 | 异步操作的副作用难以验证 | 验证困难 |
| 时序依赖 | 依赖时间流逝 | 测试慢且不可靠 |
16.2 测试策略概览
| 策略 | 目的 | 工具 |
|---|
| 超时控制 | 防止测试挂起 | 各语言的超时机制 |
| Mock/Stub | 隔离外部依赖 | mock 库 |
| 虚拟时间 | 加速时间相关测试 | 时间模拟库 |
| 竞态检测 | 发现数据竞争 | race detector |
| 确定性测试 | 保证结果一致 | 可控调度 |
16.3 超时控制
Go
func TestAsyncOperation(t *testing.T) {
done := make(chan bool)
go func() {
result := asyncOperation()
// 验证结果
if result != expected {
t.Errorf("期望 %v, 得到 %v", expected, result)
}
done <- true
}()
select {
case <-done:
// 测试通过
case <-time.After(5 * time.Second):
t.Fatal("测试超时")
}
}
// 使用 context 控制超时
func TestWithContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
result, err := asyncOperationWithContext(ctx)
if err != nil {
t.Fatalf("错误: %v", err)
}
if result != expected {
t.Errorf("期望 %v, 得到 %v", expected, result)
}
}
Python
import pytest
import asyncio
# 方式一:pytest-asyncio
@pytest.mark.asyncio
async def test_fetch_data():
# 自动管理事件循环
result = await fetch_data("https://api.example.com")
assert result["status"] == "ok"
# 方式二:设置超时
@pytest.mark.asyncio
@pytest.mark.timeout(5) # 5 秒超时
async def test_with_timeout():
result = await slow_operation()
assert result is not None
# 方式三:asyncio.wait_for
@pytest.mark.asyncio
async def test_with_wait_for():
result = await asyncio.wait_for(
slow_operation(),
timeout=5.0
)
assert result is not None
JavaScript (Jest)
// Jest 超时控制
test('fetch data', async () => {
const data = await fetchData('/api/test');
expect(data.status).toBe('ok');
}, 10000); // 10 秒超时
// 使用 AbortController
test('fetch with abort', async () => {
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
try {
await fetch('/api/slow', { signal: controller.signal });
} catch (err) {
expect(err.name).toBe('AbortError');
}
});
16.4 Mock 与 Stub
异步 Mock 的挑战
# 同步 Mock 很简单
def test_sync():
with mock.patch('module.api_call', return_value={'ok': True}):
result = my_function()
assert result['ok']
# 异步 Mock 需要特殊处理
@pytest.mark.asyncio
async def test_async():
with mock.patch('module.async_api_call',
new_callable=mock.AsyncMock,
return_value={'ok': True}):
result = await my_async_function()
assert result['ok']
Python async Mock
from unittest.mock import AsyncMock, patch
import pytest
# 创建异步 Mock
mock_db = AsyncMock()
mock_db.query.return_value = [{'id': 1, 'name': 'test'}]
@pytest.mark.asyncio
async def test_get_user():
user = await get_user(mock_db, user_id=1)
assert user['name'] == 'test'
mock_db.query.assert_awaited_once_with('SELECT * FROM users WHERE id = 1')
# 模拟异常
mock_db.query.side_effect = ConnectionError("数据库连接失败")
@pytest.mark.asyncio
async def test_get_user_db_error():
with pytest.raises(ConnectionError):
await get_user(mock_db, user_id=1)
Go 接口 Mock
// 定义接口
type UserRepo interface {
GetUser(ctx context.Context, id int) (*User, error)
}
// 实现 Mock
type MockUserRepo struct {
GetUserFunc func(ctx context.Context, id int) (*User, error)
}
func (m *MockUserRepo) GetUser(ctx context.Context, id int) (*User, error) {
return m.GetUserFunc(ctx, id)
}
// 测试
func TestGetUser(t *testing.T) {
mockRepo := &MockUserRepo{
GetUserFunc: func(ctx context.Context, id int) (*User, error) {
return &User{ID: id, Name: "测试用户"}, nil
},
}
svc := NewUserService(mockRepo)
user, err := svc.GetUser(context.Background(), 1)
assert.NoError(t, err)
assert.Equal(t, "测试用户", user.Name)
}
JavaScript Mock (Jest)
// Mock 异步函数
jest.mock('./api');
test('fetch user', async () => {
api.getUser.mockResolvedValue({ id: 1, name: 'test' });
const user = await service.getUser(1);
expect(user.name).toBe('test');
expect(api.getUser).toHaveBeenCalledWith(1);
});
test('fetch user error', async () => {
api.getUser.mockRejectedValue(new Error('网络错误'));
await expect(service.getUser(1)).rejects.toThrow('网络错误');
});
16.5 竞态检测
Go Race Detector
# 使用 -race 标志运行测试
go test -race ./...
# 示例:检测竞态条件
func TestRaceCondition(t *testing.T) {
counter := 0
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 这里有竞态!
}()
}
wg.Wait()
// -race 会检测到竞态并报错
}
Rust 的安全保证
// Rust 编译器在编译期阻止大多数数据竞争
use std::sync::{Arc, Mutex};
#[tokio::test]
async fn test_concurrent() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..100 {
let counter = Arc::clone(&counter);
handles.push(tokio::spawn(async move {
let mut num = counter.lock().unwrap();
*num += 1;
}));
}
for handle in handles {
handle.await.unwrap();
}
assert_eq!(*counter.lock().unwrap(), 100);
}
16.6 确定性测试
虚拟时间
# Python — 使用 freezegun 模拟时间
from freezegun import freeze_time
import asyncio
@freeze_time("2024-01-01 12:00:00")
@pytest.mark.asyncio
async def test_cache_expiry():
cache = TTLCache(ttl=60)
await cache.set("key", "value")
# 时间前进 30 秒 — 仍然有效
with freeze_time("2024-01-01 12:00:30"):
assert await cache.get("key") == "value"
# 时间前进 61 秒 — 已过期
with freeze_time("2024-01-01 12:01:01"):
assert await cache.get("key") is None
Go — 时间模拟
// 使用 clock 接口模拟时间
type Clock interface {
Now() time.Time
After(d time.Duration) <-chan time.Time
}
type MockClock struct {
now time.Time
}
func (c *MockClock) Now() time.Time {
return c.now
}
func (c *MockClock) After(d time.Duration) <-chan time.Time {
ch := make(chan time.Time, 1)
ch <- c.now.Add(d) // 立即返回
return ch
}
func (c *MockClock) Advance(d time.Duration) {
c.now = c.now.Add(d)
}
// 使用模拟时钟测试
func TestCacheExpiry(t *testing.T) {
clock := &MockClock{now: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)}
cache := NewTTLCache(clock, 60*time.Second)
cache.Set("key", "value")
assert.Equal(t, "value", cache.Get("key"))
clock.Advance(61 * time.Second)
assert.Nil(t, cache.Get("key"))
}
16.7 集成测试
容器化依赖
import pytest
import testcontainers.postgres
@pytest.fixture(scope="session")
def postgres():
with testcontainers.postgres.PostgresContainer("postgres:15") as pg:
yield pg
@pytest.mark.asyncio
async def test_user_repo(postgres):
repo = UserRepo(postgres.get_connection_url())
await repo.create_table()
user = await repo.create_user("test", "[email protected]")
assert user.name == "test"
fetched = await repo.get_user(user.id)
assert fetched.email == "[email protected]"
16.8 测试最佳实践
| 实践 | 说明 |
|---|
| 总是设置超时 | 防止测试挂起 |
| 隔离外部依赖 | 使用 Mock 隔离网络、数据库 |
| 使用 Race Detector | Go -race,Rust 编译器检查 |
| 模拟时间 | 时间相关测试用虚拟时间 |
| 避免 sleep | 用信号量/通道代替 time.sleep() |
| 测试边界条件 | 超时、取消、网络错误 |
| 幂等性测试 | 验证重试安全性 |
| 并发压力测试 | 高并发下验证正确性 |
16.9 业务场景:测试支付服务
@pytest.fixture
def payment_service():
mock_gateway = AsyncMock()
mock_gateway.charge.return_value = {"status": "success", "transaction_id": "tx_123"}
return PaymentService(gateway=mock_gateway)
@pytest.mark.asyncio
async def test_payment_success(payment_service):
mock_gateway = payment_service.gateway
result = await payment_service.charge(user_id=1, amount=100)
assert result.status == "success"
mock_gateway.charge.assert_awaited_once_with(user_id=1, amount=100)
@pytest.mark.asyncio
async def test_payment_timeout(payment_service):
payment_service.gateway.charge.side_effect = asyncio.TimeoutError()
with pytest.raises(PaymentTimeoutError):
await payment_service.charge(user_id=1, amount=100)
@pytest.mark.asyncio
async def test_payment_retry(payment_service):
# 前两次失败,第三次成功
payment_service.gateway.charge.side_effect = [
ConnectionError("网络错误"),
ConnectionError("网络错误"),
{"status": "success", "transaction_id": "tx_123"},
]
result = await payment_service.charge_with_retry(user_id=1, amount=100, retries=3)
assert result.status == "success"
assert payment_service.gateway.charge.await_count == 3
16.10 本章小结
| 要点 | 说明 |
|---|
| 超时控制 | 每个异步测试都要有超时 |
| Mock | 异步函数需要 AsyncMock |
| 竞态检测 | Go -race,Rust 编译器 |
| 确定性测试 | 虚拟时间、可控调度 |
| 集成测试 | 容器化依赖 |
| 最佳实践 | 隔离依赖、测试边界条件、幂等性 |
下一章预告:异步服务如何容器化部署?资源限制和性能调优有哪些技巧?
扩展阅读