强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Vala 语言入门教程 / 07 - 异步编程

第 7 章:异步编程

异步编程是现代应用开发的核心能力。Vala 提供了优雅的 async/await 语法,让异步代码看起来像同步代码一样清晰。


7.1 为什么需要异步编程

7.1.1 同步 vs 异步

同步执行:
  任务A ──────────────→ 任务B ──────────────→ 任务C
  |  阻塞 5 秒  |        |  阻塞 3 秒  |
  总耗时: 8 秒

异步执行:
  任务A ──────────────→
  任务B ──────────→
  任务C ──────────→
  总耗时: max(5, 3) ≈ 5 秒

7.1.2 GLib 主循环(Main Loop)

Vala 的异步编程基于 GLib 主循环(Main Loop)。主循环是一个事件循环,负责:

  • 处理 I/O 事件
  • 执行定时器回调
  • 分发信号
  • 运行异步任务
┌───────────────────────────────────┐
│          GLib Main Loop           │
│  ┌─────────────────────────────┐  │
│  │  1. 检查 I/O 事件           │  │
│  │  2. 执行到期的定时器         │  │
│  │  3. 分发 idle 回调           │  │
│  │  4. 处理异步完成回调         │  │
│  │  5. 等待下一个事件           │  │
│  └────────── 循环 ──────────────┘  │
└───────────────────────────────────┘

7.2 async/await 基础

7.2.1 定义异步方法

// 异步方法使用 async 关键字
public async void fetch_data () {
    print ("开始获取数据...\n");

    // 模拟耗时操作
    // yield 关键字将控制权交还给主循环
    yield do_some_work ();

    print ("数据获取完成!\n");
}

// 模拟异步工作
public async void do_some_work () {
    print ("  工作中...\n");
    // 在实际应用中,这里会是网络请求、文件 I/O 等
}

void main () {
    // 创建主循环
    var loop = new MainLoop ();

    // 启动异步任务
    fetch_data.begin ((obj, res) => {
        fetch_data.end (res);
        print ("回调:异步任务完成\n");
        loop.quit ();
    });

    // 运行主循环
    loop.run ();
}

输出

开始获取数据...
  工作中...
数据获取完成!
回调:异步任务完成

7.2.2 async/await 语法详解

// async 方法可以有返回值
public async int calculate_async (int x, int y) {
    // 模拟异步计算
    int result = x + y;
    return result;
}

// 使用 await 等待异步结果
public async void process () {
    print ("计算开始\n");

    // 使用 yield(Vala 的 await)等待结果
    int result = yield calculate_async (10, 20);
    print ("计算结果: %d\n", result);

    // 可以连续等待多个异步操作
    int r1 = yield calculate_async (1, 2);
    int r2 = yield calculate_async (3, 4);
    print ("r1=%d, r2=%d, 总和=%d\n", r1, r2, r1 + r2);
}

void main () {
    var loop = new MainLoop ();

    process.begin ((obj, res) => {
        process.end (res);
        loop.quit ();
    });

    loop.run ();
}

7.2.3 异步方法与错误处理

errordomain NetworkError {
    TIMEOUT,
    CONNECTION_REFUSED,
    NOT_FOUND
}

// 异步方法可以抛出错误
public async string fetch_url (string url) throws NetworkError {
    print ("请求: %s\n", url);

    if (url.contains ("timeout")) {
        throw new NetworkError.TIMEOUT ("请求超时");
    }

    if (url.contains ("refused")) {
        throw new NetworkError.CONNECTION_REFUSED ("连接被拒绝");
    }

    return "响应内容: OK";
}

// 使用 try-catch 处理异步错误
public async void safe_fetch () {
    try {
        string result = yield fetch_url ("https://example.com");
        print ("成功: %s\n", result);
    } catch (NetworkError e) {
        printerr ("网络错误: %s\n", e.message);
    }
}

void main () {
    var loop = new MainLoop ();

    safe_fetch.begin ((obj, res) => {
        safe_fetch.end (res);
        loop.quit ();
    });

    loop.run ();
}

7.3 回调风格(Callbacks)

async/await 出现之前,Vala 使用回调风格进行异步编程:

7.3.1 基本回调

// 回调委托
delegate void AsyncCallback<T> (T result);

// 使用回调的异步方法
public void fetch_with_callback (AsyncCallback<string> callback) {
    // 模拟异步操作
    GLib.Timeout.add (1000, () => {
        callback ("回调结果: Hello");
        return GLib.Source.REMOVE;
    });
}

void main () {
    var loop = new MainLoop ();

    fetch_with_callback ((result) => {
        print ("%s\n", result);
        loop.quit ();
    });

    loop.run ();
}

7.3.2 GIO 风格回调

// GIO 风格:begin/end 模式
public void async_operation.begin (GLib.AsyncReadyCallback? callback) {
    // 启动异步操作
    GLib.Timeout.add (500, () => {
        callback (this, null);
        return GLib.Source.REMOVE;
    });
}

public string async_operation.end (GLib.AsyncResult result) {
    return "操作完成";
}

void main () {
    var loop = new MainLoop ();

    // 使用 GIO 风格的回调
    var obj = new Object ();
    obj.begin ((source, result) => {
        // 获取结果
        print ("异步操作完成\n");
        loop.quit ();
    });

    loop.run ();
}

7.4 GIO 异步操作

GIO 是 GLib 的 I/O 库,提供了丰富的异步操作。

7.4.1 异步文件读取

public async string read_file_async (string path) throws GLib.Error {
    // 打开文件
    var file = GLib.File.new_for_path (path);

    // 异步读取
    var stream = yield file.read_async (Priority.DEFAULT, null);

    // 异步读取全部内容
    var data_stream = new GLib.DataInputStream (stream);
    var contents = new StringBuilder ();

    string line;
    size_t length;
    while ((line = yield data_stream.read_line_async (
        Priority.DEFAULT, null, out length)) != null) {
        contents.append (line);
        contents.append_c ('\n');
    }

    return contents.str;
}

void main () {
    var loop = new MainLoop ();

    read_file_async.begin ("/etc/hostname", (obj, res) => {
        try {
            string content = read_file_async.end (res);
            print ("文件内容:\n%s\n", content);
        } catch (GLib.Error e) {
            printerr ("读取失败: %s\n", e.message);
        }
        loop.quit ();
    });

    loop.run ();
}

7.4.2 异步文件写入

public async void write_file_async (string path, string content)
    throws GLib.Error
{
    var file = GLib.File.new_for_path (path);

    // 异步创建文件
    var stream = yield file.replace_async (
        null,                    // etag
        false,                   // make_backup
        GLib.FileCreateFlags.NONE,
        Priority.DEFAULT,
        null                     // cancellable
    );

    // 异步写入
    yield stream.write_async (
        content.data,
        Priority.DEFAULT,
        null
    );

    // 异步关闭
    yield stream.close_async (Priority.DEFAULT, null);

    print ("文件写入完成: %s\n", path);
}

void main () {
    var loop = new MainLoop ();

    write_file_async.begin ("/tmp/test.txt", "Hello, Vala!", (obj, res) => {
        try {
            write_file_async.end (res);
        } catch (GLib.Error e) {
            printerr ("写入失败: %s\n", e.message);
        }
        loop.quit ();
    });

    loop.run ();
}

7.4.3 异步网络请求

public async string http_get (string url) throws GLib.Error {
    var session = new Soup.Session ();

    var message = new Soup.Message ("GET", url);

    // 异步发送请求
    GLib.Bytes response = yield session.send_and_read_async (
        message,
        Priority.DEFAULT,
        null
    );

    if (message.status_code != 200) {
        throw new GLib.IOError.FAILED (
            "HTTP 错误: %u".printf (message.status_code)
        );
    }

    return (string) response.get_data ();
}

void main () {
    var loop = new MainLoop ();

    print ("发送 HTTP 请求...\n");

    http_get.begin ("https://httpbin.org/get", (obj, res) => {
        try {
            string body = http_get.end (res);
            print ("响应:\n%s\n", body.substring (0, 200));
        } catch (GLib.Error e) {
            printerr ("请求失败: %s\n", e.message);
        }
        loop.quit ();
    });

    loop.run ();
}

💡 使用 Soup 库需要在编译时链接 --pkg libsoup-3.0


7.5 超时和取消

7.5.1 异步超时

public async string fetch_with_timeout (string url, uint timeout_ms)
    throws GLib.Error
{
    var cancellable = new GLib.Cancellable ();

    // 设置超时
    GLib.Timeout.add (timeout_ms, () => {
        cancellable.cancel ();
        return GLib.Source.REMOVE;
    });

    // 使用 cancellable
    var file = GLib.File.new_for_uri (url);
    var stream = yield file.read_async (
        Priority.DEFAULT,
        cancellable
    );

    return "请求完成";
}

void main () {
    var loop = new MainLoop ();

    fetch_with_timeout.begin ("https://example.com", 5000, (obj, res) => {
        try {
            string result = fetch_with_timeout.end (res);
            print ("%s\n", result);
        } catch (GLib.Error e) {
            if (e is GLib.IOError.CANCELLED) {
                printerr ("请求超时!\n");
            } else {
                printerr ("错误: %s\n", e.message);
            }
        }
        loop.quit ();
    });

    loop.run ();
}

7.5.2 手动取消

public async void long_running_task (GLib.Cancellable? cancellable = null)
    throws GLib.Error
{
    for (int i = 0; i < 10; i++) {
        // 检查是否被取消
        if (cancellable != null && cancellable.is_cancelled ()) {
            throw new GLib.IOError.CANCELLED ("任务被取消");
        }

        print ("处理步骤 %d/10\n", i + 1);
        yield do_step_async ();
    }

    print ("任务完成!\n");
}

public async void do_step_async () {
    // 模拟工作
    GLib.usleep (500000);  // 0.5 秒
}

void main () {
    var loop = new MainLoop ();
    var cancellable = new GLib.Cancellable ();

    // 启动任务
    long_running_task.begin (cancellable, (obj, res) => {
        try {
            long_running_task.end (res);
        } catch (GLib.Error e) {
            printerr ("任务失败: %s\n", e.message);
        }
        loop.quit ();
    });

    // 3 秒后取消任务
    GLib.Timeout.add (3000, () => {
        print ("正在取消任务...\n");
        cancellable.cancel ();
        return GLib.Source.REMOVE;
    });

    loop.run ();
}

7.6 并行异步操作

7.6.1 并行执行多个任务

public async int fetch_value (string name, int delay_ms) {
    print ("[%s] 开始获取\n", name);
    GLib.usleep (delay_ms * 1000);
    print ("[%s] 获取完成\n", name);
    return delay_ms;
}

public async void parallel_tasks () {
    var results = new int[3];

    // 并行启动多个异步任务
    var task1 = fetch_value.begin ("任务A", 1000, (obj, res) => {
        results[0] = fetch_value.end (res);
    });

    var task2 = fetch_value.begin ("任务B", 2000, (obj, res) => {
        results[1] = fetch_value.end (res);
    });

    var task3 = fetch_value.begin ("任务C", 1500, (obj, res) => {
        results[2] = fetch_value.end (res);
    });

    // 等待所有任务完成
    // 注意:在 Vala 中需要手动管理多个任务的完成
    GLib.usleep (3000000);  // 等待足够时间

    print ("结果: %d, %d, %d\n", results[0], results[1], results[2]);
}

void main () {
    var loop = new MainLoop ();

    parallel_tasks.begin ((obj, res) => {
        parallel_tasks.end (res);
        print ("所有并行任务完成\n");
        loop.quit ();
    });

    loop.run ();
}

7.6.2 顺序执行 vs 并行执行

public async void sequential () {
    print ("=== 顺序执行 ===\n");
    int r1 = yield fetch_value ("A", 1000);
    int r2 = yield fetch_value ("B", 1000);
    int r3 = yield fetch_value ("C", 1000);
    print ("总结果: %d\n", r1 + r2 + r3);
}

public async void parallel () {
    print ("=== 并行执行 ===\n");
    // 这里简化处理,实际需要更复杂的并发管理
    int r1 = yield fetch_value ("X", 1000);
    int r2 = yield fetch_value ("Y", 1000);
    int r3 = yield fetch_value ("Z", 1000);
    print ("总结果: %d\n", r1 + r2 + r3);
}

7.7 协程(Coroutines)

Vala 的 async/await 本质上是协程(Coroutines):

7.7.1 协程概念

协程是可以暂停和恢复的函数:

main()          async_func()
  |                   |
  |--- begin -------->|
  |                   | yield (暂停)
  |<-- 返回控制权 ----|
  |                   |
  | (继续运行)         | (等待)
  |                   |
  |<-- 完成回调 -------| (恢复)
  |                   |
  v                   v

7.7.2 生成器模式

// 使用 async 实现生成器风格
public async int? generate_fibonacci (int count) {
    int a = 0, b = 1;
    for (int i = 0; i < count; i++) {
        yield;  // 暂停,返回控制权
        int temp = a;
        a = b;
        b = temp + b;
        // 在实际实现中,这里会通过其他方式返回值
    }
    return a;
}

// 更实用的异步迭代
public class AsyncIterator<T> : Object {
    private T[] items;
    private int index = 0;

    public AsyncIterator (T[] items) {
        this.items = items;
    }

    public async T? next () {
        if (index >= items.length) {
            return null;
        }
        // 模拟异步延迟
        GLib.usleep (100000);  // 0.1 秒
        return items[index++];
    }

    public bool has_next () {
        return index < items.length;
    }
}

void main () {
    var loop = new MainLoop ();

    var numbers = new int[] {1, 2, 3, 4, 5};
    var iter = new AsyncIterator<int> (numbers);

    iterate_async.begin (iter, (obj, res) => {
        iterate_async.end (res);
        loop.quit ();
    });

    loop.run ();
}

public async void iterate_async (AsyncIterator<int> iter) {
    while (iter.has_next ()) {
        int? val = yield iter.next ();
        if (val != null) {
            print ("值: %d\n", val);
        }
    }
}

7.8 业务场景:异步下载管理器

// 下载状态
public enum DownloadState {
    PENDING,
    DOWNLOADING,
    COMPLETED,
    FAILED
}

// 下载任务
public class DownloadTask : Object {
    public string url { get; set; }
    public string dest_path { get; set; }
    public DownloadState state { get; set; default = DownloadState.PENDING; }
    public int progress { get; set; default = 0; }
    public string? error_message { get; set; }

    public signal void state_changed (DownloadState new_state);
    public signal void progress_updated (int percent);

    public DownloadTask (string url, string dest_path) {
        Object (url: url, dest_path: dest_path);
    }
}

// 下载管理器
public class DownloadManager : Object {
    private GLib.List<DownloadTask> queue = new GLib.List<DownloadTask> ();
    private int max_concurrent = 3;
    private int active_count = 0;

    public signal void task_completed (DownloadTask task);
    public signal void all_completed ();

    public void add_task (DownloadTask task) {
        queue.append (task);
    }

    public async void start_all () {
        print ("开始下载 %u 个任务\n", queue.length ());

        foreach (var task in queue) {
            if (task.state == DownloadState.PENDING) {
                yield download_task (task);
            }
        }

        all_completed ();
    }

    private async void download_task (DownloadTask task) {
        task.state = DownloadState.DOWNLOADING;
        task.state_changed (DownloadState.DOWNLOADING);

        print ("[下载] %s\n", task.url);

        try {
            // 模拟下载进度
            for (int i = 0; i <= 100; i += 10) {
                task.progress = i;
                task.progress_updated (i);
                GLib.usleep (100000);  // 模拟下载
            }

            task.state = DownloadState.COMPLETED;
            task.state_changed (DownloadState.COMPLETED);
            print ("[完成] %s\n", task.url);
            task_completed (task);
        } catch (GLib.Error e) {
            task.state = DownloadState.FAILED;
            task.error_message = e.message;
            task.state_changed (DownloadState.FAILED);
            printerr ("[失败] %s: %s\n", task.url, e.message);
        }
    }
}

void main () {
    var loop = new MainLoop ();
    var manager = new DownloadManager ();

    // 添加下载任务
    manager.add_task (new DownloadTask (
        "https://example.com/file1.zip", "/tmp/file1.zip"
    ));
    manager.add_task (new DownloadTask (
        "https://example.com/file2.zip", "/tmp/file2.zip"
    ));
    manager.add_task (new DownloadTask (
        "https://example.com/file3.zip", "/tmp/file3.zip"
    ));

    // 连接信号
    manager.task_completed.connect ((task) => {
        print ("✅ 任务完成: %s\n", task.url);
    });

    manager.all_completed.connect (() => {
        print ("\n🎉 所有下载完成!\n");
        loop.quit ();
    });

    // 开始下载
    manager.start_all.begin ();

    loop.run ();
}

7.9 注意事项

⚠️ 异步编程常见陷阱

  1. 必须有主循环:异步操作需要 GLib.MainLoop 运行
  2. yield 不是阻塞yield 将控制权交还给主循环,不是阻塞线程
  3. 错误传播:异步方法的错误通过 end() 方法抛出
  4. 生命周期管理:确保异步对象在回调执行期间仍然有效
  5. 嵌套异步:使用 yield 调用其他异步方法
  6. 回调顺序:回调可能在主循环的下一次迭代中执行

7.10 扩展阅读

资源链接
Vala 异步编程https://wiki.gnome.org/Projects/Vala/AsyncSamples
GIO 异步操作https://docs.gtk.org/gio/async.html
GLib 主循环https://docs.gtk.org/glib/main-loop.html
GLib 异步模式https://docs.gtk.org/gio/streaming.html

7.11 总结

要点说明
async定义异步方法
yield暂停异步方法,交还控制权
begin()启动异步操作
end()获取异步结果
主循环异步操作的运行基础
错误处理通过 end() 抛出
取消使用 GLib.Cancellable

下一章我们将学习如何使用 Vala 开发 GTK 应用。→ 第 8 章:GTK 应用开发