强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 8 章:IPC 与进程间通信

8.1 IPC 概述

nanomsg / NNG 支持多种进程内和进程间通信方式,无需网络协议栈即可实现高效消息传递。

8.1.1 传输方式一览

传输范围地址格式协议开销适用场景
inproc同进程线程间inproc://name线程间通信
IPC同机进程间ipc:///pathUnix Socket微服务、守护进程
TCP跨机器tcp://host:portTCP/IP分布式系统

8.1.2 性能层级

延迟:inproc << IPC < TCP < TLS+TCP
吞吐:inproc >> IPC > TCP > TLS+TCP

选择优先级:
1. 同进程 → inproc
2. 同机器 → IPC (Unix Socket)
3. 跨机器 → TCP / TLS

8.2 inproc(线程间通信)

8.2.1 概念

inproc 传输在同一进程内的线程之间共享内存,无需任何系统调用或数据拷贝,是最快的通信方式。

┌─────────────────────────────┐
│          进程 (Process)      │
│  ┌─────────┐  ┌─────────┐  │
│  │ Thread A │  │ Thread B │  │
│  │ (PUB)   │──│ (SUB)   │  │
│  └─────────┘  └─────────┘  │
│      └───── inproc ─────┘  │
└─────────────────────────────┘

8.2.2 特性

特性说明
延迟~1 μs
拷贝次数0(零拷贝)
内存使用极低
线程安全✅(NNG)/ ⚠️(nanomsg)
传输协议无(共享内存)

8.2.3 使用场景

场景说明
线程间消息传递替代共享内存 + 锁
Actor 模型实现每个 Actor 一个线程
生产者-消费者推送/PULL 模式
事件分发PUB/SUB 广播

8.2.4 代码示例(NNG)

inproc_pubsub.c:

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

void *subscriber_thread(void *arg) {
    nng_socket sub;
    nng_sub0_open(&sub);

    // 连接到 inproc 地址
    nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0);
    nng_dial(sub, "inproc://events", NULL, 0);

    printf("Subscriber thread started\n");

    for (int i = 0; i < 5; i++) {
        char *buf = NULL;
        size_t sz;
        if (nng_recv(sub, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
            printf("[SUB] Received: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);
        }
    }

    nng_close(sub);
    return NULL;
}

int main() {
    nng_socket pub;
    pthread_t sub_tid;

    nng_pub0_open(&pub);

    // 绑定到 inproc 地址
    nng_listen(pub, "inproc://events", NULL, 0);

    // 启动订阅者线程
    pthread_create(&sub_tid, NULL, subscriber_thread, NULL);
    usleep(100000);  // 等待订阅者连接

    // 发布消息
    const char *events[] = {
        "user.login",
        "order.created",
        "payment.success",
        "order.shipped",
        "user.logout"
    };

    for (int i = 0; i < 5; i++) {
        printf("[PUB] Publishing: %s\n", events[i]);
        nng_send(pub, (void *)events[i], strlen(events[i]), 0);
        usleep(200000);
    }

    pthread_join(sub_tid, NULL);
    nng_close(pub);

    printf("Done.\n");
    return 0;
}
cc inproc_pubsub.c -lnng -lpthread -o inproc_pubsub
./inproc_pubsub

输出示例:

[PUB] Publishing: user.login
[SUB] Received: user.login
[PUB] Publishing: order.created
[SUB] Received: order.created
[PUB] Publishing: payment.success
[SUB] Received: payment.success
[PUB] Publishing: order.shipped
[SUB] Received: order.shipped
[PUB] Publishing: user.logout
[SUB] Received: user.logout
Done.

8.2.5 inproc + PAIR 的线程通信模式

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

typedef struct {
    nng_socket sock;
    const char *name;
} thread_arg;

void *worker(void *arg) {
    thread_arg *ta = (thread_arg *)arg;

    nng_pair0_open(&ta->sock);
    nng_dial(ta->sock, "inproc://control", NULL, 0);

    // 发送就绪消息
    nng_send(ta->sock, "READY", 5, 0);

    // 接收工作指令
    char *buf = NULL;
    size_t sz;
    if (nng_recv(ta->sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
        printf("[%s] Got command: %.*s\n", ta->name, (int)sz, buf);
        nng_free(buf, sz);
    }

    // 发送结果
    nng_send(ta->sock, "DONE", 4, 0);

    nng_close(ta->sock);
    return NULL;
}

int main() {
    nng_socket pair;
    pthread_t tid;
    thread_arg arg = { .name = "Worker-1" };

    nng_pair0_open(&pair);
    nng_listen(pair, "inproc://control", NULL, 0);

    pthread_create(&tid, NULL, worker, &arg);
    usleep(100000);

    // 接收就绪消息
    char *buf = NULL;
    size_t sz;
    nng_recv(pair, &buf, &sz, NNG_FLAG_ALLOC);
    printf("[Main] Worker status: %.*s\n", (int)sz, buf);
    nng_free(buf, sz);

    // 发送工作指令
    nng_send(pair, "PROCESS_DATA", 12, 0);

    // 接收结果
    nng_recv(pair, &buf, &sz, NNG_FLAG_ALLOC);
    printf("[Main] Worker result: %.*s\n", (int)sz, buf);
    nng_free(buf, sz);

    pthread_join(tid, NULL);
    nng_close(pair);
    return 0;
}

8.3 IPC(Unix Socket)

8.3.1 概念

IPC 传输使用 Unix Domain Socket(UDS)实现同机器上不同进程之间的通信。相比 TCP,UDS 省去了网络协议栈的开销。

┌──────────┐   Unix Socket   ┌──────────┐
│ Process A │────────────────│ Process B │
│ (Server)  │  /tmp/app.ipc  │ (Client)  │
└──────────┘                 └──────────┘

8.3.2 特性

特性说明
延迟~10 μs(比 TCP 快 3-5 倍)
拷贝次数1-2 次(内核缓冲区)
最大消息大小受系统缓冲区限制
文件系统需要可写的文件系统路径
权限控制支持文件权限

8.3.3 地址格式

ipc:///tmp/myapp.sock
ipc:///var/run/myapp.sock
ipc://myapp.sock        # 相对路径(当前目录)

注意:路径必须对运行进程可写。启动时 NNG 会创建 Socket 文件,关闭时自动删除。

8.3.4 代码示例(NNG)

ipc_server.c:

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>

static volatile int running = 1;

void on_signal(int sig) { running = 0; }

int main() {
    nng_socket sock;
    int rv;

    signal(SIGINT, on_signal);
    signal(SIGTERM, on_signal);

    nng_rep0_open(&sock);

    // 设置最大接收消息大小
    size_t maxsz = 1048576;  // 1MB
    nng_setopt(sock, NNG_OPT_RECVMAXSZ, &maxsz, sizeof(maxsz));

    // 设置接收超时
    nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 1000);

    // 绑定到 Unix Socket
    if ((rv = nng_listen(sock, "ipc:///tmp/nng_demo.ipc", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("IPC server listening on /tmp/nng_demo.ipc\n");

    while (running) {
        char *buf = NULL;
        size_t sz;
        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
        if (rv == NNG_ETIMEDOUT) continue;
        if (rv != 0) break;

        printf("Received: %.*s\n", (int)sz, buf);
        nng_free(buf, sz);

        const char *reply = "IPC OK";
        nng_send(sock, (void *)reply, strlen(reply), 0);
    }

    printf("\nShutting down...\n");
    nng_close(sock);
    return 0;
}

ipc_client.c:

#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <stdio.h>
#include <string.h>

int main() {
    nng_socket sock;
    int rv;

    nng_req0_open(&sock);

    // 连接到 Unix Socket
    if ((rv = nng_dial(sock, "ipc:///tmp/nng_demo.ipc", NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
        return 1;
    }

    // 发送请求
    const char *msg = "Hello via IPC!";
    nng_send(sock, (void *)msg, strlen(msg), 0);

    // 接收响应
    char *buf = NULL;
    size_t sz;
    if (nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
        printf("Reply: %.*s\n", (int)sz, buf);
        nng_free(buf, sz);
    }

    nng_close(sock);
    return 0;
}
# 终端 1
cc ipc_server.c -lnng -o ipc_server && ./ipc_server

# 终端 2
cc ipc_client.c -lnng -o ipc_client && ./ipc_client
# 输出: Reply: IPC OK

8.3.5 IPC 权限控制

Unix Socket 文件继承创建进程的 umask。可手动设置权限:

// 在绑定后设置 Socket 文件权限
#include <sys/stat.h>
chmod("/tmp/nng_demo.ipc", 0660);  // 仅所有者和组可访问

8.3.6 systemd 集成

在 systemd 管理的服务中,可使用 systemd socket activation:

# /etc/systemd/system/myapp.socket
[Unit]
Description=My NNG IPC Socket

[Socket]
ListenStream=/var/run/myapp.sock
SocketMode=0660

[Install]
WantedBy=sockets.target

8.4 inproc vs IPC vs TCP 性能对比

8.4.1 延迟对比

传输P50 延迟P99 延迟相对速度
inproc~1 μs~5 μs基准 (1x)
IPC~10 μs~50 μs~10x
TCP (localhost)~30 μs~100 μs~30x
TCP (LAN)~100 μs~500 μs~100x
TLS (LAN)~200 μs~1 ms~200x

8.4.2 吞吐量对比(256 字节消息)

传输吞吐量 (msg/s)吞吐量 (MB/s)
inproc~5,000,000~1,280
IPC~500,000~128
TCP (localhost)~300,000~77
TCP (LAN)~200,000~51

8.4.3 选择建议

需要什么通信?
│
├── 同一进程内的线程?
│   └── inproc(零延迟,零拷贝)
│
├── 同一机器上的进程?
│   └── IPC / Unix Socket(低延迟,无网络开销)
│
└── 不同机器?
    └── TCP(标准网络通信)
        └── 需要加密?→ TLS

8.5 多进程架构模式

8.5.1 模式 1:微服务 + IPC

┌──────────┐  IPC   ┌──────────┐  IPC   ┌──────────┐
│ Web 服务  │◄──────►│ 业务服务  │◄──────►│ 数据服务  │
│ (nginx)  │        │ (worker) │        │ (cache)  │
└──────────┘        └──────────┘        └──────────┘
    5555                5556                5557

8.5.2 模式 2:插件系统

┌────────────────────────────┐
│         主进程 (Host)       │
│                            │
│  ┌────────┐   inproc       │
│  │ Plugin │◄──────────────►│
│  │ Thread │   PAIR         │
│  └────────┘                │
└────────────────────────────┘
         │  IPC
         ▼
┌──────────────────┐
│ 外部插件 (独立进程)│
└──────────────────┘

8.5.3 模式 3:网关代理

外部请求 (TCP)      内部服务 (IPC)
      │                   │
      ▼                   ▼
┌──────────────────────────────┐
│          网关进程              │
│  ┌──────────┐  ┌──────────┐ │
│  │ TCP 接入  │  │ IPC 转发  │ │
│  │ (面向外部) │──│ (面向内部) │ │
│  └──────────┘  └──────────┘ │
└──────────────────────────────┘

8.6 共享内存替代方案

对于超高性能场景(延迟 <1 μs),可以考虑直接使用共享内存 + 信号量,但 nanomsg / NNG 的 inproc 已经足够高效。

8.6.1 inproc vs 共享内存

特性inproc (NNG)共享内存 + 锁
延迟~1 μs~0.1 μs
编程复杂度高(需手动同步)
消息路由内置协议支持需手动实现
线程安全需要锁/信号量
消息格式任意二进制需自定义序列化
背压处理内置需手动实现

建议:除非有极端性能要求(<1 μs),否则使用 inproc 传输,避免手动管理共享内存的复杂性。


8.7 IPC 故障排查

8.7.1 常见问题

问题原因解决方案
NNG_EADDRINUSESocket 文件已存在删除旧文件:rm /tmp/app.ipc
NNG_EPERM无文件写入权限检查目录权限
NNG_ECONNREFUSED服务端未启动确认服务端进程
连接后无响应协议不匹配确认两端使用相同协议

8.7.2 清理残留 Socket 文件

# 查找残留的 IPC Socket 文件
ls -la /tmp/*.ipc /var/run/*.sock

# 启动前清理
rm -f /tmp/nng_demo.ipc

# 在代码中处理
#include <unistd.h>
unlink("/tmp/nng_demo.ipc");  // 启动前删除旧文件
nng_listen(sock, "ipc:///tmp/nng_demo.ipc", NULL, 0);

8.8 注意事项

inproc 必须先 bind 后 connect:inproc 传输要求至少一端先 listen/bind,另一端才能 dial/connect。与 TCP 不同,inproc 没有自动重连机制。

IPC Socket 文件清理:程序异常退出时,Socket 文件可能残留。建议在程序启动时清理旧文件。

容器中的 IPC:Docker 容器默认有独立的文件系统。使用 IPC 时需要挂载共享卷。


8.9 扩展阅读


上一章第 7 章:TLS 与安全通信 | 下一章第 9 章:Docker 与微服务