nanomsg / NNG 消息库完全教程 / 第 8 章:IPC 与进程间通信
8.1 IPC 概述
nanomsg / NNG 支持多种进程内和进程间通信方式,无需网络协议栈即可实现高效消息传递。
8.1.1 传输方式一览
| 传输 | 范围 | 地址格式 | 协议开销 | 适用场景 |
|---|---|---|---|---|
| inproc | 同进程线程间 | inproc://name | 无 | 线程间通信 |
| IPC | 同机进程间 | ipc:///path | Unix Socket | 微服务、守护进程 |
| TCP | 跨机器 | tcp://host:port | TCP/IP | 分布式系统 |
8.1.2 性能层级
延迟:inproc << IPC < TCP < TLS+TCP
吞吐:inproc >> IPC > TCP > TLS+TCP
选择优先级:
1. 同进程 → inproc
2. 同机器 → IPC (Unix Socket)
3. 跨机器 → TCP / TLS
8.2 inproc(线程间通信)
8.2.1 概念
inproc 传输在同一进程内的线程之间共享内存,无需任何系统调用或数据拷贝,是最快的通信方式。
┌─────────────────────────────┐
│ 进程 (Process) │
│ ┌─────────┐ ┌─────────┐ │
│ │ Thread A │ │ Thread B │ │
│ │ (PUB) │──│ (SUB) │ │
│ └─────────┘ └─────────┘ │
│ └───── inproc ─────┘ │
└─────────────────────────────┘
8.2.2 特性
| 特性 | 说明 |
|---|---|
| 延迟 | ~1 μs |
| 拷贝次数 | 0(零拷贝) |
| 内存使用 | 极低 |
| 线程安全 | ✅(NNG)/ ⚠️(nanomsg) |
| 传输协议 | 无(共享内存) |
8.2.3 使用场景
| 场景 | 说明 |
|---|---|
| 线程间消息传递 | 替代共享内存 + 锁 |
| Actor 模型实现 | 每个 Actor 一个线程 |
| 生产者-消费者 | 推送/PULL 模式 |
| 事件分发 | PUB/SUB 广播 |
8.2.4 代码示例(NNG)
inproc_pubsub.c:
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
void *subscriber_thread(void *arg) {
nng_socket sub;
nng_sub0_open(&sub);
// 连接到 inproc 地址
nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0);
nng_dial(sub, "inproc://events", NULL, 0);
printf("Subscriber thread started\n");
for (int i = 0; i < 5; i++) {
char *buf = NULL;
size_t sz;
if (nng_recv(sub, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
printf("[SUB] Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
}
}
nng_close(sub);
return NULL;
}
int main() {
nng_socket pub;
pthread_t sub_tid;
nng_pub0_open(&pub);
// 绑定到 inproc 地址
nng_listen(pub, "inproc://events", NULL, 0);
// 启动订阅者线程
pthread_create(&sub_tid, NULL, subscriber_thread, NULL);
usleep(100000); // 等待订阅者连接
// 发布消息
const char *events[] = {
"user.login",
"order.created",
"payment.success",
"order.shipped",
"user.logout"
};
for (int i = 0; i < 5; i++) {
printf("[PUB] Publishing: %s\n", events[i]);
nng_send(pub, (void *)events[i], strlen(events[i]), 0);
usleep(200000);
}
pthread_join(sub_tid, NULL);
nng_close(pub);
printf("Done.\n");
return 0;
}
cc inproc_pubsub.c -lnng -lpthread -o inproc_pubsub
./inproc_pubsub
输出示例:
[PUB] Publishing: user.login
[SUB] Received: user.login
[PUB] Publishing: order.created
[SUB] Received: order.created
[PUB] Publishing: payment.success
[SUB] Received: payment.success
[PUB] Publishing: order.shipped
[SUB] Received: order.shipped
[PUB] Publishing: user.logout
[SUB] Received: user.logout
Done.
8.2.5 inproc + PAIR 的线程通信模式
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
typedef struct {
nng_socket sock;
const char *name;
} thread_arg;
void *worker(void *arg) {
thread_arg *ta = (thread_arg *)arg;
nng_pair0_open(&ta->sock);
nng_dial(ta->sock, "inproc://control", NULL, 0);
// 发送就绪消息
nng_send(ta->sock, "READY", 5, 0);
// 接收工作指令
char *buf = NULL;
size_t sz;
if (nng_recv(ta->sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
printf("[%s] Got command: %.*s\n", ta->name, (int)sz, buf);
nng_free(buf, sz);
}
// 发送结果
nng_send(ta->sock, "DONE", 4, 0);
nng_close(ta->sock);
return NULL;
}
int main() {
nng_socket pair;
pthread_t tid;
thread_arg arg = { .name = "Worker-1" };
nng_pair0_open(&pair);
nng_listen(pair, "inproc://control", NULL, 0);
pthread_create(&tid, NULL, worker, &arg);
usleep(100000);
// 接收就绪消息
char *buf = NULL;
size_t sz;
nng_recv(pair, &buf, &sz, NNG_FLAG_ALLOC);
printf("[Main] Worker status: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
// 发送工作指令
nng_send(pair, "PROCESS_DATA", 12, 0);
// 接收结果
nng_recv(pair, &buf, &sz, NNG_FLAG_ALLOC);
printf("[Main] Worker result: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
pthread_join(tid, NULL);
nng_close(pair);
return 0;
}
8.3 IPC(Unix Socket)
8.3.1 概念
IPC 传输使用 Unix Domain Socket(UDS)实现同机器上不同进程之间的通信。相比 TCP,UDS 省去了网络协议栈的开销。
┌──────────┐ Unix Socket ┌──────────┐
│ Process A │────────────────│ Process B │
│ (Server) │ /tmp/app.ipc │ (Client) │
└──────────┘ └──────────┘
8.3.2 特性
| 特性 | 说明 |
|---|---|
| 延迟 | ~10 μs(比 TCP 快 3-5 倍) |
| 拷贝次数 | 1-2 次(内核缓冲区) |
| 最大消息大小 | 受系统缓冲区限制 |
| 文件系统 | 需要可写的文件系统路径 |
| 权限控制 | 支持文件权限 |
8.3.3 地址格式
ipc:///tmp/myapp.sock
ipc:///var/run/myapp.sock
ipc://myapp.sock # 相对路径(当前目录)
注意:路径必须对运行进程可写。启动时 NNG 会创建 Socket 文件,关闭时自动删除。
8.3.4 代码示例(NNG)
ipc_server.c:
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
static volatile int running = 1;
void on_signal(int sig) { running = 0; }
int main() {
nng_socket sock;
int rv;
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
nng_rep0_open(&sock);
// 设置最大接收消息大小
size_t maxsz = 1048576; // 1MB
nng_setopt(sock, NNG_OPT_RECVMAXSZ, &maxsz, sizeof(maxsz));
// 设置接收超时
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 1000);
// 绑定到 Unix Socket
if ((rv = nng_listen(sock, "ipc:///tmp/nng_demo.ipc", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("IPC server listening on /tmp/nng_demo.ipc\n");
while (running) {
char *buf = NULL;
size_t sz;
rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
if (rv == NNG_ETIMEDOUT) continue;
if (rv != 0) break;
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
const char *reply = "IPC OK";
nng_send(sock, (void *)reply, strlen(reply), 0);
}
printf("\nShutting down...\n");
nng_close(sock);
return 0;
}
ipc_client.c:
#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <stdio.h>
#include <string.h>
int main() {
nng_socket sock;
int rv;
nng_req0_open(&sock);
// 连接到 Unix Socket
if ((rv = nng_dial(sock, "ipc:///tmp/nng_demo.ipc", NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
// 发送请求
const char *msg = "Hello via IPC!";
nng_send(sock, (void *)msg, strlen(msg), 0);
// 接收响应
char *buf = NULL;
size_t sz;
if (nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
printf("Reply: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
}
nng_close(sock);
return 0;
}
# 终端 1
cc ipc_server.c -lnng -o ipc_server && ./ipc_server
# 终端 2
cc ipc_client.c -lnng -o ipc_client && ./ipc_client
# 输出: Reply: IPC OK
8.3.5 IPC 权限控制
Unix Socket 文件继承创建进程的 umask。可手动设置权限:
// 在绑定后设置 Socket 文件权限
#include <sys/stat.h>
chmod("/tmp/nng_demo.ipc", 0660); // 仅所有者和组可访问
8.3.6 systemd 集成
在 systemd 管理的服务中,可使用 systemd socket activation:
# /etc/systemd/system/myapp.socket
[Unit]
Description=My NNG IPC Socket
[Socket]
ListenStream=/var/run/myapp.sock
SocketMode=0660
[Install]
WantedBy=sockets.target
8.4 inproc vs IPC vs TCP 性能对比
8.4.1 延迟对比
| 传输 | P50 延迟 | P99 延迟 | 相对速度 |
|---|---|---|---|
| inproc | ~1 μs | ~5 μs | 基准 (1x) |
| IPC | ~10 μs | ~50 μs | ~10x |
| TCP (localhost) | ~30 μs | ~100 μs | ~30x |
| TCP (LAN) | ~100 μs | ~500 μs | ~100x |
| TLS (LAN) | ~200 μs | ~1 ms | ~200x |
8.4.2 吞吐量对比(256 字节消息)
| 传输 | 吞吐量 (msg/s) | 吞吐量 (MB/s) |
|---|---|---|
| inproc | ~5,000,000 | ~1,280 |
| IPC | ~500,000 | ~128 |
| TCP (localhost) | ~300,000 | ~77 |
| TCP (LAN) | ~200,000 | ~51 |
8.4.3 选择建议
需要什么通信?
│
├── 同一进程内的线程?
│ └── inproc(零延迟,零拷贝)
│
├── 同一机器上的进程?
│ └── IPC / Unix Socket(低延迟,无网络开销)
│
└── 不同机器?
└── TCP(标准网络通信)
└── 需要加密?→ TLS
8.5 多进程架构模式
8.5.1 模式 1:微服务 + IPC
┌──────────┐ IPC ┌──────────┐ IPC ┌──────────┐
│ Web 服务 │◄──────►│ 业务服务 │◄──────►│ 数据服务 │
│ (nginx) │ │ (worker) │ │ (cache) │
└──────────┘ └──────────┘ └──────────┘
5555 5556 5557
8.5.2 模式 2:插件系统
┌────────────────────────────┐
│ 主进程 (Host) │
│ │
│ ┌────────┐ inproc │
│ │ Plugin │◄──────────────►│
│ │ Thread │ PAIR │
│ └────────┘ │
└────────────────────────────┘
│ IPC
▼
┌──────────────────┐
│ 外部插件 (独立进程)│
└──────────────────┘
8.5.3 模式 3:网关代理
外部请求 (TCP) 内部服务 (IPC)
│ │
▼ ▼
┌──────────────────────────────┐
│ 网关进程 │
│ ┌──────────┐ ┌──────────┐ │
│ │ TCP 接入 │ │ IPC 转发 │ │
│ │ (面向外部) │──│ (面向内部) │ │
│ └──────────┘ └──────────┘ │
└──────────────────────────────┘
8.6 共享内存替代方案
对于超高性能场景(延迟 <1 μs),可以考虑直接使用共享内存 + 信号量,但 nanomsg / NNG 的 inproc 已经足够高效。
8.6.1 inproc vs 共享内存
| 特性 | inproc (NNG) | 共享内存 + 锁 |
|---|---|---|
| 延迟 | ~1 μs | ~0.1 μs |
| 编程复杂度 | 低 | 高(需手动同步) |
| 消息路由 | 内置协议支持 | 需手动实现 |
| 线程安全 | ✅ | 需要锁/信号量 |
| 消息格式 | 任意二进制 | 需自定义序列化 |
| 背压处理 | 内置 | 需手动实现 |
建议:除非有极端性能要求(<1 μs),否则使用 inproc 传输,避免手动管理共享内存的复杂性。
8.7 IPC 故障排查
8.7.1 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
NNG_EADDRINUSE | Socket 文件已存在 | 删除旧文件:rm /tmp/app.ipc |
NNG_EPERM | 无文件写入权限 | 检查目录权限 |
NNG_ECONNREFUSED | 服务端未启动 | 确认服务端进程 |
| 连接后无响应 | 协议不匹配 | 确认两端使用相同协议 |
8.7.2 清理残留 Socket 文件
# 查找残留的 IPC Socket 文件
ls -la /tmp/*.ipc /var/run/*.sock
# 启动前清理
rm -f /tmp/nng_demo.ipc
# 在代码中处理
#include <unistd.h>
unlink("/tmp/nng_demo.ipc"); // 启动前删除旧文件
nng_listen(sock, "ipc:///tmp/nng_demo.ipc", NULL, 0);
8.8 注意事项
inproc 必须先 bind 后 connect:inproc 传输要求至少一端先
listen/bind,另一端才能dial/connect。与 TCP 不同,inproc 没有自动重连机制。
IPC Socket 文件清理:程序异常退出时,Socket 文件可能残留。建议在程序启动时清理旧文件。
容器中的 IPC:Docker 容器默认有独立的文件系统。使用 IPC 时需要挂载共享卷。
8.9 扩展阅读
上一章:第 7 章:TLS 与安全通信 | 下一章:第 9 章:Docker 与微服务