Erlang/OTP 完全指南 / 09 - 并发编程
第 09 章:并发编程
并发(Concurrency)是 Erlang 的灵魂。本章学习 spawn、send、receive、进程链接与监控——构建可靠并发系统的基础。
9.1 Erlang 并发模型
9.1.1 Actor 模型
Erlang 基于 Actor 模型:每个进程是一个独立的 Actor,拥有自己的状态,通过消息传递通信。
┌──────────┐ 消息 ┌──────────┐
│ 进程 A │ ────────→ │ 进程 B │
│ (状态) │ │ (状态) │
│ (邮箱) │ ←──────── │ (邮箱) │
└──────────┘ 消息 └──────────┘
| 概念 | 说明 |
|---|---|
| 进程(Process) | 独立的执行单元,约 2KB 内存 |
| 消息(Message) | 进程间通信的唯一方式 |
| 邮箱(Mailbox) | 每个进程有独立的消息队列 |
| 无共享内存 | 进程间不共享任何状态 |
9.1.2 进程 vs 线程
| 特性 | Erlang 进程 | OS 线程 | OS 进程 |
|---|---|---|---|
| 创建开销 | ~1μs, 2KB | ~10μs, 1MB | ~1ms, 数MB |
| 最大数量 | 数百万 | 数千 | 数百 |
| 调度器 | BEAM | OS 内核 | OS 内核 |
| 通信方式 | 消息传递 | 共享内存 | IPC |
| GC | 每进程独立 | 全局 | 独立 |
| 崩溃影响 | 只影响自身 | 可能影响整个进程 | 只影响自身 |
9.2 创建进程
9.2.1 spawn/1
%% 最简单的进程
Pid = spawn(fun() ->
io:format("Hello from process ~p!~n", [self()])
end).
%% Hello from process <0.123.0>!
%% spawn 返回 PID(进程标识符)
is_pid(Pid). %% true
9.2.2 spawn/3 — 模块函数
%% worker.erl
-module(worker).
-export([loop/0, start/0]).
start() ->
spawn(?MODULE, loop, []).
loop() ->
receive
stop ->
io:format("Worker stopping~n");
{work, Task} ->
io:format("Working on: ~p~n", [Task]),
loop()
end.
%% 使用
1> Pid = worker:start().
<0.123.0>
2> Pid ! {work, "process data"}.
Working on: process data
3> Pid ! stop.
Worker stopping
9.2.3 spawn 的变体
| 函数 | 说明 |
|---|---|
spawn(Fun) | 创建进程执行 Fun |
spawn(Module, Function, Args) | 创建进程调用 M:F(A) |
spawn(Node, Fun) | 在远程节点创建进程 |
spawn_link(Fun) | 创建并链接 |
spawn_monitor(Fun) | 创建并监控 |
spawn_opt(Fun, Opts) | 带选项创建 |
9.3 消息传递
9.3.1 发送消息 (!)
%% 语法:Pid ! Message
%% Message 可以是任意 Erlang 值
Pid ! hello.
Pid ! {data, [1, 2, 3]}.
Pid ! {self(), "request"}.
%% 发送给自己
self() ! ping.
%% 发送的返回值是消息本身
Msg = Pid ! hello.
%% Msg = hello
9.3.2 接收消息 (receive)
%% 基本 receive
receive
Pattern1 -> Action1;
Pattern2 -> Action2;
...
end.
%% 带超时的 receive
receive
{reply, Data} -> Data
after 5000 ->
timeout
end.
9.3.3 消息匹配
%% 匹配特定消息
receive
{hello, Name} ->
io:format("Hello, ~s!~n", [Name]);
{bye, Name} ->
io:format("Goodbye, ~s!~n", [Name])
end.
%% 带守卫的匹配
receive
{data, N} when is_integer(N), N > 0 ->
process(N);
{data, N} when is_integer(N) ->
io:format("Invalid: ~p~n", [N])
end.
%% 通配符匹配
receive
_ -> ok %% 匹配任何消息(不推荐,丢弃有用信息)
end.
%% 绑定变量
receive
{request, From, Ref, Data} ->
%% From = 发送方 PID
%% Ref = 请求引用
%% Data = 请求数据
From ! {response, Ref, process(Data)}
end.
9.3.4 超时处理
%% 普通超时
wait_for_message() ->
receive
{ready, Pid} -> {ok, Pid}
after 5000 ->
{error, timeout}
end.
%% 无限等待(不设置 after)
wait_forever() ->
receive
stop -> ok
end.
%% 零超时(立即返回,用于非阻塞检查)
check_mailbox() ->
receive
Msg -> {got, Msg}
after 0 ->
empty
end.
9.3.5 消息队列清理
%% 清空邮箱中的所有消息
flush() ->
receive
_Msg ->
%% io:format("Flushing: ~p~n", [_Msg]),
flush()
after 0 ->
ok
end.
%% 使用 erlang:flush() 在 Shell 中清空
9.4 请求-响应模式
9.4.1 同步调用
%% server.erl
-module(server).
-export([start/0, call/2, loop/0]).
start() ->
spawn(?MODULE, loop, []).
%% 同步调用:发送请求并等待响应
call(Pid, Request) ->
Ref = make_ref(),
Pid ! {self(), Ref, Request},
receive
{Ref, Response} -> Response
after 5000 ->
{error, timeout}
end.
loop() ->
receive
{From, Ref, {echo, Msg}} ->
From ! {Ref, Msg},
loop();
{From, Ref, {add, A, B}} ->
From ! {Ref, A + B},
loop();
{From, Ref, stop} ->
From ! {Ref, ok}
end.
$ erl
1> Pid = server:start().
<0.123.0>
2> server:call(Pid, {echo, "hello"}).
"hello"
3> server:call(Pid, {add, 3, 4}).
7
4> server:call(Pid, stop).
ok
💡 关键:使用
make_ref()生成唯一标识,确保响应匹配正确的请求。
9.4.2 异步调用
%% 异步:只发送,不等待响应
cast(Pid, Msg) ->
Pid ! {cast, Msg},
ok.
%% 服务端处理
loop() ->
receive
{cast, Msg} ->
handle_cast(Msg),
loop();
{From, Ref, Msg} ->
From ! {Ref, handle_call(Msg)},
loop()
end.
9.5 进程链接(Link)
9.5.1 基本概念
%% 链接两个进程
Pid = spawn_link(fun() ->
receive stop -> ok end
end).
%% 当一个进程崩溃时,链接的进程也会收到退出信号
%% 默认行为:双向崩溃(双向传播)
9.5.2 链接的行为
进程 A ←链接→ 进程 B
当 A 崩溃:
- B 收到退出信号 {'EXIT', PidA, Reason}
- 默认:B 也随之崩溃
当 B 崩溃:
- A 收到退出信号 {'EXIT', PidB, Reason}
- 默认:A 也随之崩溃
9.5.3 trap_exit
%% 设置 trap_exit 可以捕获退出信号
process_flag(trap_exit, true),
Pid = spawn_link(fun() ->
receive stop -> ok end
end),
%% 当链接的进程崩溃时,收到消息而非崩溃
receive
{'EXIT', Pid, Reason} ->
io:format("Process ~p exited: ~p~n", [Pid, Reason])
end.
9.6 进程监控(Monitor)
9.6.1 基本使用
%% 监控另一个进程
Pid = spawn(fun() ->
receive stop -> ok end
end),
Ref = monitor(process, Pid),
%% 当被监控的进程退出时,收到消息
%% 注意:不需要 trap_exit!
receive
{'DOWN', Ref, process, Pid, Reason} ->
io:format("Process ~p is down: ~p~n", [Pid, Reason])
end.
9.6.2 Link vs Monitor
| 特性 | Link | Monitor |
|---|---|---|
| 方向 | 双向 | 单向 |
| 需要 trap_exit | 是(想要处理退出信号时) | 否 |
| 消息格式 | {'EXIT', Pid, Reason} | {'DOWN', Ref, process, Pid, Reason} |
| 自动解除 | 是(进程退出后) | 是(进程退出后) |
| 多个 | 一对一直接链接 | 一个进程可监控多个目标 |
| 推荐场景 | 紧密耦合的进程 | 松耦合的监控 |
💡 经验法则:大多数情况使用 Monitor,只有在"共生死"的场景使用 Link。
9.7 进程注册
9.7.1 注册名称
%% 注册进程(给 PID 起名字)
Pid = spawn(fun() ->
receive stop -> ok end
end),
register(my_process, Pid).
%% 使用名字发送消息
my_process ! hello.
%% 查找注册的 PID
whereis(my_process). %% Pid
%% 取消注册
unregister(my_process).
%% 已注册的进程列表
registered().
9.7.2 注册的限制
%% 每个名字只能注册一个进程
register(name1, Pid1). %% ok
register(name1, Pid2). %% 错误!badarg
%% 全局唯一
%% 只有已注册才能用名字发送消息
unregistered_name ! msg. %% 错误!badarg
9.8 进程信息
%% 当前进程信息
self(). %% PID
process_info(self()). %% 详细信息列表
process_info(self(), memory). %% 内存使用
process_info(self(), message_queue_len). %% 消息队列长度
process_info(self(), current_function). %% 当前执行的函数
process_info(self(), reductions). %% 执行步数
%% 所有进程
erlang:processes(). %% 所有进程 PID 列表
erlang:system_info(process_count). %% 进程数量
9.9 实战:聊天室
%% chat_room.erl
-module(chat_room).
-export([start/0, join/2, leave/2, send/3, loop/1]).
start() ->
spawn(?MODULE, loop, [#{name => "General", members => #{}}]).
join(RoomPid, UserPid) ->
Ref = make_ref(),
RoomPid ! {join, self(), Ref, UserPid},
receive {Ref, Result} -> Result end.
leave(RoomPid, UserPid) ->
Ref = make_ref(),
RoomPid ! {leave, self(), Ref, UserPid},
receive {Ref, Result} -> Result end.
send(RoomPid, UserPid, Message) ->
RoomPid ! {message, UserPid, Message},
ok.
loop(#{members := Members} = State) ->
receive
{join, From, Ref, UserPid} ->
monitor(process, UserPid),
NewMembers = Members#{UserPid => true},
broadcast(Members, {user_joined, UserPid}),
From ! {Ref, ok},
loop(State#{members => NewMembers});
{leave, From, Ref, UserPid} ->
NewMembers = maps:remove(UserPid, Members),
broadcast(NewMembers, {user_left, UserPid}),
From ! {Ref, ok},
loop(State#{members => NewMembers});
{message, UserPid, Message} ->
broadcast(maps:remove(UserPid, Members),
{chat, UserPid, Message}),
loop(State);
{'DOWN', _Ref, process, UserPid, _Reason} ->
NewMembers = maps:remove(UserPid, Members),
broadcast(NewMembers, {user_left, UserPid}),
loop(State#{members => NewMembers})
end.
broadcast(Members, Msg) ->
maps:foreach(fun(Pid, _) -> Pid ! Msg end, Members).
9.10 实战:并发任务执行器
%% task_executor.erl
-module(task_executor).
-export([parallel_map/2, parallel_filter/2, pmap/2]).
%% 并行 map
-spec pmap(fun((A) -> B), [A]) -> [B].
pmap(Fun, List) ->
Parent = self(),
Pids = [spawn(fun() -> Parent ! {self(), Fun(X)} end) || X <- List],
[receive {Pid, Result} -> Result end || Pid <- Pids].
%% 并行 map(带超时)
-spec parallel_map(fun((A) -> B), [A]) -> [B | {error, timeout}].
parallel_map(Fun, List) ->
Parent = self(),
Pids = [spawn(fun() -> Parent ! {self(), Fun(X)} end) || X <- List],
[receive_result(Pid) || Pid <- Pids].
receive_result(Pid) ->
receive
{Pid, Result} -> Result
after 10000 ->
{error, timeout}
end.
%% 并行过滤
-spec parallel_filter(fun((A) -> boolean()), [A]) -> [A].
parallel_filter(Pred, List) ->
Pairs = pmap(fun(X) -> {X, Pred(X)} end, List),
[X || {X, true} <- Pairs].
$ erl
1> c(task_executor).
{ok, task_executor}
2> task_executor:pmap(fun(X) -> X * X end, [1,2,3,4,5]).
[1,4,9,16,25]
3> task_executor:parallel_filter(fun(X) -> X rem 2 =:= 0 end, [1,2,3,4,5,6]).
[2,4,6]
9.11 注意事项
⚠️ 常见陷阱
| 陷阱 | 说明 |
|---|---|
| 消息不可达 | 发送给已死进程的消息静默丢失 |
| 邮箱溢出 | 快速发送者可能导致接收者邮箱暴涨 |
| 死锁 | 两个进程互相等待对方消息 |
| 消息顺序 | 同一对进程的消息保证有序,但不同进程间的顺序不保证 |
| 内存泄漏 | 进程积累未处理的消息 |
💡 最佳实践
- 用
make_ref()标识请求-响应配对 - 设置合理的
after超时 - 使用 Monitor 而非 Link 监控外部进程
- 注册常驻进程的名字,方便查找
- 注意邮箱大小,必要时丢弃旧消息
- 避免在循环中积累未处理的消息
9.12 扩展阅读
- 📖 Erlang Reference Manual - Processes
- 📖 Learn You Some Erlang - The Hitchhiker’s Guide to Concurrency
- 📖 Joe Armstrong’s thesis — 可靠分布式系统
上一章:08 - 元组与 Map 下一章:10 - OTP 基础