Linux多线程
线程的概念
什么是线程
- 在Linux系统下,线程即轻量级进程(light weight process),线程是一个进程内部的控制序列
- 每一个进程至少包含一个线程,线程在进程的内部运行,线程的运行依附于进程,因为所有线程共享进程地址空间
- 站在CPU的视角,Linux系统下的PCB要更轻量化,因为Linux系统下,一个PCB代表一个轻量级进程
- 所有线程共享进程地址空间,通过进程地址空间,可以看到进程所拥有的资源,将进程的资源分配给每一个线程,就产生了线程的执行流
在Linux系统下的线程与其它操作系统不同,Linux系统没有为线程创建单独的数据结构,在Linux系统下创建一个线程,本质是创建了一个轻量级进程,内核只是为轻量级进程创建了一个PCB,没有创建进程地址空间和页表等结构
线程的优点
线程与进程相比,具有如下特点
- 创建一个线程要比创建一个进程的代价小很多,创建一个线程,只需要为其创建PCB,之后,线程将使用进程的地址空间和页表;而创建一个进程,不仅需要创建PCB,还需要创建进程地址空间、页表、文件描述符表等结构
- 线程的切换成本要比进程低很多,线程在切换时,不需要切换进程地址空间和页表,线程是在进程的进程地址空间中运行的,同时线程在切换时,不需要刷新CPU中缓存的数据(例如L1,L2,L3Cache中的数据),从硬件层面上决定了线程切换的成本不高;进程在切换时,需要切换进程地址空间和页表,同时CPU缓存的数据需要重新进行预热,代价较高
- 线程占用的资源少,与进程相比,线程只需要操作系统为其创建一个PCB即可,其它资源线程直接从进程“取”
- 多线程能够更充分的利用多处理器的可并行数量,在多处理器环境下,创建多线程能够更高效的使用CPU资源
- 使用多线程在等待慢速I/O操作结束的同时,程序可以执行其它的任务
- 多线程适合计算密集型场景,对于计算密集型应用,在多处理器的系统下,可以将计算任务分配给不同线程执行
- 多线程适合I/O密集型场景,对于I/O密集型应用,可以将I/O操作重叠,利用线程的并发特点,让不同的线程等待不同的I/O操作
线程具有以上优点的本质是Linux在设计线程的时候,采用了轻量级进程的做法,而没有单独创建一套线程的内核数据结构,Linux系统下的线程要比其它系统下的线程更加轻量化
线程的缺点
多线程虽然具有高并发、高可用的特点,但是在编码上难度较高,具有以下特点
- 性能损失,当计算密集型线程的数量比可用的处理器数量更多,且计算密集型线程处理的是很少被外部事件阻塞任务时,就可能会因为线程的频繁切换和调度产生性能上的损失,同时可能需要为了维护线程的同步而产生额外的开销。但性能损失并不是绝对的,应该根据具体场景综合分析
- 健壮性降低,多线程的一个优点是可以很容易的访问共享资源,但是多个线程之间由于缺乏保护和访问控制,在访问共享资源时,往往需要进行加锁
- 缺乏访问控制,访问控制的基本粒度是进程,在一个线程中如过调用某些函数可能会对整个进程造成影响,例如exec系列的进程程序替换函数
- 编程的难度提高,编写与调试一个多线程程序要比单线程的程序难度更高
虽然线程具有上述缺点,但在进行编码时,是可以通过一定的手段避免的,例如线程缺乏访问控制可以通过加锁的方式提供访问控制
线程异常
- 如果一个线程崩溃,极有可能导致整个进程崩溃,例如多线程中某一个线程出现段错误,就会导致整个进程退出。某一个线程出现除0错误,CPU状态寄存器中的状态标记位被设置,整个进程退出,一个进程下的所有线程是共享CPU状态寄存器中的状态标记位的
- 线程进程的地址空间中运行,线程出现异常,就容易导致进程出现异常,进而触发信号机制,使整个进程退出
int main() {
auto f = [] {
int *x = nullptr;
*x = 10;
};
std::thread t(f);
t.join();
printf("----------------------\n");
return 0;
}
线程的应用
- 合理的使用多线程,能够提高CPU密集型程序的执行效率
- 合理的使用多线程,可以提高IO密集型程序的用户体验
Linux下的线程与进程
进程与线程的特点
- 在操作系统的眼中,进程是承担分配系统资源的基本单位,线程是CPU调度的基本单位
- 线程共享进程的代码和数据,但是线程也有自己私有的一部分数据
线程的私有数据
- LWP,又称轻量级进程ID和线程号。LWP是CPU用于调度线程的重要依据,可以通过命令
ps -aL
或ps -Lf +进程pid
查看进程下线程的LWP - 寄存器数据,又称一组寄存器或处理器现场。线程的寄存器数据属于线程的上下文数据,每个线程私有
- 栈。线程的栈是私有的,线程使用的栈不是进程地址空间的栈区,线程的栈区由pthread库提供,每一个线程私有一份。线程的管理工作由操作系统和线程库共同完成,其中操作系统主要负责线程的调度工作,线程库负责维护线程的属性,线程栈区、线程的局部存储都是由线程库提供的
- errno,错误码线程是私有的(错误码不属于线程局部存储的数据)
- 信号屏蔽字,信号屏蔽字是PCB中的字段,线程私有
- 调度的优先级,线程的优先级也属于PCB的字段
线程间的共享数据
- 文件描述符表,进程的文件描述符表只有一份,所有线程共享
- 每种信号的处理方式,虽然handler是线程PCB中的字段,但所有线程对于信号的处理方式是一致的
- 当前工作目录(cwd)
- 用户id和组id
Linux线程控制
POSIX线程库
由于Linux系统本身没有提供轻量级进程的接口,因此需要借助第三方库——POSIX线程库完成线程控制,POSIX线程库又称原生线程库
- POSIX线程库中与线程相关的函数大部分名字都以
pthread_
开头 - 使用POSIX线程库,需要头文件
<pthread.h>
- 链接POSIX线程库的时候需要gcc/g++编译器带上
-lpthread
选项
创建线程
创建线程通过pthread_create
函数
#include<pthread.h>
int pthread_create(pthread_t* thread,const pthread_attr_t* attr,void*(*start_routine)(void*),void* arg);
参数
pthread_t* thread
,输出型参数,用于获取创建的线程的tidconst pthread_attr_t attr
,设置线程的属性,默认可设置为nullptr,表示设置默认属性,也可以创建pthread_attr_t的结构体来设置线程的分离属性void*(*start_routine)(void*)
,创建线程完毕,线程执行的回调函数void* arg
,给线程的回调函数传入的参数
返回值
调用成功返回0,调用失败返回错误码errno
void *ThreadRoutine(void *args) {
printf("hello world\n");
}
int main() {
pthread_t tid; // 接收线程的tid
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // 分离
int ret = pthread_create(&tid, &attr, ThreadRoutine, NULL);
if (ret) {
printf("pthread_create fail,%s\n", strerror(ret));
}
pthread_attr_destroy(&attr);
sleep(1);
return 0;
}
LWP与tid
- LWP,又称轻量级进程ID,又称线程号,是操作系统用来区分一个进程中不同线程的标准,也是操作系统调度线程的依据
- tid称为线程id,通过
pthread_self
函数可以得到线程的tid,线程的tid本质是一个地址,tid属于NPTL线程库的范畴(Native POSIX Thread Library,本地POSIX线程库),线程库中对线程的操作,都是通过tid完成的
tid的本质就是进程地址空间共享区中mmap区域的一个地址
线程终止
与进程类似,线程在终止时也会出现僵尸线程的问题,需要对僵尸线程进行处理。
线程终止的常见方式
- 在线程函数中调用return返回。该方法对于主线程不适用,在main线程中调用return返回时终止整个进程
- 线程调用
pthread_exit
系统调用- 一个线程中可以通过调用
pthread_cacnel
终止同一个进程中的另外一个线程,一般是主线程调用pthread_cancel终止子线程,尽量不要使用子线程调用pthread_cancel终止主线程,因为其行为是未定义的
pthread_exit
void pthread_exit(void* retval);
参数retval表示线程的退出结果,如果需要自定义退出结果,需要保证retval指向的对象在线程的回调函数调用完毕后不会销毁,否则可能出现问题
struct ThreadExitData {
int num;
char name[20];
};
void *ThreadRoutine(void *args) {
printf("hello world\n");
struct ThreadExitData *pd = (struct ThreadExitData *)malloc(sizeof(struct ThreadExitData));
pd->num = 5;
strcpy(pd->name, "Thread A");
pthread_exit(pd);
return pd;
}
int main() {
pthread_t tid;
pthread_create(&tid, NULL, ThreadRoutine, NULL);
struct ThreadExitData *ret;
pthread_join(tid, (void **)&ret); // 使用pthread_join回收线程
printf("子线程的编号是%d,子线程的名字是%s\n", ret->num, ret->name);
free(ret);
return 0;
}
pthread_cancel
int pthread_cancel(pthread_t thread);
参数:要终止的线程的tid
返回值:调用成功返回0,失败返回错误码
若主线程调用pthread_cancel终止子线程,那么子线程的退出结果是(void*)-1
,在系统中用PTHREAD_CANCELED宏表示
void *ThreadRoutine(void *args) {
printf("hello world\n");
sleep(2);
return NULL;
}
int main() {
pthread_t tid;
pthread_create(&tid, NULL, ThreadRoutine, NULL);
sleep(1);
pthread_cancel(tid); // 取消子线程
int *ret;
pthread_join(tid, (void **)&ret);
printf("子线程的退出结果是%p\n", ret);
printf("子线程的退出结果是%d\n", (int)ret); //-1
return 0;
}
在某些情况下,pthread_cancel函数可能会失效,此时需要在子线程中调用pthread_testcancel
手动设置取消点
线程等待
线程等待的必要性:
- 已经退出的线程,它的PCB内核结构没有被释放
- 创建新的线程不会复用僵尸线程的PCB,如果不对僵尸线程进行回收,会导致资源泄漏
线程等待使用pthread_join
接口
int pthread_join(pthread_t thread,void** retval);
参数:pthread_join函数的第二个参数是一个void**
类型,用于接收子线程的退出结果,如果不关心子线程的退出结果,可以设置为NULL
返回值:成功返回0,失败返回错误码
调用pthread_join函数的线程将阻塞等待并回收子线程,如果子线程是被cancel而终止,退出结果是PTHREAD_CANCELED
#define PTHREAD_CANCELED ((void*)-1)
一般情况下主线程调用pthread_join回收子线程,但是pthread_join可以用在兄弟线程之间,兄弟线程之间可以调用pthread_join相互回收,这是与进程不同的地方,进程回收只能是父进程回收子进程
线程分离
- 在默认情况下,新创建的线程是可回收的(joinable),且一般需要回收,否则可能导致僵尸线程问题
- 如果不关心线程的退出结果,可以使用
pthread_detach
将线程设置为分离状态,此时就不需要通过pthread_join回收线程,也不能通过pthread_join回收线程,操作系统会自动回收线程
int pthread_detach(pthread_t thread);
线程可以自己调用pthread_detach分离自己,也可以兄弟线程之间调用pthread_detach进行线程分离
void *ThreadRoutine(void *args) {
printf("hello world\n");
return NULL;
}
int main() {
pthread_t tid;
pthread_create(&tid, NULL, ThreadRoutine, NULL);
pthread_detach(tid);
sleep(1);
int ret = pthread_join(tid, NULL);
if (ret) {
printf("pthread join error,%s\n", strerror(ret)); // Invalid argument
}
return 0;
}
线程互斥
基本概念
多线程在访问共享资源时,由于CPU调度的随机性,共享数据可能被污染,因此,需要借助一种机制,让同一时刻,只能有一个线程在访问公共资源,这种机制称为互斥。
与互斥相关的基本概念
- 临界资源:把多执行流共享的资源称为临界资源,一般临界资源可以是全局变量、堆空间等
- 临界区:将每一个线程内部访问临界资源的代码称为临界区,一般临界区都比较简短,线程执行的代码除了临界区代码之外,其余的均是正常代码,一般认为线程线程执行正常代码花费的时间较长,执行临界区代码花费的时间较短(去除阻塞在锁上的时间),这是后续理解生产者消费者模型具有高并发特点的关键。
- 互斥:任何时刻,只能有一个线程访问临界资源,其它线程不能访问临界资源,称为互斥,互斥保证了临界资源的安全
- 原子性:不会被任何调用机制打断的操作,称为原子操作,原子操作一般在汇编层面上只对应一条汇编指令,CPU要么执行完,要么不执行,执行与否只有两态,这种性质称为原子性
互斥量mutex
背景
- 在大部分情况下,线程使用的数据都是局部变量,变量的地址在线程栈上,在这些情况下,数据归属单个线程,其它线程一般无法获得这些数据(实际上可以通过指针访问,但一般不这样做)
- 但有时,很多数据是线程间共享的,线程之间可以通过共享数据完成交互,这比进程间通信的成本要低很多
- 当多线程并发的访问共享数据时,可能会带来一些问题
int ticket = 1000;
void *BuyTicket(void *args) {
while (ticket > 0) {
printf("get a ticket,rest is %d\n", --ticket);
}
return NULL;
}
int main() {
const int N = 2; // 线程数量
pthread_t arr[N];
for (size_t i = 0; i < N; i++) {
pthread_create(arr + i, NULL, BuyTicket, NULL);
}
for (size_t i = 0; i < N; i++) {
pthread_join(arr[i], NULL);
}
return 0;
}
上面代码多次执行,可能出现不同的结果,ticket可能会减到-1,原因在于–ticket操作不是原子的,多个线程同时访问共享数据ticket时出现了数据混乱。
–ticket操作对应3条汇编指令
- load:将ticket从内存加载到寄存器
- update:将寄存器中的值-1
- store:将寄存器中的值写回内存
由于多线程环境下,每一个线程的调度是随机的,当线程A在执行–ticket时,可能被切换,带走自己的上下文数据,且此时线程A的3条汇编指令没有执行完毕,只是执行了一部分,线程B执行–ticket,当线程B执行完毕后,线程A继续执行,可能从汇编的第3步开始执行,就覆盖了线程B已经写到内存中的数据。
要解决数据混乱的问题,需要做到以下3点
- 代码必须要有互斥行为,当一个线程进入临界区使,不允许其他线程进入
- 若多个线程同时要求访问临界区,应当通过某种手段保证只有一个线程能够访问
- 如果线程没有执行临界区的代码,那么这个线程不能阻止其他人进入临界区
要做到以上3点,需要一把锁锁住临界区,Linux系统下该锁称为互斥量。
互斥量的接口
pthread_mutex_init
初始化互斥量,初始化互斥量有2种方法:动态初始化和静态初始化,也称动态分配和静态分配
静态分配
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
动态分配
int pthread_mutex_init(pthread_mutex_t* restrict mutex,const pthread_mutexattr_t* restrict attr);
参数:第1个参数表示互斥锁,第二个参数表示设置互斥锁的属性
返回值:成功返回0,失败返回错误码
restrict
关键字是c语言标准的一部分,用于指示编译器在编译过程中可以优化代码,用restrict关键字修饰指针mutex和attr,表示这两个指针不会与其它指针别名
pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t* mutex);//销毁互斥锁
互斥锁的初始化和销毁
pthread_mutex_t mtx;
pthread_mutex_init(&mtx,NULL);
pthread_mutex_destroy(&mtx);
加锁和解锁
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
在调用pthread_lock函数时,可能出现2种情况:
- 互斥锁处于未锁状态,线程调用pthread_lock函数会获得互斥锁,并成功返回
- 互斥锁处于锁定状态,线程调用pthread_lock函数会被阻塞,知道互斥锁解除
使用互斥锁改进售票系统
int ticket = 10000;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
void *BuyTicket(void *args) {
while (!pthread_mutex_lock(&mtx) && ticket > 0) {
printf("get a ticket,rest is %d\n", --ticket);
pthread_mutex_unlock(&mtx);
}
pthread_mutex_unlock(&mtx); // 走到这里表示加锁成功且ticket已经减为0
return NULL;
}
int main() {
const int N = 20; // 线程数量
pthread_t arr[N];
for (size_t i = 0; i < N; i++) {
pthread_create(arr + i, NULL, BuyTicket, NULL);
}
for (size_t i = 0; i < N; i++) {
pthread_join(arr[i], NULL);
}
return 0;
}
互斥锁的原理
互斥锁本身也是一种临界资源,但是线程在加锁和解锁时不会出现任何问题,因为加锁和解锁操作可以认为是原子的,加锁和解锁操作的原子性由swap
或exchange
汇编指令实现
加锁操作的汇编代码
lock:
movb $0,%al #将0放到寄存器中(寄存器的数据是线程的上下文)
xchgb %al,mutex #将寄存器中的值与内存中mutex的值交换(一条汇编)
if($(al)>0){
return 0;
}else{
挂起等待;
goto lock;
}
加锁操作是原子的,根本原因是xchgb %al,mutex
通过一条汇编将内存中共享的mutex变成了线程上下文的私有数据,当一个线程加锁成功,其他线程再次xchgb时,得到是0,只能被阻塞挂起
解锁操作的汇编代码
unlock:
movb $1,mutex
唤醒阻塞在锁上的线程;
return 0;
解锁操作就是修改线程共有的mutex内存数据为1
可重入与线程安全
基本概念
- 线程安全:线程安全是指多个线程在没有加锁的情况下并发执行同一段代码,不会出现不同的结果,线程安全偏向于描述线程的特性
- 可重入:指同一个函数被不同执行流并发调用,不会出现问题,这样的函数称为可重入函数,可重入函数是针对函数而言的,描述的是函数的特性
一般的函数都是不可重入函数,可重入函数的占比较少,且编写成本较高,STL中大部分容器提供的接口都是不可重入函数
线程不安全的场景
常见的线程不安全有以下场景:
- 不保护共享变量的函数
- 函数的状态会随着函数的调用而发生变化
- 函数中存在指向静态变量的指针
- 函数中调用了非线程安全的函数
线程安全的场景
常见的线程安全有以下场景:
- 线程对全局变量或静态变量只做读取,不做写入
- 类或者接口对于线程而言都是原子操作,例如c++中提供的原子类
- 多线程之间切换不会导致执行结果出现二义性
不可重入的场景
常见的不可重入有以下场景:
- 函数调用了malloc/free,由于malloc函数采用全局链表管理堆区空间,因此若函数调用了malloc/free,则该函数是不可重入的。需要注意的是,malloc函数虽然使用全局链表管理堆空间,但是在多线程场景下,malloc函数内部是存在锁的,因此多线程场景下通过malloc函数申请空间是可以的。
- 线程调用了标准I/O库函数,标准I/O库中的很多函数都使用了全局数据结构
- 函数体内使用了静态的数据结构
可重入的场景
常见的可重入有以下场景:
- 函数不使用全局数据
- 不使用malloc/new申请堆区空间
- 函数不调用不可重入函数
- 函数不返回静态数据和全局数据
- 函数只使用本地数据,或者函数虽然使用了全局数据,但是是通过制作全局数据的拷贝来使用的
可重入与线程安全的联系
- 一个函数如果是可重入的,那么就是线程安全的,反之则不一定
- 一个函数如果不可重入,有可能引发线程安全问题,也有可能不会引发线程安全,这取决于操作系统调度线程的时机
- 如果一个函数中有全局变量,那么这个函数既不是线程安全的也不是可重入函数
可重入与线程安全的区别
- 可重入函数只是线程安全函数的一种,有些函数不是可重入函数,但是满足线程安全的条件
- 可重入函数一定是线程安全的,但是线程安全不一定是可重入的
- 若函数对于临界资源的访问使用了加锁策略,那么这个函数是线程安全的,但是这个函数不可重入,因为重入这个函数可能导致死锁问题
锁
死锁
死锁是指在多线程环境下,线程占有公共资源,且不释放,同时线程申请占用其他线程不会释放的资源,造成一种永久等待的状态。
一般造成死锁有2种情况:
- 一个线程重复加锁
- 线程A持有锁1,请求锁2,线程B持有锁2,请求锁1
这里演示第二种情况
int x = 10; // 规定访问x需要2把锁
pthread_mutex_t mtx1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mtx2 = PTHREAD_MUTEX_INITIALIZER;
void *AFunc(void *args) {
pthread_mutex_lock(&mtx1);
sleep(1);
pthread_mutex_lock(&mtx2);
printf("++x,x=%d\n", ++x);
pthread_mutex_unlock(&mtx1);
pthread_mutex_unlock(&mtx2);
}
void *BFunc(void *args) {
pthread_mutex_lock(&mtx2);
sleep(1);
pthread_mutex_lock(&mtx1);
printf("++x,x=%d\n", ++x);
pthread_mutex_unlock(&mtx2);
pthread_mutex_unlock(&mtx1);
}
int main() {
pthread_t Atid;
pthread_t Btid;
pthread_create(&Atid, NULL, AFunc, NULL);
pthread_create(&Btid, NULL, BFunc, NULL);
pthread_join(Atid, NULL);
pthread_join(Btid, NULL);
return 0;
}
死锁的四个必要必要条件
- 互斥条件:一个资源每一次只能被一个线程使用
- 请求与保持条件:一个线程因为请求资源而被阻塞时,不会释放已经获得的资源
- 不剥夺条件:一个线程已经获得的资源,不能够被其它线程强行剥夺
- 循环等待条件:若干线程之间形成形成一种头尾连接的循环等待资源的关系
避免死锁
避免死锁的方式
- 破坏造成死锁的4个必要条件
- 在加锁是保证加锁顺序要一致
- 避免加锁后没有释放锁
- 在分配资源时进行一次性分配
避免死锁的常见算法
-
资源分配图算法
资源分配图使用了图这种数据结构,基本思想是将系统中的资源和进程分别用节点表示,并根据资源的请求和释放关系建立边,然后通过对资源分配图进行遍历,检测是否存在环路来判断是否发生死锁。如果存在环路,则说明发生了死锁
-
银行家算法
银行家算法是一种预防死锁的算法,其思想来自于银行向外放贷款。进程在申请资源之前必须告诉系统需要多少资源。如果系统有足够的资源满足该进程的请求,则可以分配资源给该进程,否则进程必须等待。当系统中没有足够的资源分配给任何进程时,进程必须等待,直到有足够的资源为止。
-
等待图算法
等待图算法是一种通过检测等待关系来判断死锁的算法。等待图是一个有向图,节点表示进程或资源,边表示进程等待资源的关系。如果等待图中存在环路,则说明发生了死锁。
-
强连通分量算法
强连通分量算法是一种通过检测系统资源占用情况来判断死锁的算法。强连通分量指的是在一个有向图中,每个节点都可以到达其他节点的集合。如果系统资源被强连通分量占用,而强连通分量中的每个节点都在等待其它节点的资源,则发生了死锁。
线程同步
条件变量
加锁虽然能够解决共享数据的安全问题,但是存在不合理的地方:当线程互斥的访问共享资源时,一个线程访问到了共享资源,其它线程什么也做不了,只能阻塞在锁上,例如线程A访问全局队列,若发现队列为空,只能等待其它线程向队列中添加一个结点,至于其它线程什么时候添加,线程A自己也不知道,在这样的情况下,就需要使用条件变量,条件变量是一种线程之间的通知机制
同步与竞态条件
同步:在能够确保数据安全的前提下,让线程能够按照某种顺序访问临界资源,线程对临界资源的访问机会是平均的,而不是多个线程竞争时,其中一个线程的竞争力强,频繁的加锁解锁访问临界资源
竞态条件:因为时序问题导致程序或数据出现异常
条件变量函数
初始化和销毁
int pthread_cond_init(pthread_cond_t*restrict cond,const pthread_condattr_t* restrict attr);
int pthread_cond_destroy(pthread_cond_t* cond);
条件变量的初始化也有静态分配和动态分配2种方式,其中,静态分配的写法:pthread_cond_t cond=PTHREAD_COND_INITIALIZER
等待条件满足
int pthread_cond_wait(pthread_cond_t*restrict cond,pthread_mutex_t* restrict mutex);
pthread_cond_wait函数的第二个参数是一般互斥锁,在调用pthread_cond_wait函数之前要进行加锁,因为在调用pthread_cond_wait函数之前要判断“某种条件”是否满足,如果不满足,才调用pthread_cond_wait函数,而“某种条件”本身就是属于临界资源。
唤醒等待
int pthread_cond_broadcast(pthread_cond_t* cond);
int pthread_cond_signal(pthread_cond_t* cond);
broadcast表示唤醒阻塞在条件变量上的所有线程,signal表示唤醒阻塞队列的一个队头线程
一般在使用条件变量时,都是一个线程阻塞在条件变量上,然后其他线程唤醒,当阻塞在条件变量上的线程被唤醒时,会自动加锁,此时该线程应该做的不是直接访问资源,而是再次判断条件是否成立
条件变量的简单demo:
struct ThreadData {
pthread_mutex_t *pmtx;
pthread_cond_t *pcond;
};
int x = 0; // x=0表示x是无效值
void *ThreadRead_x(void *args) {
struct ThreadData *ptd = (struct ThreadData *)args;
int cnt = 10;
while (cnt--) {
pthread_mutex_lock(ptd->pmtx);
while (x == 0) { // 使用while
// 说明x是无效值
pthread_cond_wait(ptd->pcond, ptd->pmtx);
}
printf("We get a valid x with a value of %d\n", x);
x = 0; // 将x置为无效,下一次读取新值
pthread_cond_broadcast(ptd->pcond); // 唤醒阻塞在条件变量上的写线程
pthread_mutex_unlock(ptd->pmtx);
}
return NULL;
}
void *ThreadWrite_x(void *args) {
struct ThreadData *ptd = (struct ThreadData *)args;
int cnt = 10;
while (cnt--) {
pthread_mutex_lock(ptd->pmtx);
while (x != 0) { // 使用while
// 说明x是有效值,不要进行write将有效值覆盖
pthread_cond_wait(ptd->pcond, ptd->pmtx);
}
printf("x is in an invalid state and will be written to %d\n", x = rand() % 100 + 1);
pthread_cond_broadcast(ptd->pcond); // 唤醒阻塞在条件变量上的读线程
pthread_mutex_unlock(ptd->pmtx);
}
return NULL;
}
int main() {
srand((unsigned int)time(NULL));
struct ThreadData td;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
td.pmtx = &mtx;
td.pcond = &cond;
pthread_t wtid, rtid;
pthread_create(&wtid, NULL, ThreadWrite_x, &td);
pthread_create(&rtid, NULL, ThreadRead_x, &td);
pthread_join(wtid, NULL);
pthread_join(rtid, NULL);
return 0;
}
实际开发中,条件变量应该搭配生产者消费者模型使用
生产者消费者模型
基本概念
生产者消费者模型指的是在多线程的环境下,一些线程充当生产者的角色,向共享区域中写入数据或任务,一些线程充当消费者的角色,从共享区域中读取任务或数据,生产者消费者模型是将线程角色化的一种的方式。
在多线程的生产者消费者中,需要维护3种关系:
- 生产者线程与生产者线程之间的关系
- 生产者线程与消费者线程之间的关系
- 消费者线程与消费者线程之间的关系
存在2种角色:生产者和消费者。除此之外,还有一个公共的缓冲区
生产者消费者模型是通过一个容器(本质是一段共享的缓冲区)来解决生产者线程与消费者线程之间的强耦合问题,生产者和消费者之间不能直接通信,需要借助缓冲区进行通信。
在生产者消费者模型中,生产者与消费者之间的耦合度低,生产者只负责向缓冲区中生产数据,消费者只负责读取缓冲区中的数据,生产者与消费者之间完成了解耦。
生产者消费者模型的优点
- 生产者与消费者之间解耦
- 支持并发。这里的支持并发并不是指的生产者的生产和消费者的消费过程支持并发,生产者向缓冲区写入数据和消费者从缓冲区读取数据都是需要加锁的,这里的支持并发指的是生产者在向缓冲区写入数据或任务时,消费者可以处理从缓冲区读取到的任务。一般认为生产者向缓冲区写入数据和消费者从缓冲区读取数据花费的时间很短,生产者大部分时间上都是在生产数据,向缓冲区中写入数据只花费很少时间,而消费者大部分时间都是在处理数据,从缓冲区中读取数据只花费很少时间
- 支持忙闲不均
基于阻塞队列的生产者消费者模型
blockqueue
在多线程编程中,阻塞队列是一种常用于实现生产者消费者模型的数据结构,阻塞队列与普通队列的区别在于:阻塞队列的大小是固定的,当队列为空时,消费者线程想要从队列中获取任务会被阻塞,当队列为满时,生产者线程向队列中写入任务会被阻塞。
基于阻塞队列实现生产者消费者模型需要使用互斥锁和条件变量
c++queue模拟实现阻塞队列的生产者消费者模型
template <typename T>
class BlockQueue {
public:
BlockQueue(size_t maxsize = 5)
: _cap(maxsize) {
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
pthread_mutex_init(&_mtx, nullptr);
}
~BlockQueue() {
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
pthread_mutex_destroy(&_mtx);
}
void Push(const T &task) {
// 向队列中放入数据或任务
pthread_mutex_lock(&_mtx);
while (_buffer.size() == _cap) {
// 表示队列已满
pthread_cond_wait(&_full, &_mtx);
}
_buffer.push(task);
pthread_cond_broadcast(&_empty); // 消费者线程可以读取任务
pthread_mutex_unlock(&_mtx);
}
const T &Pop() {
pthread_mutex_lock(&_mtx);
while (_buffer.empty()) {
// 表示队列为空
pthread_cond_wait(&_empty, &_mtx);
}
const T &task = _buffer.front();
_buffer.pop();
pthread_cond_broadcast(&_full); // 生产者线程可以生产任务
pthread_mutex_unlock(&_mtx);
return task;
}
private:
queue<T> _buffer;
size_t _cap; // 阻塞队列容量
pthread_cond_t _full; // 满的条件变量
pthread_cond_t _empty; // 空的条件变量
pthread_mutex_t _mtx; // 互斥锁
};
void TestBlockQueue() {
srand((unsigned int)time(nullptr));
using func_t = function<void(int, int)>;
auto &&add = [](int x, int y) { cout << x << '+' << y << '=' << x + y << endl, sleep(1); };
auto &&sub = [](int x, int y) { cout << x << '-' << y << '=' << x - y << endl, sleep(1); };
auto &&mul = [](int x, int y) { cout << x << '*' << y << '=' << x * y << endl, sleep(1); };
auto &&div = [](int x, int y) { cout << x << '/' << y << '=' << x / y << endl, sleep(1); }; // sleep(1)模拟消费者处理任务的时间
vector<func_t> taskv = {add, sub, mul, div};
BlockQueue<func_t> myqueue(10); // 阻塞队列中存放任务
const int C_NUM = 5; // 消费者线程的数量
const int P_NUM = 5;
thread carr[C_NUM];
thread parr[P_NUM];
auto PThreadFunc = [&myqueue, &taskv](std::string name) { // 传入线程名
int cnt = 20;
while (cnt--) {
myqueue.Push(taskv[rand() % 4]);
cout << "A task has been added by" << name << endl;
}
};
auto CThreadFunc = [&myqueue, &taskv](std::string name) {
int cnt = 20;
while (cnt--) {
const func_t &task = myqueue.Pop();
cout << name << " start execute task" << endl;
task(rand() % 100, rand() % 100);
}
};
for (size_t i = 0; i < C_NUM; i++) {
carr[i] = thread(CThreadFunc, string("consumer ") + to_string(i));
}
for (size_t i = 0; i < P_NUM; i++) {
parr[i] = thread(PThreadFunc, string("producer ") + to_string(i));
}
for (size_t i = 0; i < C_NUM; i++) {
carr[i].join();
}
for (size_t i = 0; i < P_NUM; i++) {
parr[i].join();
}
}
POSIX信号量
POSIX信号量与SystemV信号量的作用类似,均用于同步操作,区别在于POSIX信号量可以用于实现线程间同步。互斥锁保证了线程的互斥,让同一时刻只能有一个线程在访问共享资源,POSIX信号量可以保证线程的同步的和互斥,它允许同一时刻多个不同的线程访问临界资源的不同区域。
POSIX信号量的一套接口与互斥锁无关,POSIX信号量的使用与场景有关,通常使用到POSIX信号量的场景都需要一把互斥锁
初始化和销毁信号量
#include<semaphore.h>
int sem_init(sem_t* sem,int pshared,unsigned int value);
int sem_destroy(sem_t* sem);
sem_init函数的参数
- pshared,表示信号量是否设置进程间共享,pshared=0表示信号量线程间共享,为其他值表示信号量进程间共享,一般设置为0
- value,设置信号量的初值。信号量的本质是一把计数器,申请信号量相当于对信号量进行–操作,当信号量减为0时,继续申请信号量线程会被阻塞。释放信号量相当于对其进行++操作。信号量的申请和释放操作都是原子的
等待信号量和发布信号量
int sem_wait(sem_t* sem);//申请信号量
int sem_post(sem_t* sem);//发布信号量
基于环形队列的生产者消费者模型
思路:使用数组模拟环形队列,使用模运算来模拟环状特征。使用信号量来决定生产者或消费者是否有资格访问环形队列,若有资格访问,则进行加锁访问,若没有资格访问,则生产者或消费者直接阻塞在信号量上,以此实现线程间的同步
template <class T>
class CircleQueue {
public:
CircleQueue(size_t maxsize = 10)
: _cap(maxsize), _arr(maxsize) {
pthread_mutex_init(&_mtx, nullptr);
sem_init(&_leftsapce, 0, maxsize);
sem_init(&_lefttask, 0, 0);
}
~CircleQueue() {
pthread_mutex_destroy(&_mtx);
sem_destroy(&_leftsapce);
sem_destroy(&_lefttask);
}
void Push(const T &task) {
sem_wait(&_leftsapce); // 在有空格的前提下才能生产
pthread_mutex_lock(&_mtx);
_arr[_producer_step++] = task;
_producer_step %= _cap;
pthread_mutex_unlock(&_mtx);
sem_post(&_lefttask); // 每生产一个,任务的数量+1
}
T Pop() {
sem_wait(&_lefttask); // 有任务的前提下才能Pop
pthread_mutex_lock(&_mtx);
T task = _arr[_consumer_step++];
_consumer_step %= _cap;
pthread_mutex_unlock(&_mtx);
sem_post(&_leftsapce); // 每Pop一个,空格的数量+1
return task;
}
private:
sem_t _leftsapce; // 剩余空格的数量
sem_t _lefttask; // 剩余任务的数量
pthread_mutex_t _mtx;
vector<T> _arr;
size_t _cap; // 环形队列的大小
size_t _consumer_step = 0;
size_t _producer_step = 0;
};
线程池
池化技术
所谓池化技术,指的是先向系统申请过量的资源,等到要使用时,将已申请的资源切成小块分配出去,这样减少了系统调用的次数,常见的池化技术有内存池、连接池、对象池、线程池,c++的STL中,空间配置器也使用了池化技术。
线程池指的是先申请一些线程,让它们处于阻塞状态,当有任务到来时指派其中一个线程去完成任务,任务完成后,该线程继续处于阻塞状态,等待下一次派发任务。
对性能要求较为苛刻的应用常使用线程池,例如服务端程序,线程池能够接受突发性的大量请求,使得服务器不至于在短时间内创建大量线程,造成资源紧张。
template <class T>
class ThreadPool {
public:
~ThreadPool() {
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_read);
}
void Run() {
for (size_t i = 0; i < _cap; i++) {
// 线程回调函数的类型是void*(*)(void*),不含this指针
pthread_create(_tidarr.data() + i, nullptr, Uniform_Entry, this);
}
}
void Push(const T &task) {
pthread_mutex_lock(&_mtx);
_buffer.push(task);
pthread_cond_signal(&_read);
pthread_mutex_unlock(&_mtx);
}
static ThreadPool<T> *GetInstance() {
static ThreadPool<T> _inst;
return &_inst;
}
private:
ThreadPool(size_t maxsize = 10)
: _cap(maxsize), _tidarr(maxsize) {
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_read, nullptr);
}
static void *Uniform_Entry(void *args) {
ThreadPool<T> *thisptr = reinterpret_cast<ThreadPool<T> *>(args);
while (1) {
pthread_mutex_lock(&thisptr->_mtx);
while (thisptr->_buffer.empty()) {
pthread_cond_wait(&thisptr->_read, &thisptr->_mtx);
}
T task = thisptr->_buffer.front();
thisptr->_buffer.pop();
pthread_mutex_unlock(&thisptr->_mtx);
task(); // 执行任务
}
return nullptr;
}
private:
pthread_mutex_t _mtx;
pthread_cond_t _read; // 能否读取数据
queue<T> _buffer;
size_t _cap; // 线程的数量
vector<pthread_t> _tidarr; // 保存线程的tid
};
单例模式
基本概念
在整个进程中,一些类只允许具有一个实例化的对象,这些类就被成为单例,在c++中,单例模式有2种实现方式:饿汉模式和懒汉模式
饿汉模式与懒汉模式
饿汉模式实现单例
template<typename T>
class A{
static A _inst;
public:
static A<T>* GetInstance(){
return &_inst;
}
private:
A()=default;
A(const A&)=delete;
A& operator=(const A&)=delete;
};
饿汉模式实现单例,类的声明在.h文件,_inst
的定义在.cpp中,避免因为重复包函.h造成_inst
重定义,使用饿汉模式,_inst对象在main函数被调用之前就已经形成,没有线程安全的问题
懒汉模式实现单例
//方式一
class A{
public:
static A* GetInstance(){
static A _inst;//需要的时候在进行实例化
return &_inst;
}
private:
A()=default;
A(const A&)=delete;
A& operator=(const A&)=delete;
};
//方式2
class A{
static A* _inst;//在.cpp中设置初始值为nullptr
public:
static A* GetInstance(){
if(_inst==nullptr){
_inst=new A;
}
return _inst;
}
private:
A()=default;
A(const A&)=delete;
A& operator=(const A&)=delete;
};
其中,方式1是线程安全的,即使有多个线程同时调用GetInstance函数,_inst也只会被初始化一次,c++11标准规定静态局部变量的初始化是线程安全的,在多线程环境下,多个线程同时访问一个静态局部变量的初始化,编译器会确保只有一个线程初始化这个变量,其它线程会进行等待直到初始化完毕。c++11标准要求静态局部变量在初始化中,编译器必须生成线程安全的代码,这通常通过原子操作或互斥锁完成。
方式2是非线程安全的,当多个线程同时访问GetInstance时,对_inst的读取和修改非线程安全,可能会调用多次_inst=new A
,正确写法应该加锁。
class A{
//volatile关键字用于告诉编译器不要优化
volatile static A* _inst;//在.cpp中初始化为nullptr
static mutex _mtx;
public:
static A* GetInstance(){
if(_inst==nullptr){
_mtx.lock();//加锁对_inst修改
if(_inst==nullptr){
_inst=new A;
}
_mtx.unlock();
}
return _inst;//可以确保多个线程同时访问时,只有一个线程可以调用到_inst=new A,同时2层if判断避免了不必要的锁竞争
}
private:
A()=default;
A(const A&)=delete;
A& operator=(const A&)=delete;
}
STL、智能指针与线程安全
STL容器是否线程安全
c++的STL容器在设计时主要考虑的问题是效率,追求效率达到极致,未考虑线程安全问题,在多线程环境下使用STL时,应该由使用者自行制定锁策略来确保线程安全
智能指针是否线程安全
c++中的智能指针不是线程安全的,但是可以通过某些策略减小线程安全的问题。
智能指针是一个类,通常用来管理动态分配的内存,使用RAII的思想,利用对象的生命周期来控制资源的释放。在C++11中的2个重要的智能指针是shared_ptr和unique_ptr,shared_ptr支持多个指针共享一块内存,通过引用计数实现资源的释放。unique_ptr只支持一个指针指向一个对象,防拷贝。
因为智能指针通常用来管理动态内存,多线程同时操作智能指针可能导致数据混乱的问题。在多线程环境下,若使用shared_ptr,可以考虑在每一个线程中都使用std::atomic对引用计数进行原子操作
其它常见的锁
悲观锁:在每一次读取数据时,总是悲观的认为其它线程同时会来读取,因此在每一次读取数据时都会提前加锁
乐观锁:每一次读取数据时,总是乐观的认为不会有其它线程来读取数据,因此不加锁,在进行数据更新时检查是否有其它线程修改了数据,如果没有,则进行更新,否则进行回滚或重试。乐观锁一般使用版本号机制或CAS操作实现,适用于读多写少的数据结构
CAS操作:CAS的全称是Compare and Swap,即比较并交换,CAS算法是一种并发算法,用于实现线程间同步,其基本思想是:比较内存中的值和期望值,若二者相等,则使用新值替换内存中的值,否则不做任何操作,比较和替换的过程是原子的,可保证多个线程同时进行CAS操作时的安全性
在C++11中,CAS算法的实现就被称为原子操作,对应的类就是std::atomic
自旋锁:Spinlock,自旋锁是一种基于忙等待的锁机制,主要特点是当获取锁的线程发现锁被占用时,不会阻塞,而是不停地自旋等待锁的释放,直到获取锁位置。自旋锁的优点是自旋操作是轻量级的,避免了线程切换是资源的开销;但是如果锁被占用的时间过长,自旋锁会浪费大量的CPU资源,影响系统的性能。自旋锁适用于临界区很短的情况,例如内核的中断处理
公平锁:fair lock,公平锁是一种保证多个线程在等待锁时能够按照一定的顺序获取锁的锁机制。公平锁通过队列来实现,根据队列先进先出的规则处理请求,公平锁能够保证多个线程在等待锁时能够按照一定的顺序获取锁。但是由于需要维护等待队列,一般公平锁开销较大,性能相对较低
非公平锁:unfair lock,非公平锁是一种不保证多个线程在等待锁时按照一定的顺序获取锁的机制。非公平锁的实现一般使用CAS操作直接获取锁,不需要进入等待队列,非公平锁在短时间内能够提高锁的性能,但由于不保证公平性,某些线程可能一直无法获取锁,造成饥饿问题
读者写者问题
读写锁
在多线程编程中,存在这样的场景:对于公共数据的修改次数少,读取次数多。这种场景就适合使用读写锁而不是互斥锁,读写锁遵从的原则是:读共享、写独占、写锁优先级高。读写锁允许多个线程并发读取数据,而对于数据的写入,只允许单个线程串行写入。
读写锁与生产者消费者模型的区别在于:使用读写锁时,从缓冲区读取数据并没有对缓冲区做修改。而在生产者消费者模型中,消费者从缓冲区中拿任务对缓冲区做了修改。
读写锁的接口
int pthread_rwlock_init(pthread_rwlock_t*restrict rwlock,const pthread_rwlockattr_t* restrict attr);//读写锁没有静态分配
int pthread_rwlock_destroy(pthread_rwlock_t* rwlock);
int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock);
简单demo演示读写锁的使用:
static int x = 0;
pthread_rwlock_t rwlock;
void ReadRoutine() {
int cnt = 1000;
while (cnt--) {
pthread_rwlock_rdlock(&rwlock);
cout << x << endl;
pthread_rwlock_unlock(&rwlock);
}
}
void WriteRoutine() {
int cnt = 10;
while (cnt--) {
pthread_rwlock_wrlock(&rwlock);
x = rand() % 100;
pthread_rwlock_unlock(&rwlock);
}
}
int main() {
srand((unsigned int)time(nullptr));
pthread_rwlock_init(&rwlock, nullptr);
pthread_rwlock_destroy(&rwlock);
thread rd[10];
thread wr[2];
for (size_t i = 0; i < 10; i++) {
rd[i] = thread(ReadRoutine);
}
for (size_t i = 0; i < 2; i++) {
wr[i] = thread(WriteRoutine);
}
for (size_t i = 0; i < 10; i++) {
rd[i].join();
}
for (size_t i = 0; i < 2; i++) {
wr[i].join();
}
return 0;
}