
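Chapter 19: Concurrency and Asynchronous Programming

"Concurrency is an essential capability for modern applications."

Perl supports several concurrency models: threads, processes, event-driven I/O, and coroutines. This chapter surveys each approach and the situations it suits.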


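19.1 Overview of concurrency models

Model          Module                 Characteristics                               Typical use
Threads        threads                Opt-in shared data; threads costly to create  CPU-bound work
Processes      fork                   Separate memory, robust                       Task isolation
Event-driven   IO::Async / AnyEvent   Single-threaded, non-blocking                 I/O-bound work
Coroutines     Coro                   Cheap context switches                        Many concurrent connections
Pre-forking    Parallel::Prefork      Pool of worker processes                      Web servers

Note that Perl threads (ithreads) clone the interpreter for each thread, and variables are shared only when marked with threads::shared, so they are heavier than threads in most other languages.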

19.2 fork: multiple processes

use strict;
use warnings;
use POSIX ":sys_wait_h";

# Basic fork
my $pid = fork();

if (!defined $pid) {
    die "fork failed: $!\n";
} elsif ($pid == 0) {
    # Child process
    print "Child PID: $$\n";
    sleep 2;
    print "Child done\n";
    exit(0);
} else {
    # Parent process
    print "Parent PID: $$, child PID: $pid\n";
    waitpid($pid, 0);
    print "Child has exited\n";
}
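
The example above waits for the child but ignores how it ended. As a minimal sketch, the parent can decode the wait status in `$?` with the macros that `POSIX ":sys_wait_h"` exports. The helper name `run_child` is hypothetical and exists only for this illustration.

```perl
use strict;
use warnings;
use POSIX ":sys_wait_h";   # WIFEXITED, WEXITSTATUS, WIFSIGNALED, WTERMSIG

# run_child (hypothetical helper): fork, run $code in the child,
# and return a description of how the child ended.
sub run_child {
    my ($code) = @_;

    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        # Child: the callback's return value becomes the exit code
        exit($code->());
    }

    # Parent: block until the child exits; waitpid stores the status in $?
    waitpid($pid, 0);
    return WEXITSTATUS($?) if WIFEXITED($?);
    return "killed by signal " . WTERMSIG($?) if WIFSIGNALED($?);
    return "unknown";
}

my $status = run_child(sub { return 3 });
print "child exited with status $status\n";
```

An exit code is limited to the range 0 to 255, so it can signal success or a failure class but not carry real data; pipes are the usual channel for that.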

Communicating over a pipe

use strict;
use warnings;

# Create the pipe
pipe(my $reader, my $writer) or die "pipe failed: $!\n";

my $pid = fork();
die "fork failed: $!\n" unless defined $pid;

if ($pid == 0) {
    # Child: writes
    close $reader;
    print $writer "Hello from child!\n";
    close $writer;
    exit(0);
} else {
    # Parent: reads
    close $writer;
    my $msg = <$reader> // '';
    chomp $msg;
    print "Received: $msg\n";
    close $reader;
    waitpid($pid, 0);
}
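
The same pattern extends to several children, each with its own pipe, which is one way to get results back from parallel workers. The sketch below is illustrative only: the job list and the squaring "work" are made up for the example.

```perl
use strict;
use warnings;

my @jobs = (2, 3, 4);
my @children;   # one [job, pid, read handle] entry per child
my %result;     # job => value reported by its child

for my $n (@jobs) {
    pipe(my $reader, my $writer) or die "pipe failed: $!\n";

    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        # Child: compute, report one line through the pipe, exit
        close $reader;
        print {$writer} $n * $n, "\n";
        close $writer;
        exit(0);
    }

    # Parent: keep only the read end of this child's pipe
    close $writer;
    push @children, [$n, $pid, $reader];
}

# Collect one line from each child, then reap it
for my $child (@children) {
    my ($n, $pid, $reader) = @$child;
    my $line = <$reader>;
    close $reader;
    waitpid($pid, 0);
    next unless defined $line;
    chomp $line;
    $result{$n} = $line;
}

print "$_ squared = $result{$_}\n" for sort { $a <=> $b } keys %result;
```

Reading the pipes one after another is fine for short messages. For large outputs a child can block once its pipe buffer fills, so the parent would need to multiplex reads, for example with `IO::Select`.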

Processing tasks in parallel

use strict;
use warnings;
use POSIX ":sys_wait_h";

my @tasks = (1, 2, 3, 4, 5, 6, 7, 8);
my $max_workers = 4;
my @pids;

for my $task (@tasks) {
    # Wait for a free slot
    while (scalar @pids >= $max_workers) {
        my $finished = waitpid(-1, 0);
        @pids = grep { $_ != $finished } @pids;
    }

    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        # Child handles the task
        print "Processing task $task (PID $$)\n";
        sleep(1 + int(rand(3)));
        print "Task $task done\n";
        exit(0);
    } else {
        push @pids, $pid;
    }
}

# Wait for all remaining children; waitpid returns -1 when none are left
while (my $pid = waitpid(-1, 0)) {
    last if $pid == -1;
}

print "All tasks done\n";
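
`waitpid(-1, 0)` blocks the parent until some child exits. When the parent has other work to do, the `WNOHANG` flag from `POSIX ":sys_wait_h"` makes the call return immediately. The following is a small sketch of that polling style. The task numbers, delays, and the `$polls` counter are illustrative assumptions.

```perl
use strict;
use warnings;
use POSIX ":sys_wait_h";   # provides WNOHANG

my %running;   # pid => task number
my %exit_of;   # task number => exit code

for my $task (1 .. 3) {
    my $pid = fork();
    die "fork failed: $!\n" unless defined $pid;

    if ($pid == 0) {
        # Child: simulate work with a sub-second sleep, then exit
        select(undef, undef, undef, 0.1 * $task);
        exit($task);
    }
    $running{$pid} = $task;
}

my $polls = 0;
while (%running) {
    # Returns a pid if a child has exited, 0 if none has yet, -1 if no children exist
    my $pid = waitpid(-1, WNOHANG);
    last if $pid == -1;

    if ($pid > 0) {
        next unless exists $running{$pid};
        my $task = delete $running{$pid};
        $exit_of{$task} = $? >> 8;   # exit code is the high byte of the wait status
    } else {
        # No child finished: the parent is free to do other work here
        $polls++;
        select(undef, undef, undef, 0.02);
    }
}

print "task $_ exited with code $exit_of{$_}\n" for sort keys %exit_of;
```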

19.3 threads: multithreading

use strict;
use warnings;
use threads;
use threads::shared;

# Shared variable
my $counter :shared = 0;

sub worker {
    my ($id) = @_;
    for (1..5) {
        {
            # The lock is held until the end of this block
            lock($counter);
            $counter++;
            print "Thread $id: counter = $counter\n";
        }
        threads->yield();
    }
}

# Create the threads
my @threads;
for my $i (1..4) {
    push @threads, threads->create(\&worker, $i);
}

# Wait for all threads
for my $t (@threads) {
    $t->join();
}

print "Final counter = $counter\n";
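
Shared variables are not the only way to get data out of a thread: `join()` also hands back whatever the thread's entry function returned. A minimal sketch follows, in which `sum_range` and the four ranges are made up for the illustration; it needs a perl built with thread support.

```perl
use strict;
use warnings;
use threads;

# sum_range (hypothetical worker): add up the integers from $from to $to
sub sum_range {
    my ($from, $to) = @_;
    my $sum = 0;
    $sum += $_ for $from .. $to;
    return $sum;
}

my @ranges = ([1, 25], [26, 50], [51, 75], [76, 100]);

# create() is called in list context here, so each thread's return
# value is later delivered by join() as a list
my @workers;
for my $range (@ranges) {
    push @workers, threads->create(\&sum_range, @$range);
}

my $total = 0;
for my $thr (@workers) {
    my ($partial) = $thr->join();   # blocks until this thread finishes
    $total += $partial;
}

print "total = $total\n";   # prints "total = 5050"
```

No lock is needed in this version, because each thread works on its own private data and the results are combined only in the main thread.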

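Thread-safe communication with Thread::Queue

use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Use a queue for communication between threads
my $queue = Thread::Queue->new();

# Producer
my $producer = threads->create(sub {
    for my $i (1..10) {
        $queue->enqueue("task_$i");
        print "Produced: task_$i\n";
    }
    # End-of-work signal for every consumer: after end(), dequeue()
    # returns undef once the queue is empty (Thread::Queue 3.01 or later).
    # A single enqueued undef would stop only one of the three consumers.
    $queue->end();
});

# Consumers
my @consumers;
for my $id (1..3) {
    push @consumers, threads->create(sub {
        while (defined(my $item = $queue->dequeue())) {
            print "Consumer $id: handling $item\n";
            sleep 1;
        }
    });
}

$producer->join();
$_->join() for @consumers;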
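
Workers often need to send results back as well as receive work. One way to sketch this is a second queue that carries results to the main thread. The doubling "work" and the queue names below are illustrative, and `end()` assumes Thread::Queue 3.01 or later.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $work    = Thread::Queue->new(1 .. 6);   # queue pre-loaded with six items
my $results = Thread::Queue->new();

# No more work will be added: dequeue() returns undef once $work is empty
$work->end();

my @workers;
for my $id (1 .. 3) {
    push @workers, threads->create(sub {
        while (defined(my $n = $work->dequeue())) {
            $results->enqueue($n * 2);
        }
    });
}

$_->join() for @workers;
$results->end();   # all workers have finished, so no more results will arrive

my @doubled;
while (defined(my $r = $results->dequeue())) {
    push @doubled, $r;
}

# Arrival order depends on thread scheduling, so sort before printing
@doubled = sort { $a <=> $b } @doubled;
print "@doubled\n";   # prints "2 4 6 8 10 12"
```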

19.4 AnyEvent: event-driven programming

use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

# Basic event loop
my $cv = AnyEvent->condvar;

# Timer: first fires after 1 second, then every 2 seconds
my $timer = AnyEvent->timer(
    after    => 1,
    interval => 2,
    cb       => sub { print "Runs every 2 seconds\n" },
);

# Concurrent HTTP requests
my @urls = (
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/uuid",
);

# Number of outstanding requests; must be declared before the callbacks use it
my $count = scalar @urls;

for my $url (@urls) {
    http_get $url, sub {
        my ($body, $headers) = @_;
        print "Finished: $url (Status: $headers->{Status})\n";
        $cv->send if --$count == 0;
    };
}

$cv->recv;
print "All requests finished\n";
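
Counting outstanding callbacks by hand works, but condition variables also offer `begin` and `end`, which keep that count internally: `recv` returns once every `begin` has been matched by an `end`. A network-free sketch using timers in place of HTTP requests follows; the delays are arbitrary.

```perl
use strict;
use warnings;
use AnyEvent;

my $cv = AnyEvent->condvar;
my @done;     # delays, in the order their timers fired
my @timers;   # keep the watcher objects alive until they fire

for my $delay (0.3, 0.1, 0.2) {
    $cv->begin;   # one more operation in flight
    push @timers, AnyEvent->timer(
        after => $delay,
        cb    => sub {
            push @done, $delay;
            $cv->end;   # the final end() wakes up recv
        },
    );
}

$cv->recv;   # runs the event loop until all timers have fired
print "completion order: @done\n";
```

The callbacks complete in order of delay rather than in the order they were started, which is the defining property of the event-driven style.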

19.5 IO::Async: a modern asynchronous framework

use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;

my $loop = IO::Async::Loop->new;

# Periodic timer
my $timer = IO::Async::Timer::Periodic->new(
    interval => 3,
    on_tick  => sub { print "Periodic task: " . localtime . "\n" },
);
$timer->start;
$loop->add($timer);

# Child process management
$loop->open_process(
    command => ["ping", "-c", "3", "localhost"],
    stdout  => {
        on_read => sub {
            my ($stream, $buffref, $eof) = @_;
            # Consume complete lines from the read buffer
            while ($$buffref =~ s/^(.*\n)//) {
                print "PING: $1";
            }
            return 0;
        },
    },
    on_finish => sub {
        my ($process, $exitcode) = @_;
        # $exitcode is a wait status; the exit code is in its high byte
        print "ping exited: " . ($exitcode >> 8) . "\n";
        $loop->stop;
    },
);

$loop->run;

19.6 Parallel::ForkManager: process pools

use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(4);  # at most 4 processes

# Callback run in the parent each time a child finishes
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident) = @_;
    print "Process $ident finished (exit: $exit_code)\n";
});

for my $task (1..20) {
    $pm->start($task) and next;   # parent continues with the next task

    # Child process
    print "Processing task $task (PID $$)\n";
    sleep int(rand(3)) + 1;

    $pm->finish(0);               # child exits; the argument is the exit code
}

$pm->wait_all_children;
print "All tasks done\n";
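
Children can also return data to the parent: `finish` accepts a reference as its second argument, and the `run_on_finish` callback receives it as its sixth parameter. A minimal sketch, in which the squaring work and the `%square` hash are illustrative only:

```perl
use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(3);
my %square;   # task identifier => result sent back by the child

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data) = @_;
    # $data is the reference the child passed to finish(), if any
    $square{$ident} = $data->{square} if defined $data;
});

for my $n (1 .. 5) {
    $pm->start($n) and next;                  # parent: move on to the next task

    # Child: exit code 0 plus a data structure for the parent
    $pm->finish(0, { square => $n * $n });
}

$pm->wait_all_children;

print "$_ => $square{$_}\n" for sort { $a <=> $b } keys %square;
```

The module serializes the structure to a temporary file behind the scenes, so this suits modest result sizes rather than bulk data transfer.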

19.7 Practical scenario: a concurrent web crawler

#!/usr/bin/env perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use Path::Tiny;

my @urls = (
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
    "https://httpbin.org/uuid",
);

my $output_dir = path("crawl_results");
$output_dir->mkpath;

my $cv = AnyEvent->condvar;

my $completed = 0;
my $total = scalar @urls;

for my $url (@urls) {
    $cv->begin;   # register one outstanding request
    http_get $url, timeout => 10, sub {
        my ($body, $headers) = @_;
        $completed++;

        if ($headers->{Status} == 200) {
            # Use the last path segment of the URL as the file name
            my $filename = $url;
            $filename =~ s{.*/}{};
            # The body is raw bytes, so write it without re-encoding
            $output_dir->child("$filename.json")->spew_raw($body);
            printf "[%d/%d] %s: ok (%d bytes)\n",
                $completed, $total, $url, length($body);
        } else {
            printf "[%d/%d] %s: failed (%s)\n",
                $completed, $total, $url, $headers->{Status};
        }

        $cv->end;   # recv returns only after the last request has ended
    };
}

$cv->recv;
print "Crawl finished: $completed pages in total\n";

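19.8 Choosing a concurrency model

Need concurrency?
  │
  ├── I/O-bound (network requests, file operations)
  │   ├── Protocol level → IO::Async / AnyEvent
  │   └── Application level → Mojo::IOLoop (built into Mojolicious)
  │
  ├── CPU-bound (computation, data processing)
  │   ├── Process isolation → Parallel::ForkManager
  │   └── Shared memory → threads + Thread::Queue
  │
  └── Task queue
      ├── Simple → Parallel::ForkManager
      └── Complex → Minion (Mojolicious job queue)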

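Chapter summary

Topic                        Summary
fork                         Multiple processes, separate memory, the Unix standard
threads                      Multiple threads, explicitly shared data, locking required
AnyEvent                     Event-driven, lightweight
IO::Async                    Modern asynchronous framework
Parallel::ForkManager        Process pool management
Thread::Queue                Thread-safe queue
Inter-process communication  Pipes, signals, shared memory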

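Exercises

  1. Implement a parallel file downloader using fork
  2. Implement the producer-consumer model using threads + Thread::Queue
  3. Issue concurrent HTTP requests using AnyEvent
  4. Process 100 tasks with Parallel::ForkManager, limited to 8 concurrent processes
  5. Compare the performance of fork and threads

Further reading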