nanomsg / NNG 消息库完全教程 / 第 4 章:nanomsg C API 详解
4.1 nanomsg API 概述
nanomsg 的 API 设计遵循 POSIX 风格,核心函数只有十几个,学习成本极低。
4.1.1 API 函数列表
| 函数 | 用途 | 头文件 |
|---|---|---|
nn_socket() | 创建 Socket | <nanomsg/nn.h> |
nn_close() | 关闭 Socket | <nanomsg/nn.h> |
nn_bind() | 绑定地址(服务端) | <nanomsg/nn.h> |
nn_connect() | 连接地址(客户端) | <nanomsg/nn.h> |
nn_send() | 发送消息 | <nanomsg/nn.h> |
nn_recv() | 接收消息 | <nanomsg/nn.h> |
nn_sendmsg() | 发送多段消息(iov) | <nanomsg/nn.h> |
nn_recvmsg() | 接收多段消息(iov) | <nanomsg/nn.h> |
nn_setsockopt() | 设置选项 | <nanomsg/nn.h> |
nn_getsockopt() | 获取选项 | <nanomsg/nn.h> |
nn_shutdown() | 关闭绑定/连接端点 | <nanomsg/nn.h> |
nn_errno() | 获取错误码 | <nanomsg/nn.h> |
nn_strerror() | 错误码转字符串 | <nanomsg/nn.h> |
nn_device() | 创建代理设备 | <nanomsg/nn.h> |
nn_allocmsg() | 分配消息内存 | <nanomsg/nn.h> |
nn_freemsg() | 释放消息内存 | <nanomsg/nn.h> |
4.1.2 头文件引入
#include <nanomsg/nn.h> // 核心 API
#include <nanomsg/pair.h> // PAIR 协议常量
#include <nanomsg/pubsub.h> // PUB/SUB 协议常量
#include <nanomsg/reqrep.h> // REQ/REP 协议常量
#include <nanomsg/pipeline.h> // PUSH/PULL 协议常量
#include <nanomsg/bus.h> // BUS 协议常量
#include <nanomsg/survey.h> // SURVEY 协议常量
4.2 Socket 创建
4.2.1 nn_socket()
int nn_socket(int domain, int protocol);
参数:
| 参数 | 值 | 说明 |
|---|---|---|
domain | AF_SP | 标准 SP Socket |
domain | AF_SP_RAW | 原始 Socket(底层协议访问) |
protocol | NN_PAIR | PAIR 协议 |
protocol | NN_PUB / NN_SUB | PUB/SUB 协议 |
protocol | NN_REQ / NN_REP | REQ/REP 协议 |
protocol | NN_PUSH / NN_PULL | PUSH/PULL 协议 |
protocol | NN_BUS | BUS 协议 |
protocol | NN_SURVEYOR / NN_RESPONDENT | SURVEY 协议 |
返回值: 成功返回 Socket 文件描述符(>= 0),失败返回 -1。
示例:
// 创建 PAIR Socket
int pair_sock = nn_socket(AF_SP, NN_PAIR);
// 创建 PUB Socket
int pub_sock = nn_socket(AF_SP, NN_PUB);
// 创建 REQ Socket
int req_sock = nn_socket(AF_SP, NN_REQ);
// 创建 PUSH Socket
int push_sock = nn_socket(AF_SP, NN_PUSH);
4.2.2 错误处理
int sock = nn_socket(AF_SP, NN_PAIR);
if (sock < 0) {
fprintf(stderr, "nn_socket failed: %s (errno=%d)\n",
nn_strerror(nn_errno()), nn_errno());
exit(1);
}
4.2.3 注意事项
nanomsg 的 Socket 不是文件描述符,不能用于
select()/poll()。如果需要事件驱动,应使用nn_poll()或nn_getfd()获取可轮询的文件描述符。
一个 Socket 只能绑定一种协议。
4.3 绑定与连接
4.3.1 nn_bind()
服务端使用,将 Socket 绑定到一个地址并开始监听:
int nn_bind(int sock, const char *url);
返回值: 成功返回端点 ID(>= 0),失败返回 -1。
示例:
// TCP 绑定
int ep = nn_bind(sock, "tcp://*:5555");
// TCP 绑定到指定接口
int ep = nn_bind(sock, "tcp://eth0:5555");
// IPC 绑定
int ep = nn_bind(sock, "ipc:///tmp/myapp.ipc");
// inproc 绑定
int ep = nn_bind(sock, "inproc://mychannel");
// 多次绑定(同一 Socket 可绑定多个地址)
nn_bind(sock, "tcp://*:5555");
nn_bind(sock, "tcp://*:5556");
4.3.2 nn_connect()
客户端使用,连接到服务端地址:
int nn_connect(int sock, const char *url);
示例:
// TCP 连接
int ep = nn_connect(sock, "tcp://192.168.1.100:5555");
// IPC 连接
int ep = nn_connect(sock, "ipc:///tmp/myapp.ipc");
// inproc 连接
int ep = nn_connect(sock, "inproc://mychannel");
// 连接多个服务端(REQ 可轮询多个 REP)
nn_connect(sock, "tcp://server1:5555");
nn_connect(sock, "tcp://server2:5555");
nn_connect(sock, "tcp://server3:5555");
4.3.3 端点(Endpoint)管理
nn_bind() 和 nn_connect() 返回的端点 ID 可用于后续管理:
// 创建端点
int ep = nn_connect(sock, "tcp://server:5555");
// 关闭特定端点(不影响其他端点)
nn_shutdown(sock, ep);
4.3.4 地址格式
| 传输 | 格式 | 示例 |
|---|---|---|
| TCP | tcp://<interface>:<port> | tcp://*:5555, tcp://192.168.1.1:5555 |
| IPC | ipc://<path> | ipc:///tmp/app.ipc |
| inproc | inproc://<name> | inproc://channel1 |
| WebSocket | ws://<interface>:<port> | ws://*:8080 |
4.3.5 完整示例
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#include <stdio.h>
#include <string.h>
int main() {
int sock = nn_socket(AF_SP, NN_REP);
if (sock < 0) {
fprintf(stderr, "Socket error: %s\n", nn_strerror(nn_errno()));
return 1;
}
// 绑定多个地址
int ep1 = nn_bind(sock, "tcp://*:5555");
int ep2 = nn_bind(sock, "ipc:///tmp/rep.ipc");
if (ep1 < 0 || ep2 < 0) {
fprintf(stderr, "Bind error: %s\n", nn_strerror(nn_errno()));
nn_close(sock);
return 1;
}
printf("Listening on tcp://*:5555 and ipc:///tmp/rep.ipc\n");
// ... 收发消息 ...
// 关闭特定端点
nn_shutdown(sock, ep1);
// 关闭 Socket
nn_close(sock);
return 0;
}
4.4 消息发送
4.4.1 nn_send()
int nn_send(int sock, const void *buf, size_t len, int flags);
参数:
| 参数 | 说明 |
|---|---|
sock | Socket 文件描述符 |
buf | 消息数据指针 |
len | 消息长度(字节) |
flags | 0(阻塞)或 NN_DONTWAIT(非阻塞) |
返回值: 成功返回发送的字节数,失败返回 -1。
示例:
const char *msg = "Hello, nanomsg!";
int bytes = nn_send(sock, msg, strlen(msg), 0);
if (bytes < 0) {
fprintf(stderr, "nn_send: %s\n", nn_strerror(nn_errno()));
}
4.4.2 非阻塞发送
int bytes = nn_send(sock, msg, len, NN_DONTWAIT);
if (bytes < 0) {
if (nn_errno() == EAGAIN) {
printf("发送缓冲区满,稍后重试\n");
}
}
4.4.3 nn_sendmsg() —— 多段消息(iov)
发送由多个不连续内存块组成的消息(scatter-gather I/O):
struct nn_iov iov[3];
iov[0].iov_base = "Header: ";
iov[0].iov_len = 8;
iov[1].iov_base = "Body content";
iov[1].iov_len = 12;
iov[2].iov_base = " | Footer";
iov[2].iov_len = 9;
int bytes = nn_sendmsg(sock, iov, 3, 0);
4.4.4 零拷贝发送
使用 nn_allocmsg() 分配的消息内存,nanomsg 可以实现零拷贝发送:
// 分配消息缓冲区
void *msg = nn_allocmsg(13, 0);
memcpy(msg, "Hello, NNG!", 13);
// 发送(零拷贝,发送后 msg 被 nanomsg 接管)
nn_send(sock, &msg, NN_MSG, 0);
// 注意:发送成功后不要再释放 msg
NN_MSG 是特殊标记,告诉 nn_send() 第二个参数是指向消息指针的指针。
4.5 消息接收
4.5.1 nn_recv()
int nn_recv(int sock, void *buf, size_t len, int flags);
参数:
| 参数 | 说明 |
|---|---|
sock | Socket 文件描述符 |
buf | 接收缓冲区 |
len | 缓冲区大小 |
flags | 0(阻塞)或 NN_DONTWAIT(非阻塞) |
返回值: 成功返回接收的字节数,失败返回 -1。
示例:
char buf[256];
int bytes = nn_recv(sock, buf, sizeof(buf), 0);
if (bytes > 0) {
printf("Received %d bytes: %.*s\n", bytes, bytes, buf);
}
4.5.2 零拷贝接收
void *buf = NULL;
int bytes = nn_recv(sock, &buf, NN_MSG, 0);
if (bytes > 0) {
printf("Received %d bytes: %.*s\n", bytes, bytes, (char *)buf);
nn_freemsg(buf); // 必须手动释放
}
4.5.3 nn_recvmsg() —— 多段消息接收
struct nn_iov iov[3];
iov[0].iov_base = malloc(100);
iov[0].iov_len = 100;
iov[1].iov_base = malloc(1000);
iov[1].iov_len = 1000;
iov[2].iov_base = malloc(100);
iov[2].iov_len = 100;
int bytes = nn_recvmsg(sock, iov, 3, 0);
4.5.4 非阻塞接收
char buf[256];
int bytes = nn_recv(sock, buf, sizeof(buf), NN_DONTWAIT);
if (bytes < 0) {
if (nn_errno() == EAGAIN) {
printf("没有消息可接收\n");
}
}
4.6 Socket 选项
4.6.1 设置选项
int nn_setsockopt(int sock, int level, int option,
const void *val, size_t len);
4.6.2 常用选项
| 选项 | 类型 | 说明 | 默认值 |
|---|---|---|---|
NN_RCVTIMEO | int (ms) | 接收超时 | -1 (无限) |
NN_SNDTIMEO | int (ms) | 发送超时 | -1 (无限) |
NN_RCVBUF | int | 接收缓冲区大小 (字节) | 128KB |
NN_SNDBUF | int | 发送缓冲区大小 (字节) | 128KB |
NN_LINGER | int (ms) | 关闭时等待未发送消息的时间 | 1000ms |
NN_RECONNECT_IVL | int (ms) | 重连间隔 | 100ms |
NN_RECONNECT_IVL_MAX | int (ms) | 最大重连间隔 | 0 (无退避) |
NN_SNDPRIO | int | 发送优先级 (0-15) | 8 |
NN_RCVPRIO | int | 接收优先级 (0-15) | 8 |
NN_IPV4ONLY | int | 只使用 IPv4 | 1 |
NN_SUB_SUBSCRIBE | string | SUB 订阅前缀 | (无) |
NN_SUB_UNSUBSCRIBE | string | SUB 取消订阅 | (无) |
4.6.3 设置超时示例
// 设置接收超时 5 秒
int timeout = 5000;
nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));
// 设置发送超时 3 秒
timeout = 3000;
nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(timeout));
// 现在 recv/send 会在超时后返回 -1,errno 为 EAGAIN
char buf[256];
int bytes = nn_recv(sock, buf, sizeof(buf), 0);
if (bytes < 0 && nn_errno() == EAGAIN) {
printf("接收超时\n");
}
4.6.4 SUB 订阅设置
// 订阅所有消息
const char *all = "";
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, all, 0);
// 订阅 "weather" 前缀
const char *weather = "weather";
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, weather, strlen(weather));
// 取消订阅 "weather"
nn_setsockopt(sock, NN_SUB, NN_SUB_UNSUBSCRIBE, weather, strlen(weather));
4.6.5 获取选项
int val;
size_t len = sizeof(val);
nn_getsockopt(sock, NN_SOL_SOCKET, NN_RCVBUF, &val, &len);
printf("Receive buffer: %d bytes\n", val);
4.7 使用 nn_poll() 多路复用
nn_poll() 类似 poll(),可以同时监听多个 Socket 的可读/可写状态:
#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
#include <nanomsg/pubsub.h>
#include <stdio.h>
#include <string.h>
int main() {
int pair = nn_socket(AF_SP, NN_PAIR);
int sub = nn_socket(AF_SP, NN_SUB);
nn_bind(pair, "tcp://*:5560");
nn_connect(sub, "tcp://localhost:5561");
const char *filter = "";
nn_setsockopt(sub, NN_SUB, NN_SUB_SUBSCRIBE, filter, 0);
// 设置 poll
struct nn_pollfd fds[2];
fds[0].fd = pair;
fds[0].events = NN_POLLIN; // 监听可读
fds[1].fd = sub;
fds[1].events = NN_POLLIN;
// 超时 5000ms
int rc = nn_poll(fds, 2, 5000);
if (rc == 0) {
printf("Timeout, no events.\n");
} else if (rc > 0) {
if (fds[0].revents & NN_POLLIN) {
char *buf = NULL;
nn_recv(pair, &buf, NN_MSG, 0);
printf("PAIR: %s\n", (char *)buf);
nn_freemsg(buf);
}
if (fds[1].revents & NN_POLLIN) {
char *buf = NULL;
nn_recv(sub, &buf, NN_MSG, 0);
printf("SUB: %s\n", (char *)buf);
nn_freemsg(buf);
}
} else {
fprintf(stderr, "nn_poll error: %s\n", nn_strerror(nn_errno()));
}
nn_close(pair);
nn_close(sub);
return 0;
}
4.8 设备与代理
nn_device() 创建一个代理(proxy),将一个 Socket 的消息转发到另一个 Socket。常用于构建消息中间件:
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <stdio.h>
int main() {
int front = nn_socket(AF_SP, NN_XPUB);
int back = nn_socket(AF_SP, NN_XSUB);
// XPUB/XSUB 是 PUB/SUB 的中间代理版本
nn_bind(front, "tcp://*:5560"); // 订阅者连接这里
nn_bind(back, "tcp://*:5561"); // 发布者连接这里
printf("Proxy running: XSUB:5561 -> XPUB:5560\n");
// 阻塞,转发消息
nn_device(front, back);
// nn_device 永不返回(除非出错)
nn_close(front);
nn_close(back);
return 0;
}
使用代理的架构:
Publisher A ──┐ ┌── Subscriber A
├──► XSUB ──► XPUB ──┤
Publisher B ──┘ (Proxy) └── Subscriber B
代理使发布者和订阅者完全解耦,无需知道对方地址。
4.9 完整生命周期示例
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
int main() {
// 1. 创建 Socket
int sock = nn_socket(AF_SP, NN_REP);
if (sock < 0) {
fprintf(stderr, "nn_socket: %s\n", nn_strerror(nn_errno()));
return 1;
}
// 2. 设置选项
int timeout = 10000; // 10 秒接收超时
nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));
int bufsize = 1048576; // 1MB 接收缓冲区
nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVBUF, &bufsize, sizeof(bufsize));
// 3. 绑定地址
int ep = nn_bind(sock, "tcp://*:5555");
if (ep < 0) {
fprintf(stderr, "nn_bind: %s\n", nn_strerror(nn_errno()));
nn_close(sock);
return 1;
}
printf("Server started on tcp://*:5555\n");
// 4. 消息循环
while (1) {
// 接收消息(零拷贝)
void *buf = NULL;
int bytes = nn_recv(sock, &buf, NN_MSG, 0);
if (bytes < 0) {
if (nn_errno() == EAGAIN) {
printf("Timeout, continuing...\n");
continue;
}
fprintf(stderr, "nn_recv: %s\n", nn_strerror(nn_errno()));
break;
}
printf("Received %d bytes: %.*s\n", bytes, bytes, (char *)buf);
nn_freemsg(buf);
// 发送响应
const char *reply = "OK";
nn_send(sock, reply, strlen(reply), 0);
}
// 5. 关闭
nn_shutdown(sock, ep);
nn_close(sock);
printf("Server stopped.\n");
return 0;
}
4.10 nanomsg vs NNG API 对照
| 操作 | nanomsg | NNG |
|---|---|---|
| 创建 Socket | nn_socket(AF_SP, NN_PAIR) | nng_pair0_open(&sock) |
| 绑定 | nn_bind(sock, url) | nng_listen(sock, url, &listener, 0) |
| 连接 | nn_connect(sock, url) | nng_dial(sock, url, &dialer, 0) |
| 发送 | nn_send(sock, buf, len, 0) | nng_send(sock, buf, len, 0) |
| 接收 | nn_recv(sock, buf, len, 0) | nng_recv(sock, buf, &sz, 0) |
| 设置超时 | nn_setsockopt(..., NN_RCVTIMEO, ...) | nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, ms) |
| 关闭 | nn_close(sock) | nng_close(sock) |
| 错误信息 | nn_strerror(nn_errno()) | nng_strerror(rv) |
4.11 注意事项
线程安全:nanomsg 的 Socket 不是线程安全的。不要在多个线程中同时操作同一个 Socket。需要并发时,每个线程创建独立的 Socket,通过 inproc 通信。
零拷贝内存管理:使用
NN_MSG接收的消息必须通过nn_freemsg()释放,否则会内存泄漏。
Linger 时间:
nn_close()默认等待 1 秒让未发送的消息完成投递。可设置NN_LINGER为 0 来立即关闭。
4.12 扩展阅读
上一章:第 3 章:可扩展性协议详解 | 下一章:第 5 章:NNG 现代 API 详解