Redis 完全指南 / 22 - 实战场景
实战场景
本章将前面学到的所有知识综合运用,实现生产级别的 Redis 实战场景。
22.1 缓存系统
多级缓存架构
请求 → 本地缓存 → Redis 缓存 → 数据库
↓ miss ↓ miss ↓
Redis 数据库 返回结果
↓ hit ↓ hit ↓
返回结果 写回缓存 写回缓存
完整实现(Python)
import redis
import json
import time
import hashlib
from functools import wraps
from typing import Optional, Any
class MultiLevelCache:
"""多级缓存实现"""
def __init__(self, redis_client, local_ttl=30, redis_ttl=300):
self.redis = redis_client
self.local_cache = {}
self.local_ttl = local_ttl
self.redis_ttl = redis_ttl
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
# 1. 查本地缓存
if key in self.local_cache:
value, timestamp = self.local_cache[key]
if time.time() - timestamp < self.local_ttl:
return value
del self.local_cache[key]
# 2. 查 Redis 缓存
try:
redis_value = self.redis.get(key)
if redis_value:
value = json.loads(redis_value)
# 回写本地缓存
self.local_cache[key] = (value, time.time())
return value
except redis.RedisError:
pass
return None
def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存"""
# 写本地缓存
self.local_cache[key] = (value, time.time())
# 写 Redis 缓存
try:
self.redis.setex(
key,
ttl or self.redis_ttl,
json.dumps(value, ensure_ascii=False)
)
except redis.RedisError:
pass
def delete(self, key: str):
"""删除缓存"""
self.local_cache.pop(key, None)
try:
self.redis.delete(key)
except redis.RedisError:
pass
def cached(cache: MultiLevelCache, prefix: str, ttl: int = 300):
"""缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存 Key
cache_key = f"{prefix}:{hashlib.md5(str(args).encode()).hexdigest()}"
# 尝试获取缓存
result = cache.get(cache_key)
if result is not None:
return result
# 调用原函数
result = func(*args, **kwargs)
# 写入缓存
if result is not None:
cache.set(cache_key, result, ttl)
return result
return wrapper
return decorator
# 使用示例
r = redis.Redis(host='localhost', decode_responses=True)
cache = MultiLevelCache(r, local_ttl=30, redis_ttl=300)
@cached(cache, prefix='user', ttl=3600)
def get_user(user_id: int):
"""获取用户信息(带多级缓存)"""
# 模拟数据库查询
time.sleep(0.1)
return {"id": user_id, "name": f"User_{user_id}", "email": f"user{user_id}@example.com"}
# 测试
user = get_user(1001) # 第一次:查数据库
user = get_user(1001) # 第二次:命中本地缓存
22.2 排行榜系统
完整排行榜实现
import redis
import json
import time
from typing import List, Dict, Optional
class Leaderboard:
"""实时排行榜"""
def __init__(self, redis_client, name: str):
self.redis = redis_client
self.key = f"leaderboard:{name}"
self.info_key = f"leaderboard:{name}:info"
def add_score(self, user_id: str, score: float) -> bool:
"""添加/更新分数"""
return bool(self.redis.zadd(self.key, {user_id: score}))
def increment_score(self, user_id: str, delta: float) -> float:
"""增加分数"""
return self.redis.zincrby(self.key, delta, user_id)
def get_score(self, user_id: str) -> Optional[float]:
"""获取分数"""
score = self.redis.zscore(self.key, user_id)
return float(score) if score is not None else None
def get_rank(self, user_id: str) -> Optional[int]:
"""获取排名(从 1 开始,分数最高排名 1)"""
rank = self.redis.zrevrank(self.key, user_id)
return rank + 1 if rank is not None else None
def get_top_n(self, n: int = 10) -> List[Dict]:
"""获取 Top N"""
results = self.redis.zrevrange(self.key, 0, n - 1, withscores=True)
return [
{"rank": i + 1, "user_id": user_id, "score": score}
for i, (user_id, score) in enumerate(results)
]
def get_user_rank_range(self, user_id: str, range_n: int = 5) -> List[Dict]:
"""获取用户附近的排名"""
rank = self.redis.zrevrank(self.key, user_id)
if rank is None:
return []
start = max(0, rank - range_n)
end = rank + range_n
results = self.redis.zrevrange(self.key, start, end, withscores=True)
return [
{"rank": start + i + 1, "user_id": uid, "score": score}
for i, (uid, score) in enumerate(results)
]
def get_total(self) -> int:
"""获取总人数"""
return self.redis.zcard(self.key)
def remove(self, user_id: str) -> bool:
"""移除用户"""
return bool(self.redis.zrem(self.key, user_id))
def get_score_range(self, min_score: float, max_score: float) -> List[Dict]:
"""分数区间查询"""
results = self.redis.zrevrangebyscore(
self.key, max_score, min_score, withscores=True
)
return [
{"user_id": uid, "score": score}
for uid, score in results
]
# 使用示例
r = redis.Redis(host='localhost', decode_responses=True)
lb = Leaderboard(r, "game:daily")
# 添加玩家分数
lb.add_score("player:1001", 1500)
lb.add_score("player:1002", 1200)
lb.add_score("player:1003", 1800)
lb.add_score("player:1004", 950)
lb.add_score("player:1005", 1650)
# 增加分数
lb.increment_score("player:1001", 100)
# 获取排名
print(f"Player 1001 rank: {lb.get_rank('player:1001')}")
# 获取 Top 3
top3 = lb.get_top_n(3)
for entry in top3:
print(f"#{entry['rank']} {entry['user_id']}: {entry['score']}")
# 获取玩家附近的排名
nearby = lb.get_user_rank_range("player:1001", range_n=2)
for entry in nearby:
print(f"#{entry['rank']} {entry['user_id']}: {entry['score']}")
多维度排行榜
class MultiDimensionLeaderboard:
"""多维度排行榜"""
def __init__(self, redis_client, name: str):
self.redis = redis_client
self.name = name
def add_score(self, user_id: str, score: float, dimension: str = "default"):
key = f"leaderboard:{self.name}:{dimension}"
self.redis.zadd(key, {user_id: score})
self.redis.expire(key, 86400 * 7) # 7 天过期
def get_top_n(self, dimension: str = "default", n: int = 10):
key = f"leaderboard:{self.name}:{dimension}"
return self.redis.zrevrange(key, 0, n - 1, withscores=True)
def get_rank(self, user_id: str, dimension: str = "default"):
key = f"leaderboard:{self.name}:{dimension}"
rank = self.redis.zrevrank(key, user_id)
return rank + 1 if rank is not None else None
# 使用
mlb = MultiDimensionLeaderboard(r, "game")
mlb.add_score("player:1", 1000, "daily")
mlb.add_score("player:1", 5000, "weekly")
mlb.add_score("player:1", 20000, "monthly")
22.3 分布式锁
Redlock 算法实现
import redis
import uuid
import time
import threading
class Redlock:
"""Redlock 分布式锁实现"""
UNLOCK_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
def __init__(self, redis_instances: list, ttl: int = 30000):
"""
初始化 Redlock
:param redis_instances: Redis 实例列表
:param ttl: 锁的过期时间(毫秒)
"""
self.instances = redis_instances
self.ttl = ttl
self.quorum = len(redis_instances) // 2 + 1
self.clock_drift_factor = 0.01
def lock(self, resource: str, ttl: int = None) -> dict:
"""获取锁"""
ttl = ttl or self.ttl
token = str(uuid.uuid4())
retry_count = 3
for _ in range(retry_count):
n = 0
start_time = time.time() * 1000
# 尝试在所有实例上加锁
for instance in self.instances:
try:
if self._acquire_instance(instance, resource, token, ttl):
n += 1
except redis.RedisError:
pass
# 计算耗时和有效时间
elapsed = time.time() * 1000 - start_time
validity = ttl - elapsed - (elapsed * self.clock_drift_factor)
# 检查是否获得多数派锁
if n >= self.quorum and validity > 0:
return {
'resource': resource,
'token': token,
'validity': validity,
}
else:
# 未能获得锁,释放所有实例
for instance in self.instances:
try:
self._release_instance(instance, resource, token)
except redis.RedisError:
pass
time.sleep(0.1)
return None
def unlock(self, lock_info: dict):
"""释放锁"""
if lock_info:
for instance in self.instances:
try:
self._release_instance(
instance,
lock_info['resource'],
lock_info['token']
)
except redis.RedisError:
pass
def _acquire_instance(self, instance, resource, token, ttl):
"""在单个实例上获取锁"""
return instance.set(
f"lock:{resource}",
token,
nx=True,
px=ttl
)
def _release_instance(self, instance, resource, token):
"""在单个实例上释放锁"""
script = instance.register_script(self.UNLOCK_SCRIPT)
script(keys=[f"lock:{resource}"], args=[token])
# 使用示例
instances = [
redis.Redis(host='redis1', port=6379, decode_responses=True),
redis.Redis(host='redis2', port=6379, decode_responses=True),
redis.Redis(host='redis3', port=6379, decode_responses=True),
]
redlock = Redlock(instances, ttl=30000)
# 获取锁
lock = redlock.lock("order:create:1001")
if lock:
try:
print(f"Lock acquired, validity: {lock['validity']}ms")
# 执行业务逻辑
time.sleep(1)
finally:
redlock.unlock(lock)
else:
print("Failed to acquire lock")
22.4 消息队列
基于 Stream 的可靠消息队列
import redis
import json
import uuid
import time
import threading
from typing import Callable, Dict, Any
class StreamQueue:
"""基于 Redis Stream 的消息队列"""
def __init__(self, redis_client, stream_key: str, group_name: str):
self.redis = redis_client
self.stream_key = stream_key
self.group_name = group_name
self._ensure_group()
def _ensure_group(self):
"""确保消费者组存在"""
try:
self.redis.xgroup_create(
self.stream_key, self.group_name, id='0', mkstream=True
)
except redis.exceptions.ResponseError:
pass
def publish(self, data: dict, max_len: int = 100000) -> str:
"""发布消息"""
return self.redis.xadd(
self.stream_key,
{'data': json.dumps(data, ensure_ascii=False)},
maxlen=max_len
)
def consume(self, consumer_name: str, handler: Callable,
count: int = 10, block: int = 5000):
"""消费消息"""
while True:
try:
messages = self.redis.xreadgroup(
self.group_name, consumer_name,
{self.stream_key: '>'},
count=count,
block=block,
)
if not messages:
continue
for stream, entries in messages:
for msg_id, data in entries:
try:
payload = json.loads(data['data'])
handler(payload)
self.redis.xack(
self.stream_key, self.group_name, msg_id
)
except Exception as e:
print(f"Error processing {msg_id}: {e}")
# 可选:推入死信队列
self.redis.xadd(
f"{self.stream_key}:dlq",
{'original_id': msg_id, 'error': str(e), 'data': data['data']},
maxlen=10000
)
self.redis.xack(
self.stream_key, self.group_name, msg_id
)
except redis.RedisError as e:
print(f"Redis error: {e}")
time.sleep(1)
def get_pending_count(self) -> int:
"""获取待确认消息数"""
pending = self.redis.xpending(self.stream_key, self.group_name)
return pending['pending'] if pending else 0
def reclaim_timeout_messages(self, consumer_name: str, min_idle_time: int = 60000):
"""回收超时未确认的消息"""
return self.redis.xclaim(
self.stream_key, self.group_name, consumer_name,
min_idle_time, justid=True
)
# 使用示例
r = redis.Redis(host='localhost', decode_responses=True)
# 创建消息队列
queue = StreamQueue(r, 'order_events', 'order_processors')
# 发布消息
for i in range(5):
msg_id = queue.publish({
'order_id': f'ORD-{1000 + i}',
'action': 'create',
'amount': 99.9 + i,
'user_id': 1001 + i,
})
print(f"Published: {msg_id}")
# 消费消息
def process_order(data: dict):
"""处理订单"""
print(f"Processing order: {data['order_id']}, action: {data['action']}")
# 启动消费者(后台线程)
t = threading.Thread(
target=queue.consume,
args=('consumer-1', process_order),
daemon=True
)
t.start()
22.5 限流系统
多种限流算法实现
import redis
import time
from functools import wraps
class RateLimiter:
"""限流器"""
def __init__(self, redis_client):
self.redis = redis_client
def fixed_window(self, key: str, limit: int, window: int) -> bool:
"""
固定窗口限流
:param key: 限流 Key
:param limit: 限制次数
:param window: 窗口大小(秒)
:return: True 允许, False 限流
"""
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, window)
return current <= limit
def sliding_window_log(self, key: str, limit: int, window: int) -> bool:
"""
滑动窗口日志限流
"""
now = time.time()
pipe = self.redis.pipeline()
# 移除窗口外的记录
pipe.zremrangebyscore(key, 0, now - window)
# 统计窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(now): now})
# 设置过期时间
pipe.expire(key, window)
results = pipe.execute()
count = results[1]
return count < limit
def sliding_window_counter(self, key: str, limit: int, window: int) -> bool:
"""
滑动窗口计数器限流(近似算法)
"""
now = time.time()
current_window = int(now // window) * window
previous_window = current_window - window
pipe = self.redis.pipeline()
pipe.get(f"{key}:{previous_window}")
pipe.get(f"{key}:{current_window}")
pipe.incr(f"{key}:{current_window}")
pipe.expire(f"{key}:{current_window}", window * 2)
results = pipe.execute()
previous_count = int(results[0] or 0)
current_count = int(results[1] or 0)
# 计算滑动窗口内的请求数(加权平均)
elapsed_ratio = (now - current_window) / window
estimated_count = previous_count * (1 - elapsed_ratio) + current_count
return estimated_count < limit
def token_bucket(self, key: str, rate: float, capacity: int) -> bool:
"""
令牌桶限流
:param key: 限流 Key
:param rate: 令牌生成速率(个/秒)
:param capacity: 桶容量
:return: True 允许, False 限流
"""
now = time.time()
lua_script = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local data = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(data[1]) or capacity
local last_time = tonumber(data[2]) or now
-- 计算新增令牌
local elapsed = now - last_time
local new_tokens = math.min(capacity, tokens + elapsed * rate)
if new_tokens >= requested then
new_tokens = new_tokens - requested
redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', now)
redis.call('EXPIRE', key, 3600)
return 1
else
redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', now)
redis.call('EXPIRE', key, 3600)
return 0
end
"""
result = self.redis.eval(lua_script, 1, key, rate, capacity, now, 1)
return bool(result)
def leaky_bucket(self, key: str, rate: float, capacity: int) -> bool:
"""
漏桶限流
"""
now = time.time()
lua_script = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local data = redis.call('HMGET', key, 'water', 'last_time')
local water = tonumber(data[1]) or 0
local last_time = tonumber(data[2]) or now
-- 计算漏出的水量
local elapsed = now - last_time
local leaked = elapsed * rate
water = math.max(0, water - leaked)
if water < capacity then
water = water + 1
redis.call('HMSET', key, 'water', water, 'last_time', now)
redis.call('EXPIRE', key, 3600)
return 1
else
redis.call('HMSET', key, 'water', water, 'last_time', now)
redis.call('EXPIRE', key, 3600)
return 0
end
"""
result = self.redis.eval(lua_script, 1, key, rate, capacity, now)
return bool(result)
# 使用示例
r = redis.Redis(host='localhost', decode_responses=True)
limiter = RateLimiter(r)
# API 限流装饰器
def rate_limit(key_func, limit=100, window=60):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
key = key_func(*args, **kwargs)
if not limiter.sliding_window_log(key, limit, window):
raise Exception("Rate limit exceeded")
return func(*args, **kwargs)
return wrapper
return decorator
# 使用
@rate_limit(lambda user_id, **kw: f"rate:user:{user_id}", limit=100, window=60)
def get_user_data(user_id, **kwargs):
return {"user_id": user_id, "data": "..."}
22.6 分布式 Session
import redis
import json
import uuid
import time
from typing import Dict, Any, Optional
class RedisSession:
"""基于 Redis 的分布式 Session"""
def __init__(self, redis_client, prefix: str = "session", ttl: int = 1800):
self.redis = redis_client
self.prefix = prefix
self.ttl = ttl
def create(self, user_id: str, data: dict = None) -> str:
"""创建 Session"""
session_id = str(uuid.uuid4())
session_data = {
'user_id': user_id,
'created_at': time.time(),
'data': data or {},
}
key = f"{self.prefix}:{session_id}"
self.redis.setex(key, self.ttl, json.dumps(session_data, ensure_ascii=False))
return session_id
def get(self, session_id: str) -> Optional[Dict]:
"""获取 Session"""
key = f"{self.prefix}:{session_id}"
data = self.redis.get(key)
if data:
# 延长过期时间
self.redis.expire(key, self.ttl)
return json.loads(data)
return None
def update(self, session_id: str, data: dict) -> bool:
"""更新 Session"""
key = f"{self.prefix}:{session_id}"
session_data = self.get(session_id)
if session_data:
session_data['data'].update(data)
self.redis.setex(key, self.ttl, json.dumps(session_data, ensure_ascii=False))
return True
return False
def delete(self, session_id: str) -> bool:
"""删除 Session"""
key = f"{self.prefix}:{session_id}"
return bool(self.redis.delete(key))
def touch(self, session_id: str) -> bool:
"""刷新 Session 过期时间"""
key = f"{self.prefix}:{session_id}"
return bool(self.redis.expire(key, self.ttl))
# 使用示例
r = redis.Redis(host='localhost', decode_responses=True)
session = RedisSession(r, ttl=3600)
# 创建 Session
sid = session.create("user:1001", {"role": "admin", "permissions": ["read", "write"]})
print(f"Session ID: {sid}")
# 获取 Session
data = session.get(sid)
print(f"Session data: {data}")
# 更新 Session
session.update(sid, {"last_page": "/dashboard"})
# 删除 Session(登出)
session.delete(sid)
22.7 布隆过滤器防穿透
import redis
class BloomFilter:
"""基于 Redis 的布隆过滤器"""
def __init__(self, redis_client, name: str, capacity: int = 1000000, error_rate: float = 0.001):
self.redis = redis_client
self.name = name
self.capacity = capacity
self.error_rate = error_rate
def create(self):
"""创建布隆过滤器"""
try:
self.redis.execute_command(
'BF.RESERVE', self.name, self.error_rate, self.capacity
)
except redis.exceptions.ResponseError:
pass # 已存在
def add(self, item: str) -> bool:
"""添加元素"""
return bool(self.redis.execute_command('BF.ADD', self.name, item))
def exists(self, item: str) -> bool:
"""检查元素是否存在"""
return bool(self.redis.execute_command('BF.EXISTS', self.name, item))
def batch_add(self, items: list) -> list:
"""批量添加"""
return self.redis.execute_command('BF.MADD', self.name, *items)
def batch_exists(self, items: list) -> list:
"""批量检查"""
return self.redis.execute_command('BF.MEXISTS', self.name, *items)
# 缓存穿透防护
class CacheWithBloomFilter:
"""带布隆过滤器的缓存"""
def __init__(self, redis_client, bloom_name: str = "bf:cache"):
self.redis = redis_client
self.bloom = BloomFilter(redis_client, bloom_name)
self.bloom.create()
def get(self, key: str, fetch_func=None):
# 1. 布隆过滤器检查
if not self.bloom.exists(key):
return None # Key 一定不存在
# 2. 查缓存
value = self.redis.get(key)
if value:
if value == "NULL":
return None
return json.loads(value)
# 3. 查数据库
if fetch_func:
value = fetch_func(key)
if value is None:
self.redis.setex(key, 300, "NULL") # 缓存空值
else:
self.redis.setex(key, 3600, json.dumps(value))
return value
return None
def register(self, key: str):
"""注册 Key 到布隆过滤器"""
self.bloom.add(key)
# 使用
r = redis.Redis(host='localhost', decode_responses=True)
cache = CacheWithBloomFilter(r)
# 注册合法 Key
cache.register("user:1001")
cache.register("user:1002")
# 查询
result = cache.get("user:9999") # 不存在的 Key,布隆过滤器直接返回 None
result = cache.get("user:1001", lambda k: {"id": 1001, "name": "张三"})
📌 业务场景总结
| 场景 | 数据结构 | 关键命令 | 注意事项 |
|---|---|---|---|
| 缓存 | String/Hash | GET/SET/SETEX | TTL 随机化、缓存穿透 |
| 排行榜 | ZSet | ZADD/ZREVRANGE | 分数设计、范围查询 |
| 分布式锁 | String | SET NX EX | 安全释放(Lua)、续期 |
| 消息队列 | Stream | XADD/XREADGROUP | ACK 机制、死信队列 |
| 限流 | String/ZSet | INCR/ZADD | 算法选择、Lua 原子性 |
| Session | String/Hash | GET/SET/HSET | 过期时间、序列化 |
| 计数器 | String | INCR/DECRBY | 原子操作、精度 |
| 标签系统 | Set | SADD/SINTER | 去重、集合运算 |
| 位图统计 | Bitmap | SETBIT/BITCOUNT | 节省内存、批量统计 |
| 地理位置 | GeoSpatial | GEOADD/GEOSEARCH | 经纬度精度、范围查询 |