nanomsg / NNG 消息库完全教程 / 第 3 章:可扩展性协议详解
3.1 协议概述
可扩展性协议(Scalability Protocols,SP)是 nanomsg / NNG 的核心抽象。每种协议定义了一种通信拓扑和消息路由语义,应用层无需关心底层传输细节。
3.1.1 协议分层
┌─────────────────────────────┐
│ 应用层 (Your Code) │
├─────────────────────────────┤
│ SP 协议层 (本章重点) │
│ PAIR / PUB / SUB / REQ ... │
├─────────────────────────────┤
│ 传输层 │
│ TCP / IPC / inproc / WS │
└─────────────────────────────┘
3.1.2 协议总览
| 协议 | 通信模式 | 消息路由 | 背压(Backpressure) | 状态 |
|---|---|---|---|---|
| PAIR | 1:1 | 直连 | ✅ | 稳定 |
| PUB | 1:N | 扇出(Fan-out) | ❌ 丢弃慢消费者 | 稳定 |
| SUB | N:1 | 过滤订阅 | N/A | 稳定 |
| REQ | 1:1 | 轮询(Round-robin) | ✅ | 稳定 |
| REP | N:1 | 自动路由回请求者 | ✅ | 稳定 |
| PUSH | 1:N | 负载均衡(Load-balance) | ✅ | 稳定 |
| PULL | N:1 | 公平队列(Fair-queue) | N/A | 稳定 |
| BUS | N:N | 全网广播 | ✅ | 稳定 |
| SURVEYOR | 1:N | 扇出 | ✅ | 稳定 |
| RESPONDENT | N:1 | 自动路由回调查者 | ✅ | 稳定 |
3.2 PAIR(点对点)
3.2.1 概念
PAIR 是最简单的协议,提供两个节点之间的 1:1 双向通信。它没有路由逻辑,消息直接从一端发送到另一端。
┌────────┐ 双向 ┌────────┐
│ Node A │◄───────────►│ Node B │
└────────┘ └────────┘
3.2.2 特性
- 只允许一个对端连接
- 双向通信,双方均可发送和接收
- 最低协议开销
- 适合 inproc(线程间通信)
3.2.3 适用场景
| 场景 | 说明 |
|---|---|
| 线程间通信 | 同一进程内两个线程的专用通道 |
| 命令通道 | 控制面与数据面的分离 |
| 简单 RPC | 无需复杂的请求路由 |
3.2.4 代码示例(NNG)
pair_server.c:
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_pair0_open(&sock)) != 0) {
fprintf(stderr, "nng_pair0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("PAIR server listening on tcp://*:5555\n");
while (1) {
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
const char *reply = "PONG";
nng_send(sock, (void *)reply, strlen(reply), 0);
}
}
nng_close(sock);
return 0;
}
pair_client.c:
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_pair0_open(&sock)) != 0) {
fprintf(stderr, "nng_pair0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
for (int i = 0; i < 5; i++) {
char msg[32];
snprintf(msg, sizeof(msg), "PING %d", i);
printf("Sending: %s\n", msg);
nng_send(sock, msg, strlen(msg), 0);
char *buf = NULL;
size_t sz;
if (nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
}
sleep(1);
}
nng_close(sock);
return 0;
}
3.2.5 注意事项
PAIR 只支持一个对端。如果有多个客户端尝试连接,只有一个能成功,其他会失败或排队。需要 1:N 通信时,请使用 REQ/REP 或 PUSH/PULL。
3.3 PUB / SUB(发布-订阅)
3.3.1 概念
PUB/SUB 是经典的发布-订阅模式。发布者(PUB)向所有订阅者(SUB)广播消息,订阅者可以设置过滤前缀来只接收感兴趣的消息。
┌──────────┐
┌──────►│ SUB (A) │ 接收所有消息
┌────────┤ └──────────┘
│ PUB │
│ Server ├──────►┌──────────┐
│ │ │ SUB (B) │ 只接收 "weather" 前缀
└────────┘ └──────────┘
└──────►┌──────────┐
│ SUB (C) │ 只接收 "stock" 前缀
└──────────┘
3.3.2 特性
- PUB:扇出(Fan-out)广播,消息发送给所有连接的 SUB
- SUB:通过主题过滤(Topic Filter)选择性接收
- PUB 端无背压:慢速消费者的消息会被丢弃
- SUB 端需要设置订阅过滤,否则默认不接收任何消息
3.3.3 主题过滤
SUB 通过消息前缀匹配进行过滤:
- 设置
""(空字符串)接收所有消息 - 设置
"weather"接收以"weather"开头的消息 - 可设置多个前缀
3.3.4 代码示例(NNG)
pub_server.c:
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_pub0_open(&sock)) != 0) {
fprintf(stderr, "nng_pub0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5556", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("PUB server on tcp://*:5556\n");
const char *topics[] = {
"weather sunny 25C",
"stock AAPL 150.25",
"weather rainy 18C",
"stock GOOG 2800.00",
"weather cloudy 20C",
};
int i = 0;
while (1) {
const char *msg = topics[i % 5];
printf("Publishing: %s\n", msg);
nng_send(sock, (void *)msg, strlen(msg), 0);
i++;
sleep(1);
}
nng_close(sock);
return 0;
}
sub_client.c:
#include <nng/nng.h>
#include <nng/protocol/pubsub0/sub.h>
#include <stdio.h>
#include <string.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_sub0_open(&sock)) != 0) {
fprintf(stderr, "nng_sub0_open: %s\n", nng_strerror(rv));
return 1;
}
// 设置订阅前缀 —— 只接收 "weather" 开头的消息
const char *filter = "weather";
if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, filter, strlen(filter))) != 0) {
fprintf(stderr, "nng_setopt: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_dial(sock, "tcp://localhost:5556", NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
printf("SUB client subscribed to 'weather'\n");
while (1) {
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
}
}
nng_close(sock);
return 0;
}
3.3.5 注意事项
SUB 默认不接收任何消息:必须调用
NNG_OPT_SUB_SUBSCRIBE设置至少一个过滤前缀(空字符串""表示接收全部)。
PUB 无背压:如果 SUB 消费速度跟不上,PUB 会丢弃消息。不适合需要可靠投递的场景。
消息不持久化:PUB/SUB 是纯内存模式,无磁盘存储。
3.4 REQ / REP(请求-应答)
3.4.1 概念
REQ/REP 实现经典的请求-应答模式。客户端(REQ)发送请求,服务端(REP)处理后返回响应。支持多服务端负载均衡。
┌────────┐ Request ┌────────┐
│ REQ ├───────────►│ REP │
│ Client │◄───────────┤ Server │
└────────┘ Response └────────┘
多个 REQ 连接多个 REP 时,REQ 端自动轮询分发:
┌───► REP Server A
REQ ─────┼───► REP Server B
└───► REP Server C
3.4.2 特性
- REQ 必须严格遵守 发送→接收→发送→接收 的交替模式
- REP 同样严格遵守 接收→发送→接收→发送
- REQ 支持自动重试和超时
- REP 会自动将响应路由回发起请求的 REQ
3.4.3 代码示例(NNG)
rep_server.c:
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_rep0_open(&sock)) != 0) {
fprintf(stderr, "nng_rep0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5557", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("REP server on tcp://*:5557\n");
while (1) {
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
fprintf(stderr, "nng_recv: %s\n", nng_strerror(rv));
break;
}
printf("Request: %.*s\n", (int)sz, buf);
// 简单回转(echo)
nng_send(sock, buf, sz, 0);
nng_free(buf, sz);
}
nng_close(sock);
return 0;
}
req_client.c:
#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <stdio.h>
#include <string.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_req0_open(&sock)) != 0) {
fprintf(stderr, "nng_req0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_dial(sock, "tcp://localhost:5557", NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
for (int i = 0; i < 3; i++) {
char msg[64];
snprintf(msg, sizeof(msg), "Request #%d", i);
printf("Sending: %s\n", msg);
if ((rv = nng_send(sock, msg, strlen(msg), 0)) != 0) {
fprintf(stderr, "nng_send: %s\n", nng_strerror(rv));
break;
}
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
fprintf(stderr, "nng_recv: %s\n", nng_strerror(rv));
break;
}
printf("Reply: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
}
nng_close(sock);
return 0;
}
3.4.4 注意事项
严格交替:REQ 端必须先发后收,REP 端必须先收后发。违反顺序会导致
NNG_ESTATE错误。
REP 自动路由:REP socket 内部维护请求来源信息,send 时自动路由回正确的 REQ,无需应用层处理。
多 REQ 连 REP:REQ 端会对多个 REP 进行轮询(Round-robin),实现简单的负载均衡。
3.5 PUSH / PULL(任务分发)
3.5.1 概念
PUSH/PULL 实现生产者-消费者模型。PUSH 端将任务分发给多个 PULL 端(负载均衡),PULL 端从多个 PUSH 端公平接收任务。
┌───► PULL Worker A
PUSH ────┼───► PULL Worker B (负载均衡)
└───► PULL Worker C
PUSH Producer A ──┐
├──► PULL (公平队列)
PUSH Producer B ──┘
3.5.2 特性
- PUSH:负载均衡(Round-robin)分发消息到 PULL
- PULL:公平队列(Fair-queue)从多个 PUSH 接收消息
- PUSH 有背压:所有 PULL 都忙时,PUSH 会阻塞
- PULL 只能接收,不能发送;PUSH 只能发送,不能接收
3.5.3 典型架构:任务队列
┌─────────────┐
│ Producer │ PUSH
│ (任务提交) ├──────────┐
└─────────────┘ │
┌────▼────┐
│ Worker 1│ PULL
└─────────┘
┌─────────────┐ ┌─────────┐
│ Producer │ PUSH│ Worker 2│ PULL
│ (任务提交) ├────►└─────────┘
└─────────────┘ ┌─────────┐
│ Worker 3│ PULL
└─────────┘
3.5.4 代码示例(NNG)
push_producer.c:
#include <nng/nng.h>
#include <nng/protocol/pipeline0/push.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_push0_open(&sock)) != 0) {
fprintf(stderr, "nng_push0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5558", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("PUSH producer on tcp://*:5558\n");
for (int i = 0; i < 10; i++) {
char task[64];
snprintf(task, sizeof(task), "Task #%d", i);
printf("Sending: %s\n", task);
nng_send(sock, task, strlen(task), 0);
usleep(500000); // 500ms
}
nng_close(sock);
return 0;
}
pull_worker.c:
#include <nng/nng.h>
#include <nng/protocol/pipeline0/pull.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
nng_socket sock;
int rv;
const char *url = (argc > 1) ? argv[1] : "tcp://localhost:5558";
if ((rv = nng_pull0_open(&sock)) != 0) {
fprintf(stderr, "nng_pull0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
printf("PULL worker connected to %s\n", url);
while (1) {
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
printf("Processing: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
usleep(1000000); // 模拟处理耗时 1s
}
}
nng_close(sock);
return 0;
}
3.5.5 注意事项
单向通信:PUSH 只能发,PULL 只能收。需要双向通信请用 REQ/REP。
背压机制:PUSH 端在所有 PULL 都满时会阻塞发送,防止消息丢失。但这也意味着 PUSH 可能挂起。
负载均衡粒度:消息按 Round-robin 分配,不感知 Worker 的实际负载。
3.6 BUS(总线)
3.6.1 概念
BUS 实现 N:N 全连接通信。每个节点发送的消息会广播给其他所有节点。
┌────────┐ ┌────────┐
│ Node A │◄────►│ Node B │
└───┬────┘ └────┬───┘
│ │
└──────┬─────────┘
▼
┌────────┐
│ Node C │
└────────┘
3.6.2 特性
- 所有节点地位平等
- 消息广播给所有其他节点(不回显给自己)
- 适合集群内部通信、状态同步
3.6.3 代码示例(NNG)
bus_node.c:
#include <nng/nng.h>
#include <nng/protocol/bus0/bus.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
nng_socket sock;
int rv;
if (argc < 3) {
fprintf(stderr, "Usage: %s <bind_url> <dial_url>\n", argv[0]);
fprintf(stderr, "Example: %s tcp://*:5560 tcp://localhost:5561\n", argv[0]);
return 1;
}
if ((rv = nng_bus0_open(&sock)) != 0) {
fprintf(stderr, "nng_bus0_open: %s\n", nng_strerror(rv));
return 1;
}
// 绑定一个地址
if ((rv = nng_listen(sock, argv[1], NULL, 0)) != 0) {
fprintf(stderr, "nng_listen(%s): %s\n", argv[1], nng_strerror(rv));
return 1;
}
// 连接其他节点
if ((rv = nng_dial(sock, argv[2], NULL, 0)) != 0) {
fprintf(stderr, "nng_dial(%s): %s\n", argv[2], nng_strerror(rv));
return 1;
}
printf("BUS node: bind=%s, dial=%s\n", argv[1], argv[2]);
// 发送消息
for (int i = 0; i < 5; i++) {
char msg[64];
snprintf(msg, sizeof(msg), "Message from %s #%d", argv[1], i);
nng_send(sock, msg, strlen(msg), 0);
printf("Sent: %s\n", msg);
sleep(1);
}
// 接收消息
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 3000);
while (1) {
char *buf = NULL;
size_t sz;
if (nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
} else {
break;
}
}
nng_close(sock);
return 0;
}
多节点运行:
# 节点 A(先启动,绑定两个端口)
./bus_node tcp://*:5560 tcp://localhost:5561
# 节点 B
./bus_node tcp://*:5561 tcp://localhost:5560
# 节点 C
./bus_node tcp://*:5562 tcp://localhost:5560
3.6.4 注意事项
连接拓扑:BUS 需要节点之间相互连接。如果有 N 个节点,需要至少 N-1 条连接才能保证全网可达。
不支持桥接:BUS 协议不会转发消息,只转发自己产生的消息。
3.7 SURVEYOR / RESPONDENT(调查模式)
3.7.1 概念
SURVEYOR/RESPONDENT 实现"调查-响应"模式。调查者(SURVEYOR)向所有响应者(RESPONDENT)广播一个调查请求,然后收集所有响应。
┌──────────────┐
┌──────────►│ RESPONDENT A │
│ └──────────────┘
┌───────┴──────┐ ┌──────────────┐
│ SURVEYOR │───►│ RESPONDENT B │
│ (调查者) │ └──────────────┘
└───────┬──────┘ ┌──────────────┘
└──────────►│ RESPONDENT C │
└──────────────┘
3.7.2 特性
- SURVEYOR 有截止时间(Survey Deadline),超时后不再接收响应
- RESPONDENT 自动将响应路由回发起调查的 SURVEYOR
- 适用于服务发现、健康检查、集群选举等场景
3.7.3 适用场景
| 场景 | 说明 |
|---|---|
| 服务发现 | 新节点上线时广播查询,收集所有已知节点 |
| 健康检查 | 定期调查所有节点的状态 |
| 集群选举 | 发起投票,收集所有节点的意见 |
| 配置同步 | 广播配置请求,收集节点的当前配置 |
3.7.4 代码示例(NNG)
surveyor.c:
#include <nng/nng.h>
#include <nng/protocol/survey0/survey.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main() {
nng_socket sock;
int rv;
if ((rv = nng_surveyor0_open(&sock)) != 0) {
fprintf(stderr, "nng_surveyor0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5559", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
printf("SURVEYOR on tcp://*:5559\n");
sleep(2); // 等待 RESPONDENT 连接
// 发起调查
const char *survey_msg = "Who are you?";
printf("Survey: %s\n", survey_msg);
nng_send(sock, (void *)survey_msg, strlen(survey_msg), 0);
// 设置接收超时(5 秒收集响应)
nng_setopt_ms(sock, NNG_OPT_SURVEYOR_SURVEYTIMEO, 5000);
// 收集响应
while (1) {
char *buf = NULL;
size_t sz;
rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
if (rv == NNG_ETIMEDOUT) {
printf("Survey period ended.\n");
break;
} else if (rv == 0) {
printf("Response: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
}
}
nng_close(sock);
return 0;
}
respondent.c:
#include <nng/nng.h>
#include <nng/protocol/survey0/respond.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
nng_socket sock;
int rv;
const char *name = (argc > 1) ? argv[1] : "DefaultNode";
if ((rv = nng_respondent0_open(&sock)) != 0) {
fprintf(stderr, "nng_respondent0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_dial(sock, "tcp://localhost:5559", NULL, 0)) != 0) {
fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
return 1;
}
printf("RESPONDENT '%s' connected\n", name);
while (1) {
char *buf = NULL;
size_t sz;
if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
printf("Survey received: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
// 响应
char reply[64];
snprintf(reply, sizeof(reply), "I am %s", name);
nng_send(sock, reply, strlen(reply), 0);
}
}
nng_close(sock);
return 0;
}
3.7.5 注意事项
调查超时:必须设置
NNG_OPT_SURVEYOR_SURVEYTIMEO,否则 SURVEYOR 会永远等待。
RESPONDENT 需先连接:SURVEYOR 发起调查前,RESPONDENT 应已连接,否则会丢失响应。
不能在同一 socket 上连续发起调查:必须等待当前调查结束后才能发起下一次。
3.8 协议选型指南
3.8.1 决策树
需要什么样的通信?
│
├── 1:1 双向?
│ └── PAIR
│
├── 请求-应答?
│ └── REQ / REP
│
├── 广播/发布?
│ ├── 允许丢失? ── PUB / SUB
│ └── 需要响应? ── SURVEYOR / RESPONDENT
│
├── 任务分发?
│ └── PUSH / PULL
│
└── 全连接?
└── BUS
3.8.2 场景对照表
| 业务场景 | 推荐协议 | 理由 |
|---|---|---|
| 微服务 RPC | REQ/REP | 请求-应答模式,支持负载均衡 |
| 实时数据推送 | PUB/SUB | 广播高效,支持主题过滤 |
| 日志收集 | PUSH/PULL | 负载均衡分发,背压保护 |
| 服务健康检查 | SURVEYOR/RESPONDENT | 一对多调查,超时机制完善 |
| 集群状态同步 | BUS | 全连接,所有节点互相同步 |
| 命令控制通道 | PAIR | 简单可靠,开销最低 |
| 任务队列 | PUSH/PULL | 生产者-消费者模型 |
| IoT 数据采集 | PUSH/PULL 或 PUB/SUB | 设备少用 PUSH/PULL,设备多用 PUB/SUB |
3.9 注意事项
协议与传输解耦:每种协议可以运行在任何传输层(TCP、IPC、inproc)上,两者独立选择。
单 socket 单协议:一个 socket 只能绑定一种协议,不能混用。
NNG 协议版本:NNG 使用
pair0、pub0等命名(带版本号),为未来协议升级预留空间。
3.10 扩展阅读
上一章:第 2 章:安装与环境搭建 | 下一章:第 4 章:nanomsg C API 详解