1、线程池原理
-
任务队列
使用std::queue
存储IOTask
结构体,包含文件描述符、缓冲区、偏移量、操作类型(读/写)和回调函数。 -
**线程池类 (
ThreadPool
)**- 管理工作线程和任务队列。
- 每个工作线程绑定一个独立的
AioHandler
实例,用于处理异步IO请求。
-
**异步IO类 (
AioHandler
)**- 封装
libaio
API,动态分配aiocb
结构体并管理其生命周期。 - 提交读/写请求并通过哈希表关联
aiocb
与回调函数。
- 封装
2、代码实现
-
**任务队列 (
IOTask
)**
包含文件描述符(fd
)、缓冲区(buffer
)、长度(len
)、偏移量(offset
)、回调函数(callback
)和操作类型(is_read
)。 -
AioHandler类
- 动态分配
aiocb
:每次提交任务时动态分配aiocb
,避免悬空指针。 - 回调关联:使用哈希表(
pending_requests
)将aiocb
指针与回调函数绑定,确保异步完成后正确调用。 - 事件处理:通过
aio_suspend
等待事件,aio_nextevent
获取完成事件并触发回调。
- 动态分配
-
**线程池类 (
ThreadPool
)**- 工作线程管理:每个工作线程绑定一个
AioHandler
实例,独立处理异步IO。 - 任务分发:通过条件变量同步任务队列,确保任务被均匀分配到各工作线程。
- 工作线程管理:每个工作线程绑定一个
-
内存管理
aiocb
结构体动态分配并在异步完成后释放,避免内存泄漏。- 回调函数通过
std::function
封装,支持灵活的回调逻辑。
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <cstring>
#include <cerrno>
#include <libaio.h>
#include <functional>
#include <unistd.h>
#include <fcntl.h>
using namespace std;
struct IOTask {
int fd;
void* buffer;
size_t len;
off_t offset;
function<void(ssize_t)> callback;
bool is_read;
};
class AioHandler {
public:
AioHandler() {
if (aio_init(&aio_ctx) != 0) {
throw runtime_error("aio_init failed");
}
}
~AioHandler() {
if (aio_destroy(aio_ctx) != 0) {
cerr << "aio_destroy failed: " << strerror(errno) << endl;
}
}
void async_read(const IOTask& task) {
struct aiocb* cb = new aiocb();
memset(cb, 0, sizeof(*cb));
cb->aio_fildes = task.fd;
cb->aio_buf = task.buffer;
cb->aio_nbytes = task.len;
cb->aio_offset = task.offset;
auto handle = make_pair(cb, task.callback);
pending_requests[cb] = handle;
if (aio_submit(aio_ctx, cb) != 0) {
cerr << "aio_submit read failed: " << strerror(errno) << endl;
pending_requests.erase(cb);
delete cb;
}
}
void async_write(const IOTask& task) {
struct aiocb* cb = new aiocb();
memset(cb, 0, sizeof(*cb));
cb->aio_fildes = task.fd;
cb->aio_buf = task.buffer;
cb->aio_nbytes = task.len;
cb->aio_offset = task.offset;
auto handle = make_pair(cb, task.callback);
pending_requests[cb] = handle;
if (aio_submit(aio_ctx, cb) != 0) {
cerr << "aio_submit write failed: " << strerror(errno) << endl;
pending_requests.erase(cb);
delete cb;
}
}
void handle_events() {
while (true) {
if (aio_suspend(aio_ctx, NULL, 0, NULL) != 0) {
cerr << "aio_suspend failed: " << strerror(errno) << endl;
break;
}
while (true) {
struct aiocb* cb = aio_nextevent(aio_ctx, NULL, NULL);
if (!cb) break;
auto it = pending_requests.find(cb);
if (it == pending_requests.end()) {
continue;
}
auto& [cb_ptr, callback] = it->second;
ssize_t res = aio_error(cb_ptr);
if (res == -1) {
cerr << "aio_error: " << strerror(errno) << endl;
} else {
callback(res);
}
pending_requests.erase(it);
delete cb_ptr;
}
}
}
private:
unordered_map<struct aiocb*, pair<struct aiocb*, function<void(ssize_t)>>> pending_requests;
aiocb* aio_ctx;
};
class ThreadPool {
public:
ThreadPool(size_t num_threads)
: stop(false), workers(num_threads), handlers(num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this, i] {
while (!stop) {
IOTask task;
{
unique_lock<mutex> lock(queue_mutex);
condition.wait(lock, [this] { return !queue.empty() || stop; });
if (stop && queue.empty()) break;
task = queue.front();
queue.pop();
}
handlers[i].handle_events(); // 处理所有完成事件
if (task.is_read) {
handlers[i].async_read(task);
} else {
handlers[i].async_write(task);
}
}
});
}
}
~ThreadPool() {
{
unique_lock<mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
void submit(const IOTask& task) {
unique_lock<mutex> lock(queue_mutex);
queue.push(task);
condition.notify_one();
}
private:
vector<thread> workers;
vector<AioHandler> handlers;
mutex queue_mutex;
condition_variable condition;
bool stop;
};
int main() {
// 打开两个文件用于测试
int fd1 = open("file1.txt", O_RDONLY);
int fd2 = open("file2.txt", O_WRONLY);
if (fd1 == -1 || fd2 == -1) {
perror("open");
return 1;
}
ThreadPool pool(4); // 创建4个工作线程
const size_t buffer_size = 4096;
vector<char> buffer(buffer_size);
// 提交读任务
pool.submit({fd1, buffer.data(), buffer_size, 0,
[](ssize_t res) { cout << "Read " << res << " bytes from file1" << endl; }, true});
// 提交写任务
pool.submit({fd2, buffer.data(), buffer_size, 0,
[](ssize_t res) { cout << "Wrote " << res << " bytes to file2" << endl; }, false});
// 主线程等待一段时间后退出
this_thread::sleep_for(chrono::seconds(5));
close(fd1);
close(fd2);
return 0;
}