Sysbench 完全指南 / 第五章:自定义 Lua 测试
第五章:自定义 Lua 测试
5.1 为什么需要自定义测试
虽然 Sysbench 内置了多种 OLTP 测试,但真实业务场景往往与标准测试有差异。通过编写自定义 Lua 脚本,你可以:
- 模拟特定业务的 SQL 模式(如电商下单、社交 Feed 流)
- 测试特定表结构(如 JSON 字段、全文索引)
- 调整读写比例以匹配真实负载
- 实现自定义的初始化和清理逻辑
- 测试存储过程 / 函数
5.2 Lua 脚本基础
5.2.1 Sysbench 的 Lua API
Sysbench 使用内嵌的 Lua 5.1 解释器(支持 LuaJIT),并提供以下核心 API:
| API | 说明 |
|---|---|
sysbench.cmdline | 命令行参数定义 |
sysbench.opt | 访问命令行选项值 |
sysbench.rand | 随机数生成 |
sysbench.sql | SQL 辅助函数 |
sb_event | 事件类型常量 |
sb_status | 状态查询 |
5.2.2 脚本生命周期
-- 1. 命令行选项定义(加载时执行)
sysbench.cmdline.options = { ... }
-- 2. thread_init() - 每个线程初始化时调用一次
function thread_init()
-- 创建每线程的数据库连接
-- 初始化局部变量
end
-- 3. event() - 每次事件(事务)调用一次
function event()
-- 核心测试逻辑
-- 这里放置 SQL 操作
end
-- 4. thread_done() - 每个线程结束时调用一次
function thread_done()
-- 关闭连接
-- 清理资源
end
-- 5. prepare() - prepare 阶段调用(可选)
function prepare()
-- 创建表、插入初始数据
end
-- 6. cleanup() - cleanup 阶段调用(可选)
function cleanup()
-- 删除表、清理数据
end
-- 7. done() - 所有线程结束后调用(可选,用于报告自定义指标)
function done()
-- 汇总统计、输出自定义报告
end
5.2.3 最小示例
-- minimal.lua - 最小的自定义测试脚本
function event()
-- 每次事件什么都不做
end
sysbench --lua-script=minimal.lua run --threads=4 --time=10
5.3 数据库操作 API
5.3.1 连接管理
function thread_init()
-- 创建数据库连接
drv = sysbench.sql.driver()
con = drv:connect()
end
function thread_done()
-- 断开连接
con:disconnect()
end
5.3.2 执行 SQL
function event()
-- 执行查询(返回结果集)
local rs = con:query("SELECT * FROM users WHERE id = " .. sysbench.rand.uniform(1, 100000))
-- 遍历结果集
for i = 1, rs.nrows do
local id = rs[i][1] -- 第 1 列
local name = rs[i][2] -- 第 2 列
end
-- 执行更新(无返回结果)
con:query("UPDATE users SET login_count = login_count + 1 WHERE id = 1")
-- 执行插入
con:query(string.format(
"INSERT INTO logs (user_id, action, created_at) VALUES (%d, '%s', NOW())",
sysbench.rand.uniform(1, 100000),
"login"
))
end
5.3.3 预编译语句(Prepared Statement)
function thread_init()
drv = sysbench.sql.driver()
con = drv:connect()
-- 准备语句(性能更好)
stmt = con:prepare("SELECT * FROM users WHERE id = ?")
end
function event()
-- 绑定参数并执行
stmt:bind(sysbench.rand.uniform(1, 100000))
local rs = stmt:execute()
end
function thread_done()
stmt:close()
con:disconnect()
end
5.3.4 事务控制
function event()
con:query("BEGIN")
-- 执行一系列操作
con:query("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
con:query("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
con:query("COMMIT")
-- 或 con:query("ROLLBACK")
end
5.4 自定义命令行选项
5.4.1 定义选项
-- custom_options.lua
sysbench.cmdline.options = {
{"用户表大小", "users-table-size", 100000},
{"订单表大小", "orders-table-size", 500000},
{"热点数据比例 (百分比)", "hotspot-pct", 20},
{"是否使用事务", "use-transaction", true},
{"自定义 SQL", "custom-sql", ""},
}
# 使用自定义选项
sysbench --lua-script=custom_options.lua \
--users-table-size=500000 \
--orders-table-size=2000000 \
--hotspot-pct=10 \
--threads=16 \
--time=60 \
run
5.4.2 在脚本中访问选项
function event()
-- 通过 sysbench.opt 访问选项值
local table_size = sysbench.opt.users_table_size -- 注意:连字符转下划线
local hotspot_pct = sysbench.opt.hotspot_pct
-- 使用选项值
local id
if sysbench.rand.uniform(1, 100) <= hotspot_pct then
-- 热点数据:访问前 20% 的数据
id = sysbench.rand.uniform(1, math.floor(table_size * 0.2))
else
-- 冷数据:访问后 80% 的数据
id = sysbench.rand.uniform(math.floor(table_size * 0.2) + 1, table_size)
end
con:query("SELECT * FROM users WHERE id = " .. id)
end
5.5 完整示例:电商下单测试
5.5.1 场景描述
模拟电商下单场景:
- 查询用户信息
- 查询商品信息和库存
- 创建订单
- 减少库存
- 所有操作在一个事务中
5.5.2 完整脚本
-- ecommerce_order.lua
-- 电商下单场景测试
sysbench.cmdline.options = {
{"用户表行数", "users-size", 100000},
{"商品表行数", "products-size", 10000},
{"订单表行数", "orders-size", 1000000},
{"每单商品数", "items-per-order", 3},
{"热点商品比例 (%)", "hotspot-pct", 10},
}
local drv, con
function prepare()
local host = sysbench.opt.mysql_host or "127.0.0.1"
local port = sysbench.opt.mysql_port or 3306
local user = sysbench.opt.mysql_user or "root"
local pass = sysbench.opt.mysql_password or ""
local db = sysbench.opt.mysql_db or "sbtest"
drv = sysbench.sql.driver()
con = drv:connect()
-- 创建数据库
con:query("CREATE DATABASE IF NOT EXISTS " .. db)
con:query("USE " .. db)
-- 创建用户表
con:query([[
CREATE TABLE IF NOT EXISTS ec_users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(64) NOT NULL,
email VARCHAR(128) NOT NULL,
balance DECIMAL(10,2) DEFAULT 0.00,
level TINYINT DEFAULT 1,
created_at DATETIME DEFAULT NOW(),
INDEX idx_username (username)
) ENGINE=InnoDB
]])
-- 创建商品表
con:query([[
CREATE TABLE IF NOT EXISTS ec_products (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
price DECIMAL(10,2) NOT NULL,
stock INT NOT NULL DEFAULT 0,
category_id INT DEFAULT 1,
status TINYINT DEFAULT 1,
INDEX idx_category (category_id),
INDEX idx_status (status)
) ENGINE=InnoDB
]])
-- 创建订单表
con:query([[
CREATE TABLE IF NOT EXISTS ec_orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
total_amount DECIMAL(12,2) NOT NULL,
status TINYINT DEFAULT 0,
created_at DATETIME DEFAULT NOW(),
INDEX idx_user_id (user_id),
INDEX idx_status (status),
INDEX idx_created (created_at)
) ENGINE=InnoDB
]])
-- 创建订单明细表
con:query([[
CREATE TABLE IF NOT EXISTS ec_order_items (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id BIGINT NOT NULL,
product_id INT NOT NULL,
quantity INT NOT NULL,
price DECIMAL(10,2) NOT NULL,
INDEX idx_order_id (order_id),
INDEX idx_product_id (product_id)
) ENGINE=InnoDB
]])
-- 插入用户数据
print("Inserting users...")
for i = 1, sysbench.opt.users_size do
con:query(string.format(
"INSERT IGNORE INTO ec_users (username, email, balance) VALUES ('user%d', 'user%[email protected]', %.2f)",
i, i, sysbench.rand.uniform_double() * 10000
))
end
-- 插入商品数据
print("Inserting products...")
for i = 1, sysbench.opt.products_size do
con:query(string.format(
"INSERT IGNORE INTO ec_products (name, price, stock) VALUES ('Product-%d', %.2f, %d)",
i,
sysbench.rand.uniform_double() * 999 + 1,
sysbench.rand.uniform(10, 1000)
))
end
print("Prepare complete!")
end
function cleanup()
drv = sysbench.sql.driver()
con = drv:connect()
con:query("DROP TABLE IF EXISTS ec_order_items")
con:query("DROP TABLE IF EXISTS ec_orders")
con:query("DROP TABLE IF EXISTS ec_products")
con:query("DROP TABLE IF EXISTS ec_users")
end
function thread_init()
drv = sysbench.sql.driver()
con = drv:connect()
con:query("USE " .. (sysbench.opt.mysql_db or "sbtest"))
end
function thread_done()
con:disconnect()
end
function get_hot_product_id()
local total = sysbench.opt.products_size
local hotspot_size = math.floor(total * sysbench.opt.hotspot_pct / 100)
if sysbench.rand.uniform(1, 100) <= 70 then
-- 70% 概率访问热点商品
return sysbench.rand.uniform(1, hotspot_size)
else
-- 30% 概率访问冷门商品
return sysbench.rand.uniform(hotspot_size + 1, total)
end
end
function event()
local user_id = sysbench.rand.uniform(1, sysbench.opt.users_size)
local num_items = sysbench.rand.uniform(1, sysbench.opt.items_per_order)
con:query("BEGIN")
-- 1. 查询用户余额
local rs = con:query("SELECT balance FROM ec_users WHERE id = " .. user_id)
if rs.nrows == 0 then
con:query("ROLLBACK")
return
end
-- 2. 逐个处理商品
local total = 0
local items = {}
for i = 1, num_items do
local product_id = get_hot_product_id()
-- 查询商品信息和库存
local prs = con:query(string.format(
"SELECT price, stock FROM ec_products WHERE id = %d AND status = 1 FOR UPDATE",
product_id
))
if prs.nrows > 0 then
local price = tonumber(prs[1][1])
local stock = tonumber(prs[1][2])
if stock > 0 then
local qty = math.min(sysbench.rand.uniform(1, 3), stock)
total = total + price * qty
table.insert(items, {product_id, qty, price})
-- 减少库存
con:query(string.format(
"UPDATE ec_products SET stock = stock - %d WHERE id = %d",
qty, product_id
))
end
end
end
-- 3. 创建订单
if #items > 0 then
con:query(string.format(
"INSERT INTO ec_orders (user_id, total_amount, status) VALUES (%d, %.2f, 0)",
user_id, total
))
-- 获取订单 ID
local ors = con:query("SELECT LAST_INSERT_ID()")
local order_id = tonumber(ors[1][1])
-- 插入订单明细
for _, item in ipairs(items) do
con:query(string.format(
"INSERT INTO ec_order_items (order_id, product_id, quantity, price) VALUES (%d, %d, %d, %.2f)",
order_id, item[1], item[2], item[3]
))
end
end
con:query("COMMIT")
end
5.5.3 使用自定义脚本
# 准备数据
sysbench --lua-script=ecommerce_order.lua \
--mysql-host=127.0.0.1 \
--mysql-user=root \
--mysql-password=secret \
--mysql-db=ecommerce_bench \
--users-size=100000 \
--products-size=10000 \
prepare
# 运行测试
sysbench --lua-script=ecommerce_order.lua \
--mysql-host=127.0.0.1 \
--mysql-user=root \
--mysql-password=secret \
--mysql-db=ecommerce_bench \
--users-size=100000 \
--products-size=10000 \
--hotspot-pct=10 \
--threads=32 \
--time=300 \
--histogram \
run
# 清理
sysbench --lua-script=ecommerce_order.lua \
--mysql-host=127.0.0.1 \
--mysql-user=root \
--mysql-db=ecommerce_bench \
cleanup
5.6 完整示例:社交动态 Feed 测试
-- social_feed.lua
-- 社交平台 Feed 流读写测试
sysbench.cmdline.options = {
{"用户数", "user-count", 100000},
{"每人关注数", "follows-per-user", 50},
{"每人动态数", "posts-per-user", 100},
}
local drv, con
function prepare()
drv = sysbench.sql.driver()
con = drv:connect()
con:query("CREATE DATABASE IF NOT EXISTS " .. (sysbench.opt.mysql_db or "sbtest"))
con:query("USE " .. (sysbench.opt.mysql_db or "sbtest"))
con:query([[
CREATE TABLE IF NOT EXISTS sf_users (
id INT PRIMARY KEY AUTO_INCREMENT,
nickname VARCHAR(64),
INDEX idx_nickname (nickname)
) ENGINE=InnoDB
]])
con:query([[
CREATE TABLE IF NOT EXISTS sf_follows (
user_id INT NOT NULL,
follow_id INT NOT NULL,
created_at DATETIME DEFAULT NOW(),
PRIMARY KEY (user_id, follow_id),
INDEX idx_follow (follow_id)
) ENGINE=InnoDB
]])
con:query([[
CREATE TABLE IF NOT EXISTS sf_posts (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
content TEXT,
like_count INT DEFAULT 0,
comment_count INT DEFAULT 0,
created_at DATETIME DEFAULT NOW(),
INDEX idx_user_time (user_id, created_at DESC)
) ENGINE=InnoDB
]])
-- 插入用户
print("Inserting users...")
local batch_size = 1000
for i = 1, sysbench.opt.user_count, batch_size do
local values = {}
for j = i, math.min(i + batch_size - 1, sysbench.opt.user_count) do
table.insert(values, string.format("('user_%d')", j))
end
con:query("INSERT INTO sf_users (nickname) VALUES " .. table.concat(values, ","))
end
-- 插入关注关系
print("Inserting follows...")
for i = 1, sysbench.opt.user_count do
for j = 1, sysbench.opt.follows_per_user do
local follow_id = sysbench.rand.uniform(1, sysbench.opt.user_count)
con:query(string.format(
"INSERT IGNORE INTO sf_follows (user_id, follow_id) VALUES (%d, %d)",
i, follow_id
))
end
end
-- 插入动态
print("Inserting posts...")
for i = 1, sysbench.opt.user_count do
for j = 1, sysbench.opt.posts_per_user do
con:query(string.format(
"INSERT INTO sf_posts (user_id, content, like_count, comment_count) VALUES (%d, 'Post content %d-%d', %d, %d)",
i, i, j, sysbench.rand.uniform(0, 1000), sysbench.rand.uniform(0, 100)
))
end
end
print("Prepare complete!")
end
function cleanup()
drv = sysbench.sql.driver()
con = drv:connect()
con:query("DROP TABLE IF EXISTS sf_posts")
con:query("DROP TABLE IF EXISTS sf_follows")
con:query("DROP TABLE IF EXISTS sf_users")
end
function thread_init()
drv = sysbench.sql.driver()
con = drv:connect()
con:query("USE " .. (sysbench.opt.mysql_db or "sbtest"))
end
function thread_done()
con:disconnect()
end
function event()
local user_id = sysbench.rand.uniform(1, sysbench.opt.user_count)
local action = sysbench.rand.uniform(1, 10)
if action <= 5 then
-- 50%: 读取 Feed(关注的人的最新动态)
con:query(string.format([[
SELECT p.id, p.user_id, p.content, p.like_count, p.created_at
FROM sf_posts p
INNER JOIN sf_follows f ON f.follow_id = p.user_id
WHERE f.user_id = %d
ORDER BY p.created_at DESC
LIMIT 20
]], user_id))
elseif action <= 7 then
-- 20%: 发布动态
con:query(string.format(
"INSERT INTO sf_posts (user_id, content, like_count) VALUES (%d, 'New post at %d', 0)",
user_id, os.time()
))
elseif action <= 9 then
-- 20%: 点赞
con:query(string.format(
"UPDATE sf_posts SET like_count = like_count + 1 WHERE id = %d",
sysbench.rand.uniform(1, sysbench.opt.user_count * sysbench.opt.posts_per_user)
))
else
-- 10%: 查看某用户的个人页
con:query(string.format(
"SELECT * FROM sf_posts WHERE user_id = %d ORDER BY created_at DESC LIMIT 20",
user_id
))
end
end
5.7 随机数 API 详解
5.7.1 随机数函数
-- 均匀分布随机整数 [min, max]
sysbench.rand.uniform(1, 1000)
-- 均匀分布随机浮点数 [0, 1)
sysbench.rand.uniform_double()
-- 高斯分布(正态分布),参数为均值和标准差
sysbench.rand.gaussian(100, 20)
-- 特殊分布(热点数据集中)
-- 由 --rand-spec-pct 和 --rand-spec-res 控制
sysbench.rand.special()
-- Pareto 分布
sysbench.rand.pareto()
-- Zipfian 分布
sysbench.rand.zipfian()
-- 生成指定长度的随机字符串
sysbench.rand.string("#####-@@@@-$$$$") -- #=数字 @=字母 $=混合
5.7.2 随机字符串模板
| 字符 | 含义 |
|---|---|
@ | 随机字母(a-z, A-Z) |
# | 随机数字(0-9) |
$ | 随机字母或数字 |
| 其他 | 原样输出 |
-- 生成订单号格式:ORD-20240101-ABCD1234
local order_no = "ORD-" .. sysbench.rand.string("######") .. "-" .. sysbench.rand.string("@@@@####")
-- 生成邮箱:[email protected]
local email = "user" .. sysbench.rand.string("####") .. "@example.com"
5.8 高级技巧
5.8.1 自定义统计报告
-- 使用全局变量收集自定义统计
local custom_stats = {
order_count = 0,
total_amount = 0,
error_count = 0,
}
function event()
-- ... 执行业务逻辑 ...
-- 更新统计
custom_stats.order_count = custom_stats.order_count + 1
custom_stats.total_amount = custom_stats.total_amount + amount
if error then
custom_stats.error_count = custom_stats.error_count + 1
end
end
function done()
-- 输出自定义报告
print("")
print("=== 自定义业务统计 ===")
print(string.format("订单总数: %d", custom_stats.order_count))
print(string.format("总金额: %.2f", custom_stats.total_amount))
print(string.format("错误次数: %d", custom_stats.error_count))
print(string.format("平均每单金额: %.2f",
custom_stats.order_count > 0 and custom_stats.total_amount / custom_stats.order_count or 0))
end
注意:
done()在所有线程结束后由主线程调用,但自定义变量在多线程环境下是隔离的,需要通过其他方式汇总(如写入共享文件或数据库)。
5.8.2 条件执行与错误处理
function event()
local ok, err = pcall(function()
con:query("BEGIN")
con:query("UPDATE accounts SET balance = balance - 100 WHERE id = 1 AND balance >= 100")
con:query("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
con:query("COMMIT")
end)
if not ok then
-- 错误处理
pcall(function() con:query("ROLLBACK") end)
sysbench.metrics.report_event("errors")
end
end
5.8.3 连接复用与连接池
-- 每个线程使用独立的连接(Sysbench 默认模式)
-- 连接在 thread_init() 中创建,在 thread_done() 中关闭
-- 如果需要多个连接(如同时操作主库和从库)
function thread_init()
drv = sysbench.sql.driver()
con_master = drv:connect() -- 主库连接
con_slave = drv:connect() -- 从库连接(需配置不同选项)
end
5.8.4 使用外部文件中的数据
-- 从 CSV 文件加载测试数据
function prepare()
local file = io.open("test_data.csv", "r")
if not file then
error("Cannot open test_data.csv")
end
for line in file:lines() do
local fields = {}
for field in line:gmatch("[^,]+") do
table.insert(fields, field)
end
con:query(string.format(
"INSERT INTO users (name, email, age) VALUES ('%s', '%s', %d)",
fields[1], fields[2], tonumber(fields[3])
))
end
file:close()
end
5.9 内置 OLTP 脚本的源码结构
理解内置脚本的结构有助于编写自定义脚本。以 oltp_read_write.lua 为例:
-- /usr/share/sysbench/oltp_read_write.lua 的简化结构
pathtest = string.match(testpath, "(.*/)")
-- 加载公共模块
require(pathtest .. "oltp_common")
function prepare()
-- 调用公共模块的 prepare
oltp_prepare()
end
function cleanup()
oltp_cleanup()
end
function thread_init()
-- 调用公共模块的线程初始化
oltp_thread_init()
end
function thread_done()
oltp_thread_done()
end
function event()
-- 每个事务包含的具体操作
local table_name = "sbtest" .. sysbench.rand.uniform(1, sysbench.opt.tables)
local id = sysbench.rand.uniform(1, sysbench.opt.table_size)
con:query("BEGIN")
-- 点查询
for i = 1, sysbench.opt.point_selects do
con:query("SELECT c FROM " .. table_name .. " WHERE id = " .. id)
end
-- 范围查询
for i = 1, sysbench.opt.simple_ranges do
local range_start = sysbench.rand.uniform(1, sysbench.opt.table_size)
local range_end = range_start + sysbench.opt.range_size
con:query(string.format("SELECT c FROM %s WHERE id BETWEEN %d AND %d",
table_name, range_start, range_end))
end
-- ... 更多操作 ...
con:query("COMMIT")
end
公共模块 oltp_common.lua 包含:
- 表结构定义
oltp_prepare()函数(创建表、插入数据)oltp_cleanup()函数(删除表)- 命令行选项定义
5.10 调试 Lua 脚本
5.10.1 启用调试输出
-- 使用 print() 输出调试信息
function event()
local id = sysbench.rand.uniform(1, 1000)
print("DEBUG: Querying id = " .. id) -- 调试输出
con:query("SELECT * FROM sbtest WHERE id = " .. id)
end
# 运行时增加 verbosity
sysbench --lua-script=my_test.lua --verbosity=5 --threads=1 --time=5 run
5.10.2 使用断言
function event()
local rs = con:query("SELECT COUNT(*) FROM users")
assert(rs.nrows > 0, "Users table is empty!")
local count = tonumber(rs[1][1])
assert(count > 0, "No users found")
end
5.10.3 单线程调试
# 使用 1 个线程,短时间运行
sysbench --lua-script=my_test.lua --threads=1 --time=5 --verbosity=5 run
5.11 小结
| 要点 | 说明 |
|---|---|
| 脚本结构 | thread_init → event() 循环 → thread_done |
| 数据库操作 | con:query() 执行 SQL,con:prepare() 预编译 |
| 自定义选项 | sysbench.cmdline.options 定义,sysbench.opt 访问 |
| 随机数 | sysbench.rand.uniform/uniform_double/gaussian 等 |
| 调试方法 | print() + –verbosity=5 + 单线程运行 |
| 最佳实践 | 使用预编译语句、合理使用事务、错误处理 |