关于线程池以及生产者消费者模型在前面写C的线程池中已经作了详细说明,感兴趣的可以去看一下,这里我们就来写C++版本的线程池。因为这里就是把C版本的线程池单纯改成C++的写法,所以我们所有的定义干脆就写在头文件里:
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <iostream>
#include <pthread.h>
#include <queue>
using namespace std;
typedef void (*EnterFP)(void*); //线程真正业务逻辑函数格式
template <typename T>
class ThreadPool
{
int thread_cnt; //线程数量
int store_cap; //仓库容量,因为STL中的queue是链式结构,不存在满的情况,所以人为定义容量,限制线程数量
pthread_t* tids; //线程ID
queue<T> store; //队列仓库
EnterFP enter; //业务逻辑函数
pthread_mutex_t* hlock; //队头互斥锁
pthread_mutex_t* tlock; //队尾互斥锁
pthread_cond_t* empty; //空仓条件变量
pthread_cond_t* full; //满仓条件变量
public:
ThreadPool(EnterFP enter,int thread_cnt=10,int store_cap=10)
:thread_cnt(thread_cnt),store_cap(store_cap),enter(enter)
{
tids = new pthread_t[thread_cnt];
hlock = new pthread_mutex_t;
pthread_mutex_init(hlock,NULL);
tlock = new pthread_mutex_t;
pthread_mutex_init(tlock,NULL);
empty = new pthread_cond_t;
pthread_cond_init(empty,NULL);
full = new pthread_cond_t;
pthread_cond_init(full,NULL);
}
~ThreadPool(void)
{
for(int i=0;i<thread_cnt;i++)
{
pthread_cancel(tids[i]);
cout << "线程:" << tids[i] << "已杀死" << endl;
}
delete[] tids;
pthread_cond_destroy(full);
delete full;
pthread_cond_destroy(empty);
delete empty;
pthread_mutex_destroy(tlock);
delete tlock;
pthread_mutex_destroy(hlock);
delete hlock;
}
void start_threadpool(void)
{
for(int i=0;i<thread_cnt;i++)
{
pthread_create(tids+i,NULL,run,this);
cout << "线程:" << tids[i] << "创建成功" << endl;
}
}
//run不能是成员函数,因为成员函数有this指针,而pthread_create的run要求不能有this指针,
//因此,run必须设置为静态成员函数
//线程池中的线程的入口函数
static void* run(void* arg)
{
ThreadPool* threadpool = static_cast<ThreadPool*>(arg);
for(;;)
{
//线程就负责消费数据,能返回就意味着从仓库拿到了数据
T t = (T)threadpool->pop_threadpool();
//拿到数据,去运行线程要执行的业务逻辑函数
threadpool->enter(t);
}
}
//生产数据
void push_threadpool(T task)
{
//队尾加锁
pthread_mutex_lock(tlock);
//如果一直队满,不生产
while(store_cap <= store.size())
{
//唤醒消费数据的线程
pthread_cond_signal(empty);
//睡眠并解锁队尾
pthread_cond_wait(full,tlock);
}
//生产数据存入队尾
store.push(task);
//唤醒一个消费数据的线程
pthread_cond_signal(empty);
//队尾解锁
pthread_mutex_unlock(tlock);
}
//消费数据
void* pop_threadpool(void)
{
//队头加锁
pthread_mutex_lock(hlock);
//如果一直队空,不消费
while(store.empty())
{
//唤醒生产数据的线程
pthread_cond_signal(full);
//睡眠并解锁队头
pthread_cond_wait(empty,hlock);
}
//消费数据
T task = store.front();
store.pop();
//唤醒一个生产数据的线程
pthread_cond_signal(full);
//队头解锁
pthread_mutex_unlock(hlock);
return task;
}
};
#endif//THREADPOOL_H
over