OpenResty 高性能网关开发教程 / 第 06 章 - 限流与流控
第 06 章 - 限流与流控
6.1 为什么需要限流?
| 场景 | 不限流的后果 | 限流策略 |
|---|---|---|
| 突发流量 | 后端服务崩溃 | 令牌桶允许突发 |
| 恶意攻击 | DDoS/CC 攻击 | IP 级别限流 |
| API 滥用 | 资源耗尽 | 用户级别限流 |
| 依赖故障 | 级联失败 | 熔断 + 限流 |
6.2 限流算法对比
| 算法 | 特点 | 突发处理 | 实现复杂度 | 精确度 |
|---|---|---|---|---|
| 固定窗口 | 最简单 | 差 | 低 | 低 |
| 滑动窗口 | 平滑 | 中 | 中 | 高 |
| 漏桶 | 恒定速率 | 无 | 低 | 高 |
| 令牌桶 | 允许突发 | 好 | 中 | 高 |
6.3 固定窗口限流
固定窗口将时间划分为固定大小的窗口,每个窗口内限制请求数量。
-- /usr/local/openresty/lua/limiters/fixed_window.lua
-- Fixed-window rate limiter backed by an ngx.shared dict.
local _M = {}
local mt = { __index = _M }

-- Create a limiter instance.
-- @param shared_dict_name  name of a lua_shared_dict zone
-- @param max_requests      max requests allowed per window
-- @param window_seconds    window size in seconds
-- @return instance, or nil + error message when the dict is missing
function _M.new(shared_dict_name, max_requests, window_seconds)
    local shared = ngx.shared[shared_dict_name]
    if not shared then
        return nil, "shared dict not found: " .. shared_dict_name
    end
    -- BUG FIX: the instance must inherit _M's methods via a metatable,
    -- otherwise limiter:incoming(...) fails with "attempt to call method".
    return setmetatable({
        shared = shared,
        max_requests = max_requests,
        window = window_seconds,
    }, mt)
end

-- Check (and optionally count) one request for `key`.
-- @param key     caller-chosen identity (IP, user id, ...)
-- @param commit  when truthy, the request is counted against the window
-- @return true, "ok" when allowed; false, "rejected"/err otherwise
function _M:incoming(key, commit)
    local now = ngx.time()
    -- One counter per (key, window index); old counters expire via TTL.
    local window_key = key .. ":" .. math.floor(now / self.window)
    local count = self.shared:get(window_key) or 0
    if count >= self.max_requests then
        return false, "rejected"
    end
    if commit then
        -- incr with init=0 and init_ttl=window is atomic within the dict
        local new_count, err = self.shared:incr(window_key, 1, 0, self.window)
        if not new_count then
            return false, err
        end
    end
    return true, "ok"
end

return _M
固定窗口的问题
窗口边界问题:用户在窗口边界发送请求,可能在短时间内发送 2 倍限额
窗口1 (0-60s) 窗口2 (60-120s)
|████████████|████████████|
↑ 58s: 发送 100 个请求
↑ 62s: 发送 100 个请求
2秒内发送了 200 个请求!
6.4 滑动窗口限流
滑动窗口通过记录每个请求的时间戳来解决固定窗口的边界问题。
-- /usr/local/openresty/lua/limiters/sliding_window.lua
-- Sliding-window rate limiter. The Redis variant keeps one sorted-set entry
-- per request so window boundaries are exact (no fixed-window edge burst).
local _M = {}
local mt = { __index = _M }

-- @param shared_dict_name  name of a lua_shared_dict zone
-- @param max_requests      max requests allowed per window
-- @param window_seconds    window size in seconds
function _M.new(shared_dict_name, max_requests, window_seconds)
    local shared = ngx.shared[shared_dict_name]
    if not shared then
        return nil, "shared dict not found: " .. shared_dict_name
    end
    -- BUG FIX: setmetatable so instances can reach _M's methods.
    return setmetatable({
        shared = shared,
        max_requests = max_requests,
        window = window_seconds,
    }, mt)
end

-- Redis-backed sliding window for multi-node deployments.
-- @param key     identity to limit on
-- @param commit  when truthy the request is recorded; otherwise a dry run
-- @return true, "ok", remaining | false, "rejected", 0 | nil, err
function _M:incoming_redis(key, commit)
    local resty_redis = require "resty.redis"
    local red = resty_redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil, "redis connect failed: " .. err
    end
    local now = ngx.now() * 1000 -- milliseconds
    local window_start = now - (self.window * 1000)
    local redis_key = "rate_limit:" .. key
    -- Runs atomically inside Redis so check + record cannot race.
    local script = [[
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local max_requests = tonumber(ARGV[3])
        local window_ms = tonumber(ARGV[4])
        local commit = ARGV[5] == '1'
        -- drop entries that fell out of the window
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
        local count = redis.call('ZCARD', key)
        if count < max_requests then
            if commit then
                -- random suffix keeps members unique when two requests
                -- share the same millisecond timestamp
                redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
                redis.call('EXPIRE', key, math.ceil(window_ms / 1000))
            end
            return {1, max_requests - count - 1}
        else
            return {0, 0}
        end
    ]]
    -- BUG FIX: `commit` was previously ignored — the script always recorded.
    local res
    res, err = red:eval(script, 1, redis_key, now, window_start,
                        self.max_requests, self.window * 1000,
                        commit and "1" or "0")
    -- return the connection to the pool
    red:set_keepalive(10000, 100)
    if not res then
        return nil, "redis eval failed: " .. err
    end
    if res[1] == 1 then
        return true, "ok", res[2]
    end
    return false, "rejected", 0
end

return _M
6.5 漏桶算法(Leaky Bucket)
漏桶以恒定速率处理请求,多余的请求被排队或丢弃。
-- /usr/local/openresty/lua/limiters/leaky_bucket.lua
-- Leaky-bucket limiter: each request adds "water", which drains at a fixed
-- rate. NOTE(review): the get + set pair below is not atomic across nginx
-- workers, so concurrent requests may read the same state; acceptable for
-- approximate limiting — use resty.lock or an incr-based design otherwise.
local _M = {}
local mt = { __index = _M }

-- @param shared_dict_name  name of a lua_shared_dict zone
-- @param rate      drain rate (requests processed per second)
-- @param capacity  bucket size; requests beyond this are rejected
function _M.new(shared_dict_name, rate, capacity)
    local shared = ngx.shared[shared_dict_name]
    if not shared then
        return nil, "shared dict not found: " .. shared_dict_name
    end
    -- BUG FIX: attach _M via metatable so limiter:incoming() resolves.
    return setmetatable({
        shared = shared,
        rate = rate,
        capacity = capacity,
    }, mt)
end

-- @return allowed (bool), reason, suggested wait time in seconds
function _M:incoming(key, commit)
    local now = ngx.now() -- sub-second precision
    local bucket_key = key .. ":bucket"
    -- state is serialized as "water_level,last_leak_time"
    local state = self.shared:get(bucket_key)
    local water_level, last_leak_time
    if state then
        water_level, last_leak_time = state:match("([^,]+),([^,]+)")
        water_level = tonumber(water_level)
        last_leak_time = tonumber(last_leak_time)
    end
    if not water_level then
        water_level, last_leak_time = 0, now
    end
    -- drain for the elapsed time, clamped at empty
    water_level = math.max(0, water_level - (now - last_leak_time) * self.rate)
    if water_level >= self.capacity then
        return false, "rejected", 0
    end
    if commit then
        water_level = water_level + 1
        self.shared:set(bucket_key, water_level .. "," .. now, 60)
    end
    -- suggest back-off once the bucket is over 80% full
    local wait_time = 0
    local threshold = self.capacity * 0.8
    if water_level > threshold then
        wait_time = (water_level - threshold) / self.rate
    end
    return true, "ok", wait_time
end

return _M
漏桶特性
请求进入 漏桶 恒定速率输出
─→ │ ■■■■■ │ ─→ ■ → 处理
│ ■■■■ │ ■ → 处理
│ ■■■ │ ■ → 处理
└─────────┘
多余请求被丢弃或排队
6.6 令牌桶算法(Token Bucket)
令牌桶以恒定速率添加令牌,请求需要消耗令牌。允许一定程度的突发。
-- /usr/local/openresty/lua/limiters/token_bucket.lua
-- Token-bucket limiter: tokens accrue at `rate` per second up to `capacity`;
-- each request consumes tokens, so bursts up to `capacity` are allowed while
-- the long-term average stays at `rate`.
local _M = {}
local mt = { __index = _M }

-- @param shared_dict_name  name of a lua_shared_dict zone
-- @param rate      tokens generated per second
-- @param capacity  bucket capacity (maximum burst size)
function _M.new(shared_dict_name, rate, capacity)
    local shared = ngx.shared[shared_dict_name]
    if not shared then
        return nil, "shared dict not found: " .. shared_dict_name
    end
    -- BUG FIX: setmetatable, otherwise limiter:incoming() is not found.
    return setmetatable({
        shared = shared,
        rate = rate,
        capacity = capacity,
    }, mt)
end

-- @param tokens number of tokens this request costs (default 1)
-- @return allowed, reason, wait time (seconds) until enough tokens exist
function _M:incoming(key, commit, tokens)
    tokens = tokens or 1
    local now = ngx.now()
    local bucket_key = key .. ":token_bucket"
    -- state is serialized as "available_tokens,last_refill_time"
    local data = self.shared:get(bucket_key)
    local available_tokens, last_refill_time
    if data then
        available_tokens, last_refill_time = data:match("([^,]+),([^,]+)")
        available_tokens = tonumber(available_tokens)
        last_refill_time = tonumber(last_refill_time)
    end
    if not available_tokens then
        available_tokens, last_refill_time = self.capacity, now -- start full
    end
    -- refill for elapsed time, capped at capacity
    available_tokens = math.min(self.capacity,
        available_tokens + (now - last_refill_time) * self.rate)
    if available_tokens < tokens then
        -- not enough tokens: tell the caller how long until there are
        return false, "rejected", (tokens - available_tokens) / self.rate
    end
    if commit then
        available_tokens = available_tokens - tokens
        self.shared:set(bucket_key, available_tokens .. "," .. now, 120)
    end
    return true, "ok", 0
end

return _M
令牌桶 vs 漏桶
令牌桶:
- 允许突发流量(桶中有足够的令牌时)
- 长期平均速率 = 令牌生成速率
- 适合:API 限流、用户配额
漏桶:
- 恒定速率输出,不允许突发
- 输出速率固定
- 适合:流量整形、消息队列
6.7 基于 Redis 的分布式限流
在多节点部署场景下,需要使用 Redis 实现分布式限流。
-- /usr/local/openresty/lua/limiters/distributed.lua
-- Redis-backed limiters for multi-node deployments. Both algorithms run as
-- server-side Lua scripts so check-and-record is atomic across all nodes.
local _M = {}
local mt = { __index = _M }
local cjson = require "cjson"

-- Open a pooled Redis connection (returned to the pool by the caller).
local function get_redis()
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil, err
    end
    return red
end

-- Sliding window over a sorted set of request timestamps (milliseconds).
-- Returns {1, remaining} when allowed, {0, retry_after_seconds} when not.
-- NOTE: Redis truncates Lua numbers to integers in script replies.
local SLIDING_WINDOW_SCRIPT = [[
    local key = KEYS[1]
    local now = tonumber(ARGV[1])         -- ms
    local window = tonumber(ARGV[2])      -- window length in ms
    local max_requests = tonumber(ARGV[3])
    local window_start = now - window
    -- evict entries that fell out of the window
    redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
    local count = redis.call('ZCARD', key)
    if count < max_requests then
        redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
        -- BUG FIX: window is in milliseconds but EXPIRE takes seconds
        redis.call('EXPIRE', key, math.ceil(window / 1000))
        return {1, max_requests - count - 1}
    else
        -- the oldest entry tells us when the next slot frees up
        local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
        local retry_after = 0
        if #oldest >= 2 then
            retry_after = math.ceil((tonumber(oldest[2]) + window - now) / 1000)
        end
        -- BUG FIX: the caller reads slot 2, so retry_after goes there
        return {0, retry_after}
    end
]]

-- Token bucket stored in a hash; all times and rates are in SECONDS.
-- Returns {1, remaining_tokens} or {0, wait_seconds}.
local TOKEN_BUCKET_SCRIPT = [[
    local key = KEYS[1]
    local now = tonumber(ARGV[1])         -- seconds
    local rate = tonumber(ARGV[2])        -- tokens per second
    local capacity = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])
    local data = redis.call('HMGET', key, 'tokens', 'last_time')
    local tokens = tonumber(data[1]) or capacity
    local last_time = tonumber(data[2]) or now
    -- refill for the elapsed seconds, capped at capacity
    tokens = math.min(capacity, tokens + ((now - last_time) * rate))
    if tokens >= requested then
        tokens = tokens - requested
        redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
        redis.call('EXPIRE', key, math.ceil(capacity / rate) + 1)
        return {1, tokens}
    else
        return {0, (requested - tokens) / rate}
    end
]]

-- @param config table: max_requests, window (s), algorithm, key_prefix
function _M.new(config)
    -- BUG FIX: setmetatable so instance:incoming() is reachable.
    return setmetatable({
        max_requests = config.max_requests or 100,
        window = config.window or 60,
        algorithm = config.algorithm or "sliding_window",
        key_prefix = config.key_prefix or "rl:",
    }, mt)
end

-- @return allowed, reason ("ok"/"rejected"/"degraded"), remaining-or-retry
function _M:incoming(key, commit)
    local red, err = get_redis()
    if not red then
        ngx.log(ngx.ERR, "Redis connect failed: ", err)
        -- fail open: a broken limiter must not reject all traffic
        return true, "degraded"
    end
    local redis_key = self.key_prefix .. key
    local res
    if self.algorithm == "sliding_window" then
        -- sliding window works in milliseconds
        res, err = red:eval(SLIDING_WINDOW_SCRIPT, 1, redis_key,
            ngx.now() * 1000, self.window * 1000, self.max_requests)
    elseif self.algorithm == "token_bucket" then
        -- BUG FIX: pass seconds, not milliseconds — rate is tokens/second,
        -- so a millisecond timestamp inflated the refill 1000x
        res, err = red:eval(TOKEN_BUCKET_SCRIPT, 1, redis_key,
            ngx.now(), self.max_requests / self.window, self.max_requests, 1)
    end
    red:set_keepalive(10000, 100)
    if not res then
        ngx.log(ngx.ERR, "Redis eval failed: ", err)
        return true, "degraded"
    end
    if res[1] == 1 then
        return true, "ok", res[2]
    end
    return false, "rejected", res[2]
end

return _M
6.8 多维度限流
实际场景中,通常需要对不同维度组合限流:
-- /usr/local/openresty/lua/rate_limiter.lua
-- Multi-dimension rate limiting: each request is checked against every rule
-- (IP, API, user, endpoint); the first rule that rejects wins.
local _M = {}

-- Rate-limit rule configuration. `key_func` returns the limit key for the
-- current request, or nil to skip the rule for this request.
local rules = {
    -- per-IP limit
    {
        name = "ip_limit",
        key_func = function() return ngx.var.remote_addr end,
        max_requests = 1000,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"Too many requests from this IP"}',
        },
    },
    -- per-API (URI) limit
    {
        name = "api_limit",
        key_func = function() return ngx.var.uri end,
        max_requests = 5000,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"API rate limit exceeded"}',
        },
    },
    -- per-user limit (falls back to client IP when no user header)
    {
        name = "user_limit",
        key_func = function()
            return ngx.var.http_x_user_id or ngx.var.remote_addr
        end,
        max_requests = 100,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"User rate limit exceeded"}',
        },
    },
    -- per-endpoint limit: stricter quota for write methods only
    {
        name = "endpoint_limit",
        key_func = function()
            local method = ngx.req.get_method()
            local uri = ngx.var.uri
            if method == "POST" or method == "PUT" or method == "DELETE" then
                return method .. ":" .. uri
            end
            return nil -- GET requests are not limited by this rule
        end,
        max_requests = 50,
        window = 60,
        response = {
            status = 429,
            body = '{"error":"Write rate limit exceeded"}',
        },
    },
}

-- Limiter instances, created lazily and cached per rule name.
local limiters = {}

local function get_limiter(rule)
    local limiter = limiters[rule.name]
    if limiter then
        return limiter
    end
    local token_bucket = require "limiters.token_bucket"
    -- BUG FIX: token_bucket.new(dict, rate, capacity) — the refill rate is
    -- requests-per-second (max_requests / window), and the capacity is the
    -- burst size; previously the raw count and window were passed instead.
    local err
    limiter, err = token_bucket.new("rate_limit",
        rule.max_requests / rule.window, rule.max_requests)
    if not limiter then
        ngx.log(ngx.ERR, "failed to create limiter '", rule.name, "': ", err)
        return nil
    end
    limiters[rule.name] = limiter
    return limiter
end

-- Run all rules for the current request; terminates it with 429 on reject.
function _M.check()
    for _, rule in ipairs(rules) do
        local key = rule.key_func()
        if key then
            local limiter = get_limiter(rule)
            if limiter then
                local ok, _, wait = limiter:incoming(key, true)
                if not ok then
                    -- standard rate-limit response headers
                    ngx.header["Content-Type"] = "application/json"
                    ngx.header["X-RateLimit-Limit"] = rule.max_requests
                    ngx.header["X-RateLimit-Remaining"] = 0
                    ngx.header["Retry-After"] = wait or 1
                    ngx.status = rule.response.status
                    ngx.say(rule.response.body)
                    return ngx.exit(rule.response.status)
                end
            end
        end
    end
end

return _M
nginx 配置
lua_shared_dict rate_limit 50m;
server {
listen 8080;
location /api/ {
access_by_lua_block {
local rate_limiter = require "rate_limiter"
rate_limiter.check()
}
proxy_pass http://backend;
}
}
6.9 自适应限流
根据后端服务的响应时间和错误率动态调整限流阈值。
-- /usr/local/openresty/lua/adaptive_limiter.lua
-- Adapts the rate limit to backend health: high error rates or latencies
-- shrink the effective limit, down to a floor of 10% of the base.
local _M = {}

-- Read the aggregated request metrics out of the shared dict.
local function get_metrics()
    local dict = ngx.shared.rate_limit
    local requests = dict:get("req_total") or 0
    local failures = dict:get("req_errors") or 0
    local latency_sum = dict:get("latency_total") or 0
    local metrics = { total = requests, error_rate = 0, avg_latency = 0 }
    if requests > 0 then
        metrics.error_rate = failures / requests
        metrics.avg_latency = latency_sum / requests
    end
    return metrics
end

-- Scale base_limit down when the backend shows signs of stress.
-- @param base_limit configured maximum requests per window
-- @return the adjusted (integer) limit
function _M.get_limit(base_limit)
    local m = get_metrics()
    local factor = 1

    -- error-rate penalty: >10% halves the limit, >5% removes a quarter
    if m.error_rate > 0.1 then
        factor = factor * 0.5
    elseif m.error_rate > 0.05 then
        factor = factor * 0.75
    end

    -- latency penalty: avg > 2s halves, avg > 1s removes a quarter
    if m.avg_latency > 2000 then
        factor = factor * 0.5
    elseif m.avg_latency > 1000 then
        factor = factor * 0.75
    end

    -- never drop below 10% of the configured base
    return math.floor(math.max(base_limit * factor, base_limit * 0.1))
end

-- Fold one finished request into the rolling counters (60s TTL).
-- @param status  upstream HTTP status (>= 500 counts as an error)
-- @param latency request latency, same unit the thresholds above use
function _M.record(status, latency)
    local dict = ngx.shared.rate_limit
    dict:incr("req_total", 1, 0, 60)
    if status >= 500 then
        dict:incr("req_errors", 1, 0, 60)
    end
    dict:incr("latency_total", latency, 0, 60)
end

return _M
6.10 限流响应最佳实践
-- Standard JSON response for a rejected (rate-limited) request.
-- @param rule         the matched rule (provides max_requests)
-- @param retry_after  suggested wait in seconds (defaults to 60)
local function rate_limit_response(rule, retry_after)
    -- BUG FIX: cjson was used but never required in this snippet
    local cjson = require "cjson"
    retry_after = retry_after or 60
    ngx.header["Content-Type"] = "application/json"
    -- de-facto standard rate-limit headers plus RFC 6585's Retry-After
    ngx.header["X-RateLimit-Limit"] = rule.max_requests
    ngx.header["X-RateLimit-Remaining"] = 0
    ngx.header["X-RateLimit-Reset"] = ngx.time() + retry_after
    ngx.header["Retry-After"] = retry_after
    ngx.status = 429
    ngx.say(cjson.encode({
        error = "Too Many Requests",
        message = "Rate limit exceeded. Please retry after " .. retry_after .. " seconds.",
        retry_after = retry_after,
    }))
end
标准响应头
| 响应头 | 说明 | 示例 |
|---|---|---|
| X-RateLimit-Limit | 窗口内最大请求数 | 100 |
| X-RateLimit-Remaining | 剩余请求数 | 42 |
| X-RateLimit-Reset | 窗口重置时间戳 | 1704067260 |
| Retry-After | 建议等待秒数 | 30 |
6.11 注意事项
Redis 故障降级:当 Redis 不可用时,限流组件应降级为放行或本地限流,避免因限流组件故障导致全部请求被拒绝。
精度与性能:Redis 限流会增加 1-5ms 延迟。对于高 QPS 场景,可以先用本地限流(ngx.shared.DICT)做第一层过滤,再用 Redis 做分布式限流。
时钟同步:分布式限流依赖时间一致性,确保所有节点的时钟偏差在 1 秒以内(使用 NTP)。
上一章:← 第 05 章 - 路由与动态路由 下一章:第 07 章 - 认证与鉴权 →