Redis 完全指南 / 19 - Python 集成
Python 集成
19.1 redis-py 概述
redis-py 是 Python 最流行的 Redis 客户端库,支持同步和异步操作。
安装
# 基础安装
pip install redis
# 带异步支持
pip install redis[hiredis]
# hiredis 是 C 语言实现的解析器,性能提升 2-3 倍
版本特性
| 版本 | Python | 特性 |
|---|---|---|
| 4.x | 3.7+ | 异步支持、JSON、Search |
| 5.x | 3.8+ | 改进连接池、更好的 Cluster 支持 |
19.2 基础连接
基本连接
import redis
# 最简连接
r = redis.Redis(host='localhost', port=6379, db=0)
r.ping() # True
# 带密码和解码
r = redis.Redis(
host='localhost',
port=6379,
db=0,
password='yourpassword',
decode_responses=True, # 自动解码为字符串
socket_timeout=5,
socket_connect_timeout=5,
retry_on_timeout=True,
)
连接池
import redis
# 创建连接池
pool = redis.ConnectionPool(
host='localhost',
port=6379,
password='yourpassword',
db=0,
max_connections=100,
decode_responses=True,
socket_timeout=5,
socket_connect_timeout=5,
)
# 从连接池获取连接
r = redis.Redis(connection_pool=pool)
# 使用
r.set('key', 'value')
value = r.get('key')
Sentinel 连接
from redis.sentinel import Sentinel
sentinel = Sentinel(
[('192.168.1.1', 26379),
('192.168.1.2', 26379),
('192.168.1.3', 26379)],
socket_timeout=0.5,
password='sentinel_password'
)
# 获取主节点连接
master = sentinel.master_for('mymaster', password='redis_password')
master.set('key', 'value')
# 获取从节点连接
slave = sentinel.slave_for('mymaster', password='redis_password')
value = slave.get('key')
Cluster 连接
from redis.cluster import RedisCluster
rc = RedisCluster(
startup_nodes=[
{"host": "192.168.1.1", "port": 6379},
{"host": "192.168.1.2", "port": 6379},
{"host": "192.168.1.3", "port": 6379},
],
password='cluster_password',
decode_responses=True,
)
rc.set('key', 'value')
value = rc.get('key')
19.3 数据类型操作
String 操作
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 基础操作
r.set('name', '张三')
r.get('name') # '张三'
# 设置过期时间
r.setex('session:abc', 3600, 'user_data') # 3600秒
r.psetex('token:xyz', 60000, 'jwt_token') # 60000毫秒
# 不存在才设置
r.setnx('lock:order', 'owner-uuid') # True/False
# 设置值+过期时间+NX(分布式锁推荐)
r.set('lock:order', 'owner-uuid', nx=True, ex=30)
# 批量操作
r.mset({'k1': 'v1', 'k2': 'v2', 'k3': 'v3'})
r.mget('k1', 'k2', 'k3') # ['v1', 'v2', 'v3']
# 数值操作
r.set('counter', 0)
r.incr('counter') # 1
r.incr('counter', 5) # 6
r.decr('counter') # 5
r.incrbyfloat('counter', 1.5) # 6.5
# 追加
r.append('greeting', 'Hello')
r.append('greeting', ' World')
r.get('greeting') # 'Hello World'
# JSON 序列化存储
user = {'name': '张三', 'age': 25, 'city': '北京'}
r.set('user:1001', json.dumps(user, ensure_ascii=False))
user_data = json.loads(r.get('user:1001'))
Hash 操作
# 设置字段
r.hset('user:1001', 'name', '张三')
r.hset('user:1001', mapping={'age': 25, 'city': '北京'})
# 获取字段
r.hget('user:1001', 'name') # '张三'
r.hmget('user:1001', 'name', 'age') # ['张三', '25']
r.hgetall('user:1001') # {'name': '张三', 'age': '25', 'city': '北京'}
# 数值操作
r.hincrby('user:1001', 'age', 1) # 26
r.hincrbyfloat('user:1001', 'score', 0.5)
# 判断字段是否存在
r.hexists('user:1001', 'name') # True
r.hexists('user:1001', 'phone') # False
# 删除字段
r.hdel('user:1001', 'city')
# 获取所有字段名/值
r.hkeys('user:1001') # ['name', 'age']
r.hvals('user:1001') # ['张三', 26]
r.hlen('user:1001') # 2
List 操作
# 推入
r.lpush('queue:tasks', 'task1', 'task2', 'task3')
r.rpush('queue:tasks', 'task4')
# 弹出
r.lpop('queue:tasks') # 'task3'
r.rpop('queue:tasks') # 'task4'
# 阻塞弹出
r.blpop('queue:tasks', timeout=30) # ('queue:tasks', 'task2')
r.brpop('queue:tasks', timeout=30)
# 范围查询
r.lrange('queue:tasks', 0, -1) # 所有元素
r.lrange('queue:tasks', 0, 9) # 前 10 个
# 长度
r.llen('queue:tasks')
# 索引
r.lindex('queue:tasks', 0)
# 修剪
r.ltrim('queue:tasks', 0, 99) # 只保留前 100 个
Set 操作
# 添加
r.sadd('tags:article:1', 'python', 'redis', 'tutorial')
# 判断成员
r.sismember('tags:article:1', 'python') # True
# 获取所有成员
r.smembers('tags:article:1') # {'python', 'redis', 'tutorial'}
# 集合大小
r.scard('tags:article:1') # 3
# 集合运算
r.sadd('set_a', 'a', 'b', 'c', 'd')
r.sadd('set_b', 'b', 'c', 'e', 'f')
r.sinter('set_a', 'set_b') # {'b', 'c'} 交集
r.sunion('set_a', 'set_b') # {'a','b','c','d','e','f'} 并集
r.sdiff('set_a', 'set_b') # {'a', 'd'} 差集
# 存储结果
r.sinterstore('result', 'set_a', 'set_b')
ZSet 操作
# 添加
r.zadd('leaderboard', {'Alice': 100, 'Bob': 85, 'Charlie': 92})
# 获取分数
r.zscore('leaderboard', 'Alice') # 100.0
# 分数自增
r.zincrby('leaderboard', 5, 'Bob') # 90.0
# 排名(降序,从 0 开始)
r.zrevrank('leaderboard', 'Alice') # 0
# Top N(降序)
r.zrevrange('leaderboard', 0, 2, withscores=True)
# [('Alice', 100.0), ('Charlie', 92.0), ('Bob', 90.0)]
# 分数范围查询
r.zrangebyscore('leaderboard', 80, 100, withscores=True)
# 集合大小
r.zcard('leaderboard') # 3
# 分数区间计数
r.zcount('leaderboard', 80, 100) # 3
# 删除
r.zrem('leaderboard', 'Bob')
# 按排名范围删除
r.zremrangebyrank('leaderboard', 0, 0) # 删除排名最低的
19.4 Pipeline 批量操作
import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 无 Pipeline
start = time.time()
for i in range(10000):
r.set(f'key:{i}', f'value:{i}')
print(f"No Pipeline: {time.time() - start:.2f}s") # ~8s
# Pipeline
start = time.time()
pipe = r.pipeline(transaction=False)
for i in range(10000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()
print(f"Pipeline: {time.time() - start:.2f}s") # ~0.5s
# 事务 Pipeline(MULTI/EXEC)
pipe = r.pipeline(transaction=True)
pipe.multi()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.execute() # 原子执行
# 分批 Pipeline(避免一次发送过大)
def batch_pipeline(r, items, batch_size=1000):
"""分批执行 Pipeline"""
pipe = r.pipeline(transaction=False)
results = []
for i, (key, value) in enumerate(items):
pipe.set(key, value)
if (i + 1) % batch_size == 0:
results.extend(pipe.execute())
pipe = r.pipeline(transaction=False)
results.extend(pipe.execute()) # 执行剩余
return results
19.5 分布式锁
import redis
import uuid
import time
class RedisLock:
"""Redis 分布式锁"""
UNLOCK_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
def __init__(self, redis_client, key, ttl=30):
self.redis = redis_client
self.key = f"lock:{key}"
self.ttl = ttl
self.token = str(uuid.uuid4())
def acquire(self, blocking=True, timeout=None):
"""获取锁"""
end_time = time.time() + timeout if timeout else None
while True:
if self.redis.set(self.key, self.token, nx=True, ex=self.ttl):
return True
if not blocking:
return False
if end_time and time.time() > end_time:
return False
time.sleep(0.1)
def release(self):
"""释放锁(Lua 脚本保证原子性)"""
return self.redis.eval(
self.UNLOCK_SCRIPT, 1, self.key, self.token
) == 1
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 方式一:上下文管理器
with RedisLock(r, 'order:create:1001', ttl=30):
# 临界区代码
print("Lock acquired, processing order...")
time.sleep(5)
# 自动释放锁
# 方式二:手动管理
lock = RedisLock(r, 'order:create:1001', ttl=30)
if lock.acquire(blocking=False):
try:
print("Lock acquired!")
finally:
lock.release()
19.6 异步 Redis(aioredis)
import asyncio
import redis.asyncio as aioredis
async def main():
# 异步连接
r = aioredis.Redis(
host='localhost',
port=6379,
password='yourpassword',
decode_responses=True,
)
# 异步操作
await r.set('key', 'value')
value = await r.get('key')
print(f"Value: {value}")
# 异步 Pipeline
pipe = r.pipeline()
for i in range(100):
pipe.set(f'async:key:{i}', f'value:{i}')
await pipe.execute()
# 异步哈希操作
await r.hset('user:1001', mapping={'name': '张三', 'age': 25})
name = await r.hget('user:1001', 'name')
print(f"Name: {name}")
# 异步发布订阅
pubsub = r.pubsub()
await pubsub.subscribe('channel:news')
async def listener():
async for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
# 启动监听任务
task = asyncio.create_task(listener())
# 发布消息
await r.publish('channel:news', 'Hello async Redis!')
await asyncio.sleep(1)
task.cancel()
await r.aclose()
asyncio.run(main())
FastAPI 集成
from fastapi import FastAPI, Depends
import redis.asyncio as aioredis
app = FastAPI()
# Redis 连接(生命周期管理)
redis_pool = None
@app.on_event("startup")
async def startup():
global redis_pool
redis_pool = aioredis.ConnectionPool.from_url(
"redis://:password@localhost:6379/0",
max_connections=100,
decode_responses=True,
)
@app.on_event("shutdown")
async def shutdown():
await redis_pool.aclose()
async def get_redis():
return aioredis.Redis(connection_pool=redis_pool)
@app.get("/api/cache/{key}")
async def get_cache(key: str, r: aioredis.Redis = Depends(get_redis)):
value = await r.get(key)
return {"key": key, "value": value}
@app.post("/api/cache/{key}")
async def set_cache(key: str, value: str, ttl: int = 3600,
r: aioredis.Redis = Depends(get_redis)):
await r.set(key, value, ex=ttl)
return {"key": key, "status": "ok"}
19.7 消息队列(Stream)
import redis
import json
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ---- 生产者 ----
def produce():
for i in range(10):
msg_id = r.xadd('orders', {
'order_id': f'ORD-{1000 + i}',
'action': 'create',
'amount': str(99.9 + i),
})
print(f"Produced: {msg_id}")
# ---- 消费者 ----
GROUP_NAME = 'order-processors'
CONSUMER_NAME = 'consumer-1'
# 创建消费者组
try:
r.xgroup_create('orders', GROUP_NAME, id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass
def consume():
while True:
messages = r.xreadgroup(
GROUP_NAME, CONSUMER_NAME,
{'orders': '>'},
count=10,
block=2000,
)
if not messages:
continue
for stream, entries in messages:
for msg_id, data in entries:
print(f"Processing: {msg_id} -> {data}")
r.xack('orders', GROUP_NAME, msg_id)
# 启动消费者线程
t = threading.Thread(target=consume, daemon=True)
t.start()
# 生产消息
produce()
📌 业务场景
场景一:Flask/Django 缓存
from functools import wraps
import redis
import json
r = redis.Redis(host='localhost', decode_responses=True)
def cached(prefix, ttl=300):
"""自定义缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{prefix}:{':'.join(str(a) for a in args)}"
cached_value = r.get(cache_key)
if cached_value:
return json.loads(cached_value)
result = func(*args, **kwargs)
r.setex(cache_key, ttl, json.dumps(result, ensure_ascii=False))
return result
return wrapper
return decorator
@cached('user', ttl=3600)
def get_user(user_id):
# 查询数据库
return db.query_user(user_id)
场景二:限流器
class RateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
def is_allowed(self, key, limit, window):
"""滑动窗口限流"""
now = time.time()
pipe = self.redis.pipeline()
# 移除窗口外的记录
pipe.zremrangebyscore(key, 0, now - window)
# 统计窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(now): now})
# 设置过期时间
pipe.expire(key, window)
results = pipe.execute()
count = results[1]
return count < limit
limiter = RateLimiter(r)
if limiter.is_allowed('rate:user:1001', limit=100, window=60):
print("Request allowed")
else:
print("Rate limit exceeded")
场景三:分布式任务队列
import redis
import json
import uuid
class TaskQueue:
def __init__(self, name, redis_client):
self.name = name
self.redis = redis_client
def enqueue(self, task_type, payload):
task = {
'id': str(uuid.uuid4()),
'type': task_type,
'payload': payload,
}
self.redis.xadd(f'queue:{self.name}', {'data': json.dumps(task)})
return task['id']
def create_consumer(self, group, consumer):
try:
self.redis.xgroup_create(f'queue:{self.name}', group, id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass
while True:
messages = self.redis.xreadgroup(
group, consumer,
{f'queue:{self.name}': '>'},
count=1,
block=5000,
)
if not messages:
continue
for stream, entries in messages:
for msg_id, data in entries:
task = json.loads(data['data'])
yield msg_id, task
# 使用
queue = TaskQueue('email', r)
queue.enqueue('send_welcome', {'user_id': 1001, 'email': '[email protected]'})