
Database Operations

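Data persistence is a core requirement of most applications. Julia provides several database drivers and an ORM, with support for SQLite, PostgreSQL, MySQL, and generic ODBC connections. This article covers the most common ways of working with databases.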


1. SQLite.jl: Working with SQLite

1.1 Installation and Connection

using Pkg
Pkg.add("SQLite")

using SQLite

# Create or open a database file
db = SQLite.DB("mydata.db")

# Or use an in-memory database
db = SQLite.DB(":memory:")

# Close the connection
close(db)

1.2 Creating Tables and Inserting Data

db = SQLite.DB("mydata.db")

# Create a table
SQLite.execute(db, """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        age INTEGER DEFAULT 0,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

# Insert a single row (values are bound to the ? placeholders)
SQLite.execute(db, """
    INSERT INTO users (name, email, age) VALUES (?, ?, ?)
""", ["Alice", "alice@example.com", 25])

# Insert several rows (email is UNIQUE, so each address must differ)
names = ["Bob", "Charlie", "Diana"]
emails = ["bob@example.com", "charlie@example.com", "diana@example.com"]
ages = [30, 35, 28]

for (name, email, age) in zip(names, emails, ages)
    SQLite.execute(db, "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                   [name, email, age])
end

1.3 Querying Data

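# SELECT statements go through DBInterface.execute, which returns an iterable
# result set that can be converted to a DataFrame.
# (SQLite.execute only runs a statement and returns a status code, not rows.)
using DataFrames

result = DBInterface.execute(db, "SELECT * FROM users WHERE age > ?", [25])
df = DataFrame(result)
println(df)

# Fetch a single row and read a column by name
row = first(DBInterface.execute(db, "SELECT name FROM users WHERE id = ?", [1]))
println(row.name)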
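When a DataFrame is not needed, the result returned by `DBInterface.execute` can be iterated row by row. This is a minimal sketch assuming SQLite.jl 1.x (which exports `DBInterface`); `names_older_than` is a hypothetical helper, and the demo creates its own in-memory `users` table. Values are copied out inside the loop, since a row object points at the query's current position rather than holding its own copy of the data.

```julia
using SQLite  # SQLite.jl also exports DBInterface

# Hypothetical helper: names of users older than `min_age`, sorted alphabetically
function names_older_than(db::SQLite.DB, min_age::Integer)
    result_names = String[]
    rows = DBInterface.execute(db, "SELECT name FROM users WHERE age > ? ORDER BY name", [min_age])
    for row in rows
        push!(result_names, row.name)  # copy the value while the row is current
    end
    return result_names
end

# Self-contained demo on an in-memory database
db = SQLite.DB()
SQLite.execute(db, "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
for (name, age) in [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
    SQLite.execute(db, "INSERT INTO users (name, age) VALUES (?, ?)", [name, age])
end
println(names_older_than(db, 26))
```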

1.4 Transactions

# Wrap a batch of inserts in a single transaction
SQLite.transaction(db) do
    for i in 1:1000
        SQLite.execute(db, "INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
                       ["User $i", "user$i@example.com", rand(18:60)])
    end
end  # commits on success; rolls back automatically if an exception is thrown
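To see the automatic rollback in action, the following self-contained sketch (assuming SQLite.jl 1.x and an in-memory database) makes the second insert violate a NOT NULL constraint; SQLite.jl binds `missing` as SQL NULL. Because both inserts run inside one `SQLite.transaction` block, the first insert is undone as well. The `count_rows` helper is hypothetical.

```julia
using SQLite  # SQLite.jl also exports DBInterface

db = SQLite.DB()  # in-memory database, so the demo leaves no file behind
SQLite.execute(db, "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

# Count rows with a query; SQLite.execute itself only returns a status code
count_rows(db) = first(DBInterface.execute(db, "SELECT COUNT(*) AS n FROM users")).n

try
    SQLite.transaction(db) do
        SQLite.execute(db, "INSERT INTO users (name) VALUES (?)", ["ok"])
        # `missing` is bound as NULL, which violates NOT NULL and throws
        SQLite.execute(db, "INSERT INTO users (name) VALUES (?)", [missing])
    end
catch err
    println("transaction rolled back: ", err)
end

println(count_rows(db))  # the first insert was rolled back too
```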

2. LibPQ.jl: PostgreSQL

2.1 Installation and Connection

using Pkg
Pkg.add("LibPQ")

using LibPQ

# Connect to PostgreSQL
conn = LibPQ.Connection("host=localhost dbname=mydb user=postgres password=secret")

# Or use a connection URI
conn = LibPQ.Connection("postgresql://postgres:secret@localhost/mydb")

# Close the connection
close(conn)

2.2 Basic Operations

# Create a table
execute(conn, """
    CREATE TABLE IF NOT EXISTS products (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        price DECIMAL(10, 2),
        category VARCHAR(50),
        in_stock BOOLEAN DEFAULT true
    )
""")

# Insert data (a parameterized query with $1, $2, ... placeholders guards against SQL injection)
execute(conn, """
    INSERT INTO products (name, price, category) VALUES (\$1, \$2, \$3)
""", ["Laptop", 999.99, "Electronics"])

# Query and convert the result to a DataFrame
result = execute(conn, "SELECT * FROM products WHERE category = \$1", ["Electronics"])
df = DataFrame(result)
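SQLite, ODBC and MySQL use `?` placeholders, whereas LibPQ.jl expects `$1`, `$2`, and so on. If the same SQL text is shared between drivers, a small converter can renumber the placeholders. This is a minimal sketch: `to_pg_placeholders` is a hypothetical helper that only skips `?` inside single-quoted string literals; it does not understand SQL comments or PostgreSQL's `?` operators on `jsonb`.

```julia
# Hypothetical helper: rewrite `?` placeholders as PostgreSQL-style $1, $2, ...
# `?` characters inside single-quoted SQL string literals are left untouched.
function to_pg_placeholders(sql::AbstractString)
    io = IOBuffer()
    n = 0
    in_literal = false
    for c in sql
        if c == '\''
            # an escaped quote ('') toggles twice, so the state stays correct
            in_literal = !in_literal
            print(io, c)
        elseif c == '?' && !in_literal
            n += 1
            print(io, '$', n)
        else
            print(io, c)
        end
    end
    return String(take!(io))
end

println(to_pg_placeholders("SELECT * FROM products WHERE category = ? AND price < ?"))
```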

2.3 Using a Connection Pool

using ConnectionPools

# Create a connection pool
pool = ConnectionPool{LibPQ.Connection}(
    () -> LibPQ.Connection("host=localhost dbname=mydb"),
    5,   # minimum number of connections
    20   # maximum number of connections
)

# Borrow a connection from the pool
conn = acquire(pool)
try
    result = execute(conn, "SELECT * FROM products")
finally
    release!(pool, conn)
end
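Without an extra package, Julia's built-in `Channel` can serve as a bounded pool: `take!` blocks until a connection is free and `put!` hands it back. A minimal sketch, assuming all connections are opened eagerly up front; `ChannelPool`, `with_pooled` and the `FakeConn` stand-in are hypothetical names, and with LibPQ the factory would be something like `() -> LibPQ.Connection("host=localhost dbname=mydb")`.

```julia
# Hypothetical bounded pool built on a Channel: exactly `size` connections exist,
# and callers block in take! until one is handed back.
struct ChannelPool{T}
    conns::Channel{T}
end

function ChannelPool{T}(factory, size::Integer) where T
    ch = Channel{T}(size)
    for _ in 1:size
        put!(ch, factory())   # open all connections up front
    end
    return ChannelPool{T}(ch)
end

# Borrow a connection, run f, and always return the connection to the pool
function with_pooled(f, pool::ChannelPool)
    conn = take!(pool.conns)
    try
        return f(conn)
    finally
        put!(pool.conns, conn)
    end
end

# Stand-in connection type so the sketch runs without a database server
struct FakeConn
    id::Int
end

counter = Ref(0)
pool = ChannelPool{FakeConn}(() -> FakeConn(counter[] += 1), 2)
ids = [with_pooled(c -> c.id, pool) for _ in 1:5]
println(ids)
```

Unlike a pool that opens new connections on demand, this design puts a hard cap on concurrent connections, which is usually what a database server with a connection limit needs.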

3. ODBC.jl: Generic ODBC Connections

3.1 Configuration

using Pkg
Pkg.add("ODBC")

using ODBC

# List the installed drivers
ODBC.drivers()

# List the configured data sources (DSNs)
ODBC.dsns()

3.2 Connecting and Querying

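using DBInterface  # run Pkg.add("DBInterface") first if it is not installed

# Connect through a named DSN with a username and password
conn = ODBC.Connection("MyDatabase", "username", "password")

# Or pass a full connection string
conn = ODBC.Connection("Driver={PostgreSQL};Server=localhost;Database=mydb;")

# Query
result = DBInterface.execute(conn, "SELECT * FROM users LIMIT 10")
df = DataFrame(result)

# Parameterized query
result = DBInterface.execute(conn, "SELECT * FROM users WHERE age > ?", [30])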
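An ODBC connection string is a list of `key=value` pairs separated by semicolons; values containing `;`, braces or spaces are conventionally wrapped in `{}`, with a literal `}` doubled. A minimal sketch of a hypothetical `odbc_connstr` helper that assembles such a string from keyword arguments, so credentials are not concatenated by hand; the quoting rule is the common driver-manager convention, so check your driver's documentation. The result would be passed to `ODBC.Connection`.

```julia
# Hypothetical helper: build an ODBC connection string from keyword arguments.
# Values containing ; { } or whitespace are wrapped in braces, with } doubled.
function odbc_connstr(; kwargs...)
    parts = String[]
    for (key, val) in kwargs
        v = string(val)
        if occursin(r"[;{}\s]", v)
            v = "{" * replace(v, "}" => "}}") * "}"
        end
        push!(parts, "$(key)=$(v)")
    end
    return join(parts, ";") * ";"
end

println(odbc_connstr(Driver = "ODBC Driver 17 for SQL Server", Server = ".", Database = "mydb"))
```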

3.3 Supported Databases

Database | Driver | Example connection string
PostgreSQL | psqlODBC | Driver={PostgreSQL};Server=localhost;Database=mydb;
MySQL | MySQL ODBC | Driver={MySQL};Server=localhost;Database=mydb;
SQL Server | ODBC Driver 17 | Driver={ODBC Driver 17 for SQL Server};Server=.;Database=mydb;
SQLite | SQLite3 ODBC | Driver={SQLite3 ODBC};Database=mydb.db;

4. MySQL.jl

using Pkg
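Pkg.add(["MySQL", "DBInterface"])

using MySQL, DBInterface

# Connect
conn = DBInterface.connect(MySQL.Connection, "localhost", "root", "password"; db="mydb")

# Query
result = DBInterface.execute(conn, "SELECT * FROM users")
df = DataFrame(result)

# Insert through a prepared statement with ? placeholders
stmt = DBInterface.prepare(conn, "INSERT INTO users (name, age) VALUES (?, ?)")
DBInterface.execute(stmt, ["Alice", 25])

# Close the connection
DBInterface.close!(conn)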

5. SearchLight.jl: ORM

5.1 Defining a Model

using SearchLight

Base.@kwdef mutable struct Article <: AbstractModel
    id::DbId = DbId()
    title::String = ""
    content::String = ""
    views::Int = 0
    published::Bool = false
end

5.2 Migrations

# db/migrations/001_create_articles.jl
module CreateArticles

using SearchLight.Migration

function up()
    create_table(:articles) do
        [
            column(:id, :integer, primary_key=true, autoincrement=true)
            column(:title, :string, not_null=true, limit=200)
            column(:content, :text)
            column(:views, :integer, default=0)
            column(:published, :boolean, default=false)
            timestamps()
        ]
    end

    add_index(:articles, :title, unique=true)
end

function down()
    drop_table(:articles)
end

end

# Run the migrations
SearchLight.Migration.up()

5.3 CRUD Operations

# Create
article = Article(title="Getting Started with Julia", content="This is the content...")
save!(article)

# Read
all_articles = all(Article)
article = findone(Article, title="Getting Started with Julia")
recent = find(Article, SQLWhereExpression("views > ?", [100]))

# Update
article.views += 1
save!(article)

# Delete
delete!(article)

# Chained queries (only if your SearchLight version supports them)
# articles = Article |> where(:published, "=", true) |> order(:views, :desc) |> limit(10)

6. Connection Pool Management

6.1 A Hand-Rolled Connection Pool

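# Idle connections are kept in `pool`; `max_size` caps how many idle
# connections are kept for reuse (it does not limit how many are open at once)
mutable struct ConnectionPool{T}
    factory::Function
    pool::Vector{T}
    lock::ReentrantLock
    max_size::Int
end

function ConnectionPool{T}(factory::Function, max_size::Int=10) where T
    pool = T[]
    ConnectionPool{T}(factory, pool, ReentrantLock(), max_size)
end

function acquire!(pool::ConnectionPool{T}) where T
    # `return` inside a do-block only leaves the closure, so use the block's
    # value: an idle connection, or `nothing` if there is none
    conn = lock(pool.lock) do
        isempty(pool.pool) ? nothing : pop!(pool.pool)
    end
    # No idle connection available: open a new one
    return conn === nothing ? pool.factory() : conn
end

function release!(pool::ConnectionPool{T}, conn::T) where T
    lock(pool.lock) do
        if length(pool.pool) < pool.max_size
            push!(pool.pool, conn)   # keep it for reuse
        else
            close(conn)              # enough idle connections: close this one
        end
    end
end

# Usage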
pool = ConnectionPool{SQLite.DB}(() -> SQLite.DB("mydata.db"), 5)
conn = acquire!(pool)
try
    SQLite.execute(conn, "SELECT 1")
finally
    release!(pool, conn)
end
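Because `return` inside a `do` block only exits the closure, it is easy to write an `acquire!` that silently never reuses anything. The quick self-contained check below mirrors the pool above under a different, hypothetical name (`MiniPool`) together with a stand-in `CountingConn` type, and confirms that a released connection is handed out again and that connections beyond `max_size` idle slots are closed.

```julia
# Stand-in connection that records whether it was closed
mutable struct CountingConn
    id::Int
    closed::Bool
end
Base.close(c::CountingConn) = (c.closed = true; nothing)

# Same design as the section 6.1 pool, under a different name
mutable struct MiniPool{T}
    factory::Function
    pool::Vector{T}
    lock::ReentrantLock
    max_size::Int
end
MiniPool{T}(factory::Function, max_size::Int=10) where T =
    MiniPool{T}(factory, T[], ReentrantLock(), max_size)

function acquire!(p::MiniPool)
    conn = lock(() -> isempty(p.pool) ? nothing : pop!(p.pool), p.lock)
    return conn === nothing ? p.factory() : conn
end

function release!(p::MiniPool{T}, conn::T) where T
    lock(p.lock) do
        length(p.pool) < p.max_size ? push!(p.pool, conn) : close(conn)
    end
    return nothing
end

opened = Ref(0)
pool = MiniPool{CountingConn}(() -> CountingConn(opened[] += 1, false), 1)
a = acquire!(pool); release!(pool, a)
b = acquire!(pool)                     # reuses `a` instead of opening a new connection
c = acquire!(pool)                     # nothing idle, so a second connection is opened
release!(pool, b); release!(pool, c)   # only one idle slot, so `c` is closed
println((opened[], a === b, c.closed))
```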

6.2 Usage Pattern

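# do-block syntax: the connection is always returned to the pool, even on error
function with_connection(f, pool::ConnectionPool)
    conn = acquire!(pool)
    try
        f(conn)
    finally
        release!(pool, conn)
    end
end

# Usage: materialize the result inside the block, before the connection is released
with_connection(pool) do conn
    result = DBInterface.execute(conn, "SELECT * FROM users")
    DataFrame(result)
end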

7. Transactions

7.1 SQLite Transactions

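using SQLite, DataFrames

function transfer_funds(db, from_id, to_id, amount)
    SQLite.transaction(db) do
        # Check the balance: fetch the row, then read the column value
        # (SQLite.execute only returns a status code, not data)
        rows = DataFrame(DBInterface.execute(db,
            "SELECT balance FROM accounts WHERE id = ?", [from_id]))
        isempty(rows) && error("Account $from_id not found")

        if rows[1, :balance] < amount
            error("Insufficient funds")
        end

        # Debit the source account
        SQLite.execute(db,
            "UPDATE accounts SET balance = balance - ? WHERE id = ?",
            [amount, from_id])

        # Credit the destination account
        SQLite.execute(db,
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            [amount, to_id])
    end
    # The transaction commits automatically; any exception rolls it back
end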

7.2 PostgreSQL Transactions

function batch_insert(conn, records)
    execute(conn, "BEGIN")
    try
        for record in records
            execute(conn, "INSERT INTO logs (message) VALUES (\$1)", [record])
        end
        execute(conn, "COMMIT")
    catch e
        execute(conn, "ROLLBACK")
        rethrow(e)
    end
end
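The BEGIN/COMMIT/ROLLBACK boilerplate above can be factored into a do-block helper. A minimal sketch: the hypothetical `with_transaction` takes any `exec` function that sends one SQL string to the database (with LibPQ, something like `sql -> execute(conn, sql)`), so here it is exercised with a recorder that only logs statements.

```julia
# Hypothetical helper: run f() between BEGIN and COMMIT, rolling back on error.
# `exec` sends a single SQL statement to the database.
function with_transaction(f, exec)
    exec("BEGIN")
    try
        result = f()
        exec("COMMIT")
        return result
    catch
        exec("ROLLBACK")
        rethrow()
    end
end

# Recorder standing in for a real connection
sql_log = String[]
record(sql) = push!(sql_log, sql)

with_transaction(record) do
    record("INSERT INTO logs (message) VALUES (\$1)")
end

try
    with_transaction(record) do
        error("simulated failure")
    end
catch
    # the error is rethrown after ROLLBACK; ignored here for the demo
end
println(sql_log)
```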

8. Building SQL Queries

8.1 Raw SQL (recommended for simple cases)

# Parameterized query: guards against SQL injection
function search_users(db, name_pattern, min_age)
    sql = """
        SELECT id, name, email, age
        FROM users
        WHERE name LIKE ? AND age >= ?
        ORDER BY name
        LIMIT 100
    """
    DataFrame(DBInterface.execute(db, sql, [name_pattern, min_age]))
end
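Bound parameters stop SQL injection, but `%` and `_` in a user-supplied search term still act as LIKE wildcards. A minimal sketch of a hypothetical `like_contains` helper that escapes them with a backslash; the SQL must then declare the escape character, as in the `SEARCH_SQL` string shown (an illustrative query, not part of the original). The multi-pair `replace` call needs Julia 1.7 or later.

```julia
# Hypothetical helper: turn a raw search term into a "contains" LIKE pattern,
# escaping the wildcards % and _ (and the escape character itself) with a backslash.
# Multi-pair replace (Julia 1.7+) applies all pairs in one pass, so nothing is escaped twice.
function like_contains(term::AbstractString)
    escaped = replace(term, "\\" => "\\\\", "%" => "\\%", "_" => "\\_")
    return "%" * escaped * "%"
end

# The matching SQL declares the escape character explicitly
const SEARCH_SQL = "SELECT id, name FROM users WHERE name LIKE ? ESCAPE '\\' ORDER BY name"

println(like_contains("50%_off"))
```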

8.2 The Query Builder Pattern

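# A simple query builder: values are collected as bound parameters, but the table
# name and ORDER BY column are interpolated into the SQL text, so they must come
# from trusted code, never directly from user input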
mutable struct QueryBuilder
    table::String
    conditions::Vector{String}
    params::Vector{Any}
    order::String
    limit_val::Int
end

QueryBuilder(table::String) = QueryBuilder(table, String[], [], "", 0)

function where!(qb::QueryBuilder, condition::String, params...)
    push!(qb.conditions, condition)
    append!(qb.params, collect(params))
    return qb
end

function order_by!(qb::QueryBuilder, col::String, dir::String="ASC")
    qb.order = "$col $dir"
    return qb
end

function limit!(qb::QueryBuilder, n::Int)
    qb.limit_val = n
    return qb
end

function build(qb::QueryBuilder)
    sql = "SELECT * FROM $(qb.table)"
    if !isempty(qb.conditions)
        sql *= " WHERE " * join(qb.conditions, " AND ")
    end
    if !isempty(qb.order)
        sql *= " ORDER BY $(qb.order)"
    end
    if qb.limit_val > 0
        sql *= " LIMIT $(qb.limit_val)"
    end
    return sql, qb.params
end

# Usage
qb = QueryBuilder("users")
where!(qb, "age > ?", 18)
where!(qb, "active = ?", true)
order_by!(qb, "name")
limit!(qb, 50)

sql, params = build(qb)
println(sql)  # SELECT * FROM users WHERE age > ? AND active = ? ORDER BY name ASC LIMIT 50
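Column names and sort directions cannot be bound as parameters, so when they come from a request (for example a "sort by" field) they should be checked against an allow-list before reaching `order_by!`. A minimal sketch: `order_clause` and `SORTABLE_COLUMNS` are hypothetical, with the column list taken from the `users` table in section 1.2.

```julia
# Hypothetical allow-list for ORDER BY: identifiers cannot be bound as
# parameters, so validate them before interpolating them into SQL
const SORTABLE_COLUMNS = Set(["id", "name", "email", "age", "created_at"])

function order_clause(col::AbstractString, dir::AbstractString="ASC")
    col in SORTABLE_COLUMNS || throw(ArgumentError("cannot sort by column: $col"))
    d = uppercase(dir)
    d in ("ASC", "DESC") || throw(ArgumentError("invalid sort direction: $dir"))
    return "ORDER BY $col $d"
end

println(order_clause("name", "desc"))
```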

9. Database Design Patterns

9.1 The Repository Pattern

abstract type AbstractRepository end

struct UserRepository <: AbstractRepository
    db::SQLite.DB
end

function find_by_id(repo::UserRepository, id::Int)
    df = DataFrame(DBInterface.execute(repo.db, "SELECT * FROM users WHERE id = ?", [id]))
    isempty(df) ? nothing : first(df)
end

function find_all(repo::UserRepository; limit=100, offset=0)
    result = DBInterface.execute(repo.db, "SELECT * FROM users LIMIT ? OFFSET ?", [limit, offset])
    DataFrame(result)
end

function save(repo::UserRepository, user::NamedTuple)
    # INSERT OR REPLACE overwrites an existing row with the same id
    SQLite.execute(repo.db,
        "INSERT OR REPLACE INTO users (id, name, email, age) VALUES (?, ?, ?, ?)",
        [user.id, user.name, user.email, user.age])
end

# Usage
repo = UserRepository(db)
user = find_by_id(repo, 1)
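One payoff of the abstract repository type is that code written against `find_by_id`, `find_all` and `save` can be tested without a database. A minimal sketch of a hypothetical `InMemoryUserRepository` that stores users as NamedTuples in a `Dict` keyed by id and mimics the SQLite version's behavior (rows ordered by id, insert-or-replace on save).

```julia
# Hypothetical in-memory repository with the same functions as UserRepository
abstract type AbstractRepository end

struct InMemoryUserRepository <: AbstractRepository
    users::Dict{Int, NamedTuple}
end
InMemoryUserRepository() = InMemoryUserRepository(Dict{Int, NamedTuple}())

find_by_id(repo::InMemoryUserRepository, id::Int) = get(repo.users, id, nothing)

# LIMIT/OFFSET over users ordered by id
function find_all(repo::InMemoryUserRepository; limit=100, offset=0)
    ids = sort!(collect(keys(repo.users)))
    selected = ids[min(offset + 1, length(ids) + 1):min(offset + limit, length(ids))]
    return [repo.users[i] for i in selected]
end

# Insert-or-replace keyed by user.id, like INSERT OR REPLACE
save(repo::InMemoryUserRepository, user::NamedTuple) = (repo.users[user.id] = user; nothing)

repo = InMemoryUserRepository()
save(repo, (id = 1, name = "Alice", email = "alice@example.com", age = 25))
save(repo, (id = 2, name = "Bob", email = "bob@example.com", age = 30))
println(find_by_id(repo, 2))
```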

9.2 The Unit of Work Pattern

# Queued statements: each entry is a (sql, params) tuple
mutable struct UnitOfWork
    db::SQLite.DB
    pending_inserts::Vector{Tuple{String, Vector{Any}}}
    pending_updates::Vector{Tuple{String, Vector{Any}}}
    pending_deletes::Vector{Tuple{String, Vector{Any}}}
end

UnitOfWork(db) = UnitOfWork(db, [], [], [])

# Queue an INSERT; `data` is a NamedTuple whose field names are the column names,
# e.g. (name = "Eve", age = 22)
function register_insert!(uow::UnitOfWork, table::String, data::NamedTuple)
    cols = join(string.(keys(data)), ", ")
    placeholders = join(fill("?", length(data)), ", ")
    push!(uow.pending_inserts,
          ("INSERT INTO $table ($cols) VALUES ($placeholders)", Any[values(data)...]))
end

function commit!(uow::UnitOfWork)
    SQLite.transaction(uow.db) do
        for (sql, params) in uow.pending_inserts
            SQLite.execute(uow.db, sql, params)
        end
        for (sql, params) in uow.pending_updates
            SQLite.execute(uow.db, sql, params)
        end
        for (sql, params) in uow.pending_deletes
            SQLite.execute(uow.db, sql, params)
        end
    end
    empty!(uow.pending_inserts)
    empty!(uow.pending_updates)
    empty!(uow.pending_deletes)
end
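The Unit of Work above only has a helper for inserts. A minimal sketch of hypothetical `update_statement` and `delete_statement` functions that build the same `(sql, params)` tuples for the update and delete queues, assuming rows are identified by an `id` column; as with inserts, the table and column names are interpolated and must be trusted.

```julia
# Hypothetical helpers that build the (sql, params) tuples a Unit of Work queues.
# `changes` maps column names to new values; rows are identified by `id`.
function update_statement(table::String, id, changes::NamedTuple)
    sets = join(["$(k) = ?" for k in keys(changes)], ", ")
    return ("UPDATE $table SET $sets WHERE id = ?", Any[values(changes)..., id])
end

delete_statement(table::String, id) = ("DELETE FROM $table WHERE id = ?", Any[id])

# With the UnitOfWork above these would be queued as, e.g.
#   push!(uow.pending_updates, update_statement("users", 1, (age = 26,)))
#   push!(uow.pending_deletes, delete_statement("users", 2))
println(update_statement("users", 1, (name = "Alicia", age = 26)))
```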

10. Case Study: A Data Collection and Storage System

module DataCollector

using SQLite, DataFrames, Dates

struct Collector
    db::SQLite.DB
end

function Collector(db_path::String="collector.db")
    db = SQLite.DB(db_path)
    SQLite.execute(db, """
        CREATE TABLE IF NOT EXISTS readings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sensor_id TEXT NOT NULL,
            value REAL NOT NULL,
            unit TEXT,
            timestamp TEXT DEFAULT CURRENT_TIMESTAMP
        )
    """)
    SQLite.execute(db, "CREATE INDEX IF NOT EXISTS idx_sensor ON readings(sensor_id)")
    SQLite.execute(db, "CREATE INDEX IF NOT EXISTS idx_ts ON readings(timestamp)")
    Collector(db)
end

function record!(c::Collector, sensor_id::String, value::Float64, unit::String="")
    SQLite.execute(c.db,
        "INSERT INTO readings (sensor_id, value, unit) VALUES (?, ?, ?)",
        [sensor_id, value, unit])
end

# AbstractVector{<:NamedTuple} also accepts vectors of concrete NamedTuple types
function batch_record!(c::Collector, records::AbstractVector{<:NamedTuple})
    SQLite.transaction(c.db) do
        for r in records
            SQLite.execute(c.db,
                "INSERT INTO readings (sensor_id, value, unit) VALUES (?, ?, ?)",
                [r.sensor_id, r.value, r.unit])
        end
    end
end

function query_range(c::Collector, sensor_id::String, from::DateTime, to::DateTime)
    # CURRENT_TIMESTAMP stores UTC text as "yyyy-mm-dd HH:MM:SS", so format the
    # bounds the same way for the string comparison in BETWEEN to work
    fmt = dateformat"yyyy-mm-dd HH:MM:SS"
    result = DBInterface.execute(c.db, """
        SELECT * FROM readings
        WHERE sensor_id = ? AND timestamp BETWEEN ? AND ?
        ORDER BY timestamp
    """, [sensor_id, Dates.format(from, fmt), Dates.format(to, fmt)])
    DataFrame(result)
end

function statistics(c::Collector, sensor_id::String)
    result = DBInterface.execute(c.db, """
        SELECT
            COUNT(*) as count,
            AVG(value) as mean,
            MIN(value) as min,
            MAX(value) as max
        FROM readings WHERE sensor_id = ?
    """, [sensor_id])
    DataFrame(result)
end

end

# Usage
using .DataCollector, Dates
collector = DataCollector.Collector("sensors.db")

DataCollector.record!(collector, "temp_01", 23.5, "°C")
DataCollector.record!(collector, "temp_01", 24.1, "°C")
DataCollector.record!(collector, "humidity_01", 65.2, "%")

stats = DataCollector.statistics(collector, "temp_01")
println(stats)
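Calling `record!` once per reading opens one implicit transaction per insert. A common alternative is to buffer readings in memory and hand them to `batch_record!` in chunks. A minimal sketch: `ReadingBuffer`, `add!` and `flush!` are hypothetical, and the flush function is injected so the buffer can be tested without a database (in the system above it would be `rs -> DataCollector.batch_record!(collector, rs)`).

```julia
# A reading in the shape batch_record! expects
const Reading = NamedTuple{(:sensor_id, :value, :unit), Tuple{String, Float64, String}}

# Hypothetical write buffer: readings are handed to `flush_fn` in batches of `batch_size`
mutable struct ReadingBuffer
    flush_fn::Function
    batch_size::Int
    items::Vector{Reading}
end
ReadingBuffer(flush_fn, batch_size::Int=100) = ReadingBuffer(flush_fn, batch_size, Reading[])

# Pass the buffered readings on and clear the buffer; returns how many were flushed
function flush!(buf::ReadingBuffer)
    isempty(buf.items) && return 0
    n = length(buf.items)
    buf.flush_fn(copy(buf.items))
    empty!(buf.items)
    return n
end

function add!(buf::ReadingBuffer, sensor_id::String, value::Real, unit::String="")
    push!(buf.items, (sensor_id = sensor_id, value = Float64(value), unit = unit))
    length(buf.items) >= buf.batch_size && flush!(buf)
    return nothing
end

# Demo: collect batches in a vector instead of writing to a database
batches = Vector{Vector{Reading}}()
buf = ReadingBuffer(rs -> push!(batches, rs), 2)
add!(buf, "temp_01", 23.5, "°C")
add!(buf, "temp_01", 24.1, "°C")      # reaches batch_size, so the first batch is flushed
add!(buf, "humidity_01", 65.2, "%")
flush!(buf)                           # flush the remainder, e.g. on shutdown
println(length.(batches))
```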

Comparison of Database Drivers

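Driver | Database | Characteristics | Recommended for
SQLite.jl | SQLite | Lightweight, no server required | Embedded use, prototyping
LibPQ.jl | PostgreSQL | Full-featured; pairs well with a connection pool | Production, large datasets
MySQL.jl | MySQL | Widely used | Web applications
ODBC.jl | Any | Generic interface | Legacy system integration
SearchLight.jl | Several | ORM abstraction | Genie.jl projects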

Use Cases

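Scenario 1: IoT Data Storage

Store sensor data with SQLite.jl, batching writes inside transactions for performance. Run periodic statistical queries and export the results as CSV for the analytics team.

Scenario 2: Web Application Backend

Build a web application on PostgreSQL with LibPQ.jl and Genie.jl. A connection pool manages database connections, and transactions keep the data consistent.

Scenario 3: Data Migration Tool

Write a Julia script that reads data from a legacy database (via ODBC), cleans it, and writes it to a new PostgreSQL database, taking advantage of Julia's data-processing capabilities and its support for multiple drivers.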


Summary

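Topic | Key points
SQLite.jl | Lightweight and embedded; SQLite.execute(db, sql, params) runs statements, DBInterface.execute(db, sql, params) returns query results
LibPQ.jl | Dedicated PostgreSQL driver
ODBC.jl | Generic interface supporting many databases
SearchLight.jl | ORM integrated with Genie.jl
Transactions | SQLite.transaction(db) do ... end
Connection pools | Roll your own or use ConnectionPools.jl
Security | Always use parameterized queries to prevent SQL injection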

Further Reading