POSIX 标准详解教程 / 第四章:线程
第四章:线程
掌握 POSIX 线程(pthread)API,理解线程同步机制:互斥锁、条件变量、读写锁、屏障。
4.1 POSIX 线程概述
4.1.1 线程 vs 进程
| 对比维度 | 进程 (Process) | 线程 (Thread) |
|---|---|---|
| 地址空间 | 独立 | 共享 |
| 创建开销 | 大(复制页表) | 小(仅创建栈) |
| 通信方式 | IPC(管道、共享内存等) | 直接读写共享变量 |
| 切换开销 | 大(TLB 刷新) | 小 |
| 隔离性 | 强(一个崩溃不影响另一个) | 弱(一个崩溃整个进程终止) |
| 创建函数 | fork() | pthread_create() |
4.1.2 POSIX 线程标准
POSIX 线程定义于 POSIX.1c(IEEE Std 1003.1c-1995),后被合并到 POSIX.1-2001 中。Linux 上的实现为 NPTL(Native POSIX Thread Library)。
4.1.3 线程资源模型
进程地址空间
┌─────────────────────────────────────────┐
│ 代码段 (.text) │ ← 共享
├─────────────────────────────────────────┤
│ 数据段 (.data/.bss) │ ← 共享(全局变量)
├─────────────────────────────────────────┤
│ 堆 │ ← 共享(malloc 分配)
├─────────────────────────────────────────┤
│ 线程 1 栈 (8MB) │ ← 线程 1 私有
├─────────────────────────────────────────┤
│ 线程 2 栈 (8MB) │ ← 线程 2 私有
├─────────────────────────────────────────┤
│ 线程 3 栈 (8MB) │ ← 线程 3 私有
└─────────────────────────────────────────┘
编译时需要链接 pthread 库:gcc -Wall -o program program.c -lpthread
4.2 线程创建与终止
4.2.1 pthread_create() 与 pthread_join()
/*
* thread_basic.c - 基本线程创建和等待
* 编译: gcc -Wall -o thread_basic thread_basic.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
/* 线程函数参数结构体 */
typedef struct {
int id;
char name[32];
} thread_arg_t;
/* 线程入口函数 */
static void *thread_func(void *arg)
{
thread_arg_t *targ = (thread_arg_t *)arg;
printf("线程 %d (%s) 开始运行, TID 区域=%p\n",
targ->id, targ->name, (void *)&targ->id);
/* 模拟工作 */
sleep(1);
printf("线程 %d (%s) 完成\n", targ->id, targ->name);
/* 返回结果(通过 void* 传递) */
int *result = malloc(sizeof(int));
if (result) *result = targ->id * 100;
return result;
}
int main(void)
{
#define NUM_THREADS 3
pthread_t threads[NUM_THREADS];
thread_arg_t args[NUM_THREADS];
const char *names[] = {"Worker-A", "Worker-B", "Worker-C"};
/* 创建线程 */
for (int i = 0; i < NUM_THREADS; i++) {
args[i].id = i;
snprintf(args[i].name, sizeof(args[i].name), "%s", names[i]);
int ret = pthread_create(&threads[i], NULL, thread_func, &args[i]);
if (ret != 0) {
fprintf(stderr, "pthread_create 失败: %s\n", strerror(ret));
return EXIT_FAILURE;
}
}
/* 等待所有线程完成 */
printf("主线程等待所有工作线程...\n");
for (int i = 0; i < NUM_THREADS; i++) {
void *retval;
int ret = pthread_join(threads[i], &retval);
if (ret != 0) {
fprintf(stderr, "pthread_join 失败: %s\n", strerror(ret));
continue;
}
if (retval) {
int *result = (int *)retval;
printf("线程 %d 返回值: %d\n", i, *result);
free(result);
}
}
printf("所有线程已完成\n");
return EXIT_SUCCESS;
}
$ ./thread_basic
主线程等待所有工作线程...
线程 0 (Worker-A) 开始运行, TID 区域=0x7f8a0c000b70
线程 1 (Worker-B) 开始运行, TID 区域=0x7f8a0b800b70
线程 2 (Worker-C) 开始运行, TID 区域=0x7f8a0b000b70
线程 0 (Worker-A) 完成
线程 1 (Worker-B) 完成
线程 2 (Worker-C) 完成
线程 0 返回值: 0
线程 1 返回值: 100
线程 2 返回值: 200
所有线程已完成
4.2.2 pthread_detach():分离线程
如果不需要等待线程结束,可以将其设为分离状态:
/* 分离线程:线程终止时自动释放资源 */
pthread_t tid;
pthread_create(&tid, NULL, thread_func, NULL);
pthread_detach(tid);
/* 此后不能再对该线程调用 pthread_join() */
4.2.3 线程属性
/*
* thread_attr.c - 使用线程属性设置栈大小和分离状态
* 编译: gcc -Wall -o thread_attr thread_attr.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
static void *thread_func(void *arg)
{
(void)arg;
printf(" 线程: 栈大小已自定义\n");
return NULL;
}
int main(void)
{
pthread_t tid;
pthread_attr_t attr;
pthread_attr_init(&attr);
/* 设置分离状态 */
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
/* 设置栈大小为 4MB(默认通常为 8MB) */
pthread_attr_setstacksize(&attr, 4 * 1024 * 1024);
/* 查询栈大小 */
size_t stacksize;
pthread_attr_getstacksize(&attr, &stacksize);
printf("设置的栈大小: %zu 字节 (%.1f MB)\n",
stacksize, stacksize / (1024.0 * 1024.0));
pthread_create(&tid, &attr, thread_func, NULL);
pthread_join(tid, NULL);
pthread_attr_destroy(&attr);
return 0;
}
4.3 互斥锁 (Mutex)
4.3.1 互斥锁的作用
互斥锁保证同一时刻只有一个线程能访问共享资源,防止竞态条件(Race Condition)。
线程 A: lock(mtx) → 读写共享数据 → unlock(mtx)
线程 B: lock(mtx) → [等待] ..............→ 读写共享数据 → unlock(mtx)
4.3.2 无锁 vs 有锁对比
/*
* mutex_race.c - 演示竞态条件和互斥锁保护
* 编译: gcc -Wall -o mutex_race mutex_race.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define ITERATIONS 1000000
static long counter_no_lock = 0;
static long counter_with_lock = 0;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static void *increment_no_lock(void *arg)
{
(void)arg;
for (int i = 0; i < ITERATIONS; i++)
counter_no_lock++; /* 数据竞争! */
return NULL;
}
static void *increment_with_lock(void *arg)
{
(void)arg;
for (int i = 0; i < ITERATIONS; i++) {
pthread_mutex_lock(&mtx);
counter_with_lock++;
pthread_mutex_unlock(&mtx);
}
return NULL;
}
int main(void)
{
pthread_t t1, t2;
/* 测试无锁 */
pthread_create(&t1, NULL, increment_no_lock, NULL);
pthread_create(&t2, NULL, increment_no_lock, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("无锁结果: 期望=%d, 实际=%ld (错误: %ld)\n",
ITERATIONS * 2, counter_no_lock,
ITERATIONS * 2 - counter_no_lock);
/* 测试有锁 */
pthread_create(&t1, NULL, increment_with_lock, NULL);
pthread_create(&t2, NULL, increment_with_lock, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("有锁结果: 期望=%d, 实际=%ld (错误: %ld)\n",
ITERATIONS * 2, counter_with_lock,
ITERATIONS * 2 - counter_with_lock);
pthread_mutex_destroy(&mtx);
return 0;
}
$ ./mutex_race
无锁结果: 期望=2000000, 实际=1843562 (错误: 156438)
有锁结果: 期望=2000000, 实际=2000000 (错误: 0)
4.3.3 互斥锁类型
| 类型 | 宏 | 说明 |
|---|---|---|
| 普通锁 | PTHREAD_MUTEX_NORMAL | 不检测死锁,默认类型 |
| 检错锁 | PTHREAD_MUTEX_ERRORCHECK | 重复加锁返回错误 |
| 递归锁 | PTHREAD_MUTEX_RECURSIVE | 同一线程可多次加锁 |
| 默认锁 | PTHREAD_MUTEX_DEFAULT | 平台相关(Linux 为普通锁) |
/* 初始化递归锁 */
pthread_mutex_t rmtx;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&rmtx, &attr);
pthread_mutexattr_destroy(&attr);
/* 递归锁允许同一线程多次加锁 */
pthread_mutex_lock(&rmtx);
pthread_mutex_lock(&rmtx); /* 成功(普通锁这里会死锁) */
pthread_mutex_unlock(&rmtx);
pthread_mutex_unlock(&rmtx);
4.4 条件变量 (Condition Variable)
4.4.1 概念
条件变量用于等待某个条件为真,典型配合互斥锁使用。它是线程间通知机制的核心:
等待方: 通知方:
lock(mtx) lock(mtx)
while (!条件) 设置条件为真
cond_wait(cv, mtx) cond_signal(cv) / cond_broadcast(cv)
使用共享资源 unlock(mtx)
unlock(mtx)
关键:必须使用
while循环而非if来检查条件(防止虚假唤醒 spurious wakeup)。
4.4.2 生产者-消费者模型
/*
* producer_consumer.c - 使用条件变量实现生产者-消费者
* 编译: gcc -Wall -o producer_consumer producer_consumer.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define QUEUE_SIZE 10
#define ITEMS_TO_PRODUCE 20
static int queue[QUEUE_SIZE];
static int head = 0, tail = 0, count = 0;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
static int done = 0;
static void enqueue(int item)
{
queue[tail] = item;
tail = (tail + 1) % QUEUE_SIZE;
count++;
}
static int dequeue(void)
{
int item = queue[head];
head = (head + 1) % QUEUE_SIZE;
count--;
return item;
}
static void *producer(void *arg)
{
(void)arg;
for (int i = 0; i < ITEMS_TO_PRODUCE; i++) {
pthread_mutex_lock(&mtx);
/* 队列满时等待 */
while (count == QUEUE_SIZE)
pthread_cond_wait(¬_full, &mtx);
enqueue(i);
printf("[生产者] 生产 %d (队列长度: %d)\n", i, count);
pthread_cond_signal(¬_empty); /* 通知消费者 */
pthread_mutex_unlock(&mtx);
usleep(50000); /* 模拟生产耗时 */
}
/* 通知消费者生产完毕 */
pthread_mutex_lock(&mtx);
done = 1;
pthread_cond_broadcast(¬_empty);
pthread_mutex_unlock(&mtx);
return NULL;
}
static void *consumer(void *arg)
{
int id = *(int *)arg;
while (1) {
pthread_mutex_lock(&mtx);
/* 队列空时等待 */
while (count == 0 && !done)
pthread_cond_wait(¬_empty, &mtx);
if (count == 0 && done) {
pthread_mutex_unlock(&mtx);
break;
}
int item = dequeue();
printf(" [消费者 %d] 消费 %d (队列长度: %d)\n", id, item, count);
pthread_cond_signal(¬_full); /* 通知生产者 */
pthread_mutex_unlock(&mtx);
usleep(80000); /* 模拟消费耗时 */
}
return NULL;
}
int main(void)
{
pthread_t prod;
pthread_t cons[2];
int cons_ids[] = {0, 1};
pthread_create(&prod, NULL, producer, NULL);
for (int i = 0; i < 2; i++)
pthread_create(&cons[i], NULL, consumer, &cons_ids[i]);
pthread_join(prod, NULL);
for (int i = 0; i < 2; i++)
pthread_join(cons[i], NULL);
printf("所有生产/消费完毕\n");
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(¬_full);
pthread_cond_destroy(¬_empty);
return 0;
}
4.5 读写锁 (Read-Write Lock)
4.5.1 适用场景
读写锁适用于读多写少的场景:多个读者可以同时读取,但写者需要独占访问。
| 操作 | 读者 | 写者 |
|---|---|---|
| 已有读者 | ✅ 并发读 | ❌ 等待 |
| 已有写者 | ❌ 等待 | ❌ 等待 |
4.5.2 读写锁示例
/*
* rwlock.c - 读写锁保护共享配置
* 编译: gcc -Wall -o rwlock rwlock.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
/* 共享配置 */
typedef struct {
char key[64];
char value[256];
int version;
} config_entry_t;
static config_entry_t config = {"server_port", "8080", 1};
static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
/* 读线程:读取配置 */
static void *reader_thread(void *arg)
{
int id = *(int *)arg;
for (int i = 0; i < 3; i++) {
pthread_rwlock_rdlock(&rwlock);
printf("[读者 %d] key=%s, value=%s, version=%d\n",
id, config.key, config.value, config.version);
pthread_rwlock_unlock(&rwlock);
usleep(100000);
}
return NULL;
}
/* 写线程:更新配置 */
static void *writer_thread(void *arg)
{
(void)arg;
const char *new_values[] = {"8443", "9090", "443"};
for (int i = 0; i < 3; i++) {
usleep(200000); /* 等待读者先读 */
pthread_rwlock_wrlock(&rwlock);
strncpy(config.value, new_values[i], sizeof(config.value) - 1);
config.version++;
printf("[写者] 更新: value=%s, version=%d\n",
config.value, config.version);
pthread_rwlock_unlock(&rwlock);
}
return NULL;
}
int main(void)
{
pthread_t readers[3], writer;
int reader_ids[] = {0, 1, 2};
for (int i = 0; i < 3; i++)
pthread_create(&readers[i], NULL, reader_thread, &reader_ids[i]);
pthread_create(&writer, NULL, writer_thread, NULL);
for (int i = 0; i < 3; i++)
pthread_join(readers[i], NULL);
pthread_join(writer, NULL);
pthread_rwlock_destroy(&rwlock);
return 0;
}
4.6 屏障 (Barrier)
4.6.1 概念
屏障使一组线程在某一点同步等待,直到所有线程都到达该点才继续执行。
/*
* barrier.c - 使用屏障同步多阶段计算
* 编译: gcc -Wall -o barrier barrier.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define NUM_THREADS 4
#define NUM_PHASES 3
static pthread_barrier_t barrier;
static void *worker(void *arg)
{
int id = *(int *)arg;
for (int phase = 0; phase < NUM_PHASES; phase++) {
/* 模拟计算 */
printf(" 线程 %d: 阶段 %d 计算中...\n", id, phase);
usleep((100000 + id * 50000));
/* 等待所有线程完成此阶段 */
int ret = pthread_barrier_wait(&barrier);
if (ret == PTHREAD_BARRIER_SERIAL_THREAD) {
/* 只有一个线程会收到此返回值 */
printf("=== 阶段 %d 全部完成 ===\n\n", phase);
}
}
return NULL;
}
int main(void)
{
pthread_t threads[NUM_THREADS];
int ids[NUM_THREADS];
pthread_barrier_init(&barrier, NULL, NUM_THREADS);
for (int i = 0; i < NUM_THREADS; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, worker, &ids[i]);
}
for (int i = 0; i < NUM_THREADS; i++)
pthread_join(threads[i], NULL);
pthread_barrier_destroy(&barrier);
printf("所有阶段完成\n");
return 0;
}
4.7 线程特有数据 (Thread-Specific Data)
4.7.1 POSIX Thread-Local Storage
/*
* thread_local.c - 线程特有数据(TSD)
* 编译: gcc -Wall -o thread_local thread_local.c -lpthread
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
static pthread_key_t tsd_key;
static pthread_once_t once_control = PTHREAD_ONCE_INIT;
static void tsd_destructor(void *value)
{
printf(" TSD 析构: %s\n", (char *)value);
free(value);
}
static void tsd_init(void)
{
pthread_key_create(&tsd_key, tsd_destructor);
}
static void *worker(void *arg)
{
int id = *(int *)arg;
/* 确保 key 已创建 */
pthread_once(&once_control, tsd_init);
/* 设置本线程的特有数据 */
char *msg = malloc(64);
snprintf(msg, 64, "线程 %d 的私有数据", id);
pthread_setspecific(tsd_key, msg);
/* 读取本线程的特有数据 */
char *data = (char *)pthread_getspecific(tsd_key);
printf("[线程 %d] TSD = %s\n", id, data);
return NULL;
}
int main(void)
{
pthread_t threads[3];
int ids[] = {0, 1, 2};
for (int i = 0; i < 3; i++)
pthread_create(&threads[i], NULL, worker, &ids[i]);
for (int i = 0; i < 3; i++)
pthread_join(threads[i], NULL);
printf("所有线程已完成\n");
return 0;
}
4.8 自旋锁 (Spin Lock)
| 锁类型 | 等待方式 | 适用场景 |
|---|---|---|
| 互斥锁 (Mutex) | 阻塞(睡眠) | 一般场景,临界区较长 |
| 自旋锁 (Spin Lock) | 忙等待(循环) | 临界区极短,多处理器系统 |
| 读写锁 (RWLock) | 阻塞 | 读多写少 |
/* 自旋锁使用(适合极短临界区) */
pthread_spinlock_t spinlock;
pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);
pthread_spin_lock(&spinlock);
/* 极短的原子操作 */
counter++;
pthread_spin_unlock(&spinlock);
pthread_spin_destroy(&spinlock);
4.9 线程安全函数
POSIX 定义了线程安全函数(Thread-Safe Functions)和非线程安全函数:
| 非线程安全 | 线程安全替代 | 说明 |
|---|---|---|
strtok() | strtok_r() | 字符串分割 |
ctime() | ctime_r() | 时间转字符串 |
localtime() | localtime_r() | 获取本地时间 |
gmtime() | gmtime_r() | 获取 UTC 时间 |
rand() | rand_r() | 随机数生成 |
readdir() | readdir_r() | 目录遍历 |
gethostbyname() | gethostbyname_r() | DNS 查询 |
strerror() | strerror_r() | 错误码转字符串 |
规则:在多线程程序中,必须使用
_r(reentrant)后缀的版本。
4.10 注意事项
⚠️ 死锁预防:始终以相同顺序获取多个锁。使用检错锁(
PTHREAD_MUTEX_ERRORCHECK)便于调试。
⚠️ 条件变量虚假唤醒:条件变量可能在没有
signal/broadcast的情况下返回。必须用while循环检查条件。
⚠️ fork() 与线程:
fork()只复制调用线程,其他线程不复存在。持有锁的线程在子进程中消失会导致死锁。使用pthread_atfork()注册清理函数。
⚠️ 栈溢出:线程默认栈大小通常为 8MB。递归过深或大局部变量会导致栈溢出且没有明确的错误信号(段错误)。使用
pthread_attr_setstacksize()或减少递归深度。
⚠️ false sharing:不同线程频繁修改同一缓存行上的不同变量,会导致缓存一致性开销。使用对齐(
__attribute__((aligned(64))))将热点数据隔离到不同缓存行。
4.11 扩展阅读
man 7 pthreads— POSIX 线程概述man 3 pthread_create— 线程创建- APUE 第 11-12 章:Threads, Thread Control
- TLPI 第 29-33 章:POSIX Threads 系列
- 《Programming with POSIX Threads》 — David R. Butenhof 著
- glibc NPTL 实现源码:https://sourceware.org/git/?p=glibc.git;a=tree;f=nptl
4.12 本章小结
| 要点 | 说明 |
|---|---|
| pthread_create/join | 创建和等待线程 |
| 互斥锁 (Mutex) | 保护共享资源,防止竞态条件 |
| 条件变量 (Cond) | 线程间通知机制,配合互斥锁使用 |
| 读写锁 (RWLock) | 读多写少场景的优化 |
| 屏障 (Barrier) | 多线程同步点 |
| 线程特有数据 (TSD) | 线程私有存储 |
| 线程安全函数 | 使用 _r 后缀的可重入版本 |