nanomsg / NNG 消息库完全教程 / 第 1 章:nanomsg / NNG 概述
1.1 什么是 nanomsg
nanomsg 是一个用 C 语言编写的轻量级消息传递库,由 ZeroMQ 的联合创始人 Martin Sústrik 于 2012 年发起。它的目标是提供一套简洁、可嵌入的 socket 库,用于解决分布式系统中的进程间和网络间通信问题。
1.1.1 设计哲学
nanomsg 的设计遵循以下原则:
| 原则 | 说明 |
|---|---|
| 简洁性 | API 尽量精简,学习成本低 |
| 可嵌入性 | 无外部依赖,可直接嵌入应用 |
| 可移植性 | 支持 Linux、macOS、Windows 等主流平台 |
| 协议驱动 | 内置多种"可扩展性协议",屏蔽底层通信细节 |
| 零 Broker | 无需中间件,点对点直接通信 |
1.1.2 历史背景
nanomsg 的诞生源于 ZeroMQ 在发展过程中遇到的一些问题:
- ZeroMQ 的代码库逐渐庞大,维护困难
- 许可证争议(从 LGPL 改为 MPL,再改为 LGPL)
- 社区分裂(衍生出 Crossroads I/O 等项目)
Martin Sústrik 决定从零开始,用更简洁的方式重新实现消息传递库的核心功能。
1.2 什么是 NNG
NNG(nanomsg-next-generation)是 nanomsg 的继任者,由 Garrett D’Amore 于 2017 年开始开发。它在保持 nanomsg API 兼容的基础上,进行了全面的现代化改造。
1.2.1 NNG 的改进
| 特性 | nanomsg | NNG |
|---|---|---|
| 异步 I/O | ❌ 同步阻塞 | ✅ 现代异步事件循环 |
| TLS 支持 | ❌ 无 | ✅ 原生 mbedTLS / OpenSSL |
| WebSocket | ❌ 无 | ✅ 原生支持 |
| 上下文 (Context) | ❌ 无 | ✅ 支持多线程共享 Socket |
| 零拷贝 | ⚠️ 有限 | ✅ 完整支持 |
| 内存分配器 | 固定 | 可自定义 |
| 维护状态 | ⚠️ 停滞 | ✅ 活跃开发 |
建议:新项目应优先选择 NNG。nanomsg 仅适用于已有项目的维护场景。
1.2.2 NNG 的架构
┌─────────────────────────────────────┐
│ 应用层 (Application) │
├─────────────────────────────────────┤
│ 可扩展性协议 (SP Protocols) │
│ PAIR / PUB / SUB / REQ / REP ... │
├─────────────────────────────────────┤
│ 传输层 (Transport) │
│ TCP / IPC / inproc / WS / TLS │
├─────────────────────────────────────┤
│ 平台抽象层 (Platform) │
│ Linux / macOS / Windows / POSIX │
└─────────────────────────────────────┘
1.3 nanomsg / NNG vs ZeroMQ
选择消息传递库时,最常见的对比对象是 ZeroMQ (0MQ)。以下是三者的详细对比:
1.3.1 核心对比
| 维度 | nanomsg | NNG | ZeroMQ |
|---|---|---|---|
| 语言 | C | C | C++ (C 绑定) |
| 许可证 | MIT | MIT | LGPL / MPL |
| API 风格 | POSIX-like | 现代异步 | POSIX-like |
| TLS | ❌ | ✅ | ❌ (需 zmq-plus) |
| WebSocket | ❌ | ✅ | ⚠️ 有限 |
| 内存占用 | 极小 | 小 | 中等 |
| 学习曲线 | 低 | 中 | 中 |
| 社区活跃度 | 低 | 中 | 高 |
| 成熟度 | 中 | 中 | 高 |
1.3.2 何时选择 nanomsg / NNG
选择 nanomsg / NNG 的场景:
- 需要极小的内存和二进制体积(嵌入式设备)
- 项目许可证要求严格(MIT 许可证更宽松)
- 需要原生 TLS / WebSocket 支持
- 追求简洁的 API,降低学习成本
选择 ZeroMQ 的场景:
- 团队已有 ZeroMQ 使用经验
- 需要更丰富的语言绑定生态
- 需要更高成熟度和社区支持
1.3.3 API 风格对比
ZeroMQ:
void *ctx = zmq_ctx_new();
void *sock = zmq_socket(ctx, ZMQ_PAIR);
zmq_bind(sock, "tcp://*:5555");
zmq_send(sock, "hello", 5, 0);
zmq_close(sock);
zmq_ctx_destroy(ctx);
nanomsg:
int sock = nn_socket(AF_SP, NN_PAIR);
nn_bind(sock, "tcp://*:5555");
nn_send(sock, "hello", 5, 0);
nn_close(sock);
NNG:
nng_socket sock;
nng_pair_open(&sock);
nng_listen(sock, "tcp://*:5555", NULL, 0);
nng_send(sock, "hello", 5, 0);
nng_close(sock);
可以看到,三者的 API 风格非常相似,迁移成本较低。
1.4 可扩展性协议(Scalability Protocols)
nanomsg / NNG 的核心价值在于内置的可扩展性协议(Scalability Protocols,简称 SP)。这些协议解决了不同通信拓扑下的常见问题。
1.4.1 协议一览
| 协议 | 角色 | 模式 | 典型用途 |
|---|---|---|---|
| PAIR | 1:1 | 点对点 | 两个进程间的专用通道 |
| PUB | 1:N | 发布 | 向多个订阅者广播消息 |
| SUB | N:1 | 订阅 | 接收发布者的消息 |
| REQ | 1:1 | 请求 | 发起请求并等待应答 |
| REP | 1:1 | 应答 | 接收请求并返回响应 |
| PUSH | 1:N | 推送 | 向多个消费者分发任务 |
| PULL | N:1 | 拉取 | 从多个生产者接收任务 |
| BUS | N:N | 总线 | 所有节点互相通信 |
| SURVEYOR | 1:N | 调查 | 向所有节点发起调查并收集响应 |
| RESPONDENT | N:1 | 响应 | 响应调查者的调查 |
1.4.2 协议分类
消息模式
├── 一对一 (1:1)
│ ├── PAIR —— 简单双向通信
│ └── REQ/REP —— 请求-应答
├── 一对多 (1:N)
│ ├── PUB/SUB —— 发布-订阅
│ ├── PUSH/PULL —— 任务分发
│ └── SURVEYOR/RESPONDENT —— 调查模式
└── 多对多 (N:N)
└── BUS —— 总线广播
每种协议的详细用法将在 第 3 章:协议详解 中深入讲解。
1.5 传输层(Transports)
nanomsg / NNG 支持多种传输方式:
| 传输 | 地址格式 | 说明 |
|---|---|---|
| TCP | tcp://host:port | 跨网络通信 |
| IPC | ipc:///path | 进程间通信(Unix Socket) |
| inproc | inproc://name | 线程间通信(同一进程内) |
| WebSocket | ws://host:port | NNG 支持,浏览器可直接连接 |
| TLS | tls+tcp://host:port | NNG 支持,加密 TCP 通信 |
| WSS | wss://host:port | NNG 支持,加密 WebSocket |
1.6 适用场景
1.6.1 最佳场景
| 场景 | 说明 | 推荐协议 |
|---|---|---|
| 微服务通信 | 轻量级 RPC 调用 | REQ/REP |
| IoT 数据采集 | 设备上报数据,服务端收集 | PUSH/PUB + PULL/SUB |
| 任务队列 | 生产者-消费者模型 | PUSH/PULL |
| 事件广播 | 向多个订阅者推送事件 | PUB/SUB |
| 服务发现 | 节点间互相探测 | BUS / SURVEY |
| 进程内通信 | 同一进程内不同线程通信 | PAIR (inproc) |
| 边缘计算 | 资源受限环境下的高效通信 | NNG + IPC |
1.6.2 不适合的场景
| 场景 | 原因 | 替代方案 |
|---|---|---|
| 大规模消息持久化 | 无磁盘存储能力 | RabbitMQ / Kafka |
| 严格顺序保证 | 无全局排序机制 | Kafka |
| 复杂路由规则 | 路由能力有限 | AMQP / MQTT |
| 浏览器直接通信 | 需要额外封装 | WebSocket + NNG |
1.7 业务场景示例
场景 1:IoT 温度监控
┌──────────┐ PUSH ┌──────────┐ PULL ┌──────────┐
│ 温度传感器 ├─────────→│ 网关设备 ├─────────→│ 云端服务 │
│ (嵌入式) │ │ (Edge) │ │ (分析) │
└──────────┘ └──────────┘ └──────────┘
- 传感器使用 PUSH 协议发送温度数据
- 云端使用 PULL 协议接收并处理
场景 2:微服务 RPC
┌──────────┐ REQ ┌──────────┐ REP ┌──────────┐
│ Web 前端 ├─────────→│ API 网关 ├─────────→│ 用户服务 │
└──────────┘ └──────────┘ └──────────┘
- 前端通过 REQ 协议发起请求
- 用户服务通过 REP 协议返回结果
场景 3:日志广播
PUB
┌──────────┐ ──→ ┌──────────┐
│ 应用服务 │ │ 日志收集器│
└────┬─────┘ └──────────┘
│ PUB
└───→ ┌──────────┐
│ 告警服务 │
└──────────┘
- 应用服务通过 PUB 协议广播日志
- 多个消费者通过 SUB 协议订阅感兴趣的日志
1.8 快速体验
下面是一个最简单的 “Hello World” 示例,使用 NNG 的 REQ/REP 模式:
server.c(应答端):
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_rep_open(&sock)) != 0) {
fprintf(stderr, "nng_rep_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("Server listening on tcp://*:5555\n");
for (;;) {
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
fprintf(stderr, "nng_recv: %s\n", nng_strerror(rv));
break;
}
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
const char *reply = "OK";
if ((rv = nng_send(sock, (void *)reply, strlen(reply), 0)) != 0) {
fprintf(stderr, "nng_send: %s\n", nng_strerror(rv));
break;
}
}
nng_close(sock);
return 0;
}
client.c(请求端):
#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <stdio.h>
#include <string.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_req_open(&sock)) != 0) {
fprintf(stderr, "nng_req_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
const char *msg = "Hello, NNG!";
if ((rv = nng_send(sock, (void *)msg, strlen(msg), 0)) != 0) {
fprintf(stderr, "nng_send: %s\n", nng_strerror(rv));
return 1;
}
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
fprintf(stderr, "nng_recv: %s\n", nng_strerror(rv));
return 1;
}
printf("Reply: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
nng_close(sock);
return 0;
}
编译与运行:
# 编译(需要已安装 NNG)
cc server.c -lnng -o server
cc client.c -lnng -o client
# 终端 1:启动服务端
./server
# 终端 2:启动客户端
./client
# 输出: Reply: OK
1.9 注意事项
nanomsg vs NNG 的选择:nanomsg 已基本停止维护(最后活跃于 2020 年),NNG 是其官方继任者。新项目请直接使用 NNG。
线程安全:nanomsg 的 socket 不是线程安全的,每个线程应使用独立的 socket。NNG 的 socket 本身是线程安全的,但建议使用 Context 实现更好的并发控制。
许可证:nanomsg 和 NNG 均使用 MIT 许可证,可自由用于商业项目。
1.10 扩展阅读
下一章:第 2 章:安装与环境搭建