线程怎么与IO复用联系起来, IO复用中创建线程? 还是线程中IO复用? 这个问题用在进程也是一样的. 其实两种方式都可以. 本节采用在 IO复用中创建线程, 接下来就来看看具体怎么实现的吧.
epoll 的EPOLLONESHOT事件
还记得 epoll
的 event 可设置的状态吗? 忘了也不急, 这里将 IO复用之epoll函数 的状态粘贴过来.
event值 | 描述 |
---|---|
EPOLLIN | 监听是描述符是否可读 |
EPOLLOUT | 监听是描述符是否可写 |
EPOLLERR | 发生错误 |
EPOLLHUP | 对端挂断, 或其中一端关闭了 |
EPOLLET[1] | 设置为边沿触发模式 |
EPOLLONESHOT | 设置关联文件描述符的一次性行为 |
在epoll中直接创建函数 (如下面伪代码), 如果缓冲区的数据没有一次性读完 (这种情况肯定会出现) 特别是LE模式[[2]]下又会立即触发读事件然后再次创建新的线程. 这样就会导致同一时间段有两个线程在处理同一个TCP连接.
if(event[i].event & EPOLLIN){
pthreaed_create(fun);
}
具体的解决办法就是将文件描述符注册 EPOLLONESHOT
事件就能保证该描述符只能被触发一次, 如果描述符还需要需要被触发, 则在处理后重新注册. 如下 :
fun(int eventfd, int fd){
/* 处理过程 */
...
// 重新注册
epoll_event event;
event.fd = fd;
event.event = EPOLLIN | EPOLLONESHOT;
epoll_ctl(eventfd, EPOLL_CTL_ADD, fd, &event);
}
实验
依旧是线程完成回射部分, 与上节不一样在于 : 文件描述符是非阻塞的, 我们通过sleep(1)
模仿处理过程, 如果处理完后没有数据了或者连接断开, 则线程就直接退出; 如果是因为超时, 则将文件描述符重新注册到监听事件中.
部分代码如下 :
// 回射线程
void *workecho(void *arg){
char buf[BUFSIZE];
int n;
int sockfd, epollfd;
struct eventfds *fds;
fds = (struct eventfds *)arg;
sockfd = fds->sockfd;
epollfd = fds->epollfd;
while(1){
n = recv(sockfd, buf, sizeof(buf), MSG_WAITALL);
if(n == 0){
close(sockfd);
printf("client close\n");
epoll_ctl(epollfd, EPOLL_CTL_DEL, sockfd, NULL);
break;
}
else if(n < 0){
// 如果一秒内没有数据, 则重新注册事件并退出线程
if(errno == EAGAIN){
fprintf(stderr, "read timeout\n");
reset_event(epollfd, sockfd);
break;
}
}
else{
write(sockfd, buf, n);
sleep(1); // 睡眠一秒, 代表数据处理过程
}
}
printf("exit\n");
pthread_exit((void *)0);
}
主函数采用 epoll
监听文件描述符, 并将连接的描述符置为非阻塞, 状态设置为EVENTONESHOT, 保证一个文件描述只能有一个线程进行处理.
部分代码 :
int main(int argc, char *argv[]){
...
listen(servfd, 5);
epollfd = epoll_create(1);
setevent(epollfd, servfd, 0); // servfd 监听描述符不能设置为一次性执行
int n;
while(1){
n = epoll_wait(epollfd, evs, sizeof(evs), -1);
for(int i = 0; i < n; ++i){
if(evs[i].data.fd == servfd){
clifd = accept(servfd, (struct sockaddr *)&cliaddr, &len);
if(clifd < 0)
goto exit;
setevent(epollfd, clifd, 1);
}
else if(evs[i].events & EPOLLIN){
fds.sockfd = clifd;
fds.epollfd = epollfd;
// 传入注意 fds 并非是线程安全的
pthread_create(&tid, NULL, workecho, (void *)&fds);
}
}
}
close(servfd);
exit:
exit(0);
}
服务端 : service.c
gcc service.c -o service -pthread
./service 8080
客户端 : client.c
./client 127.0.0.1 8080
可以看出来一段时间没有数据则线程就会退出. 那么修改程序与前面的 线程回射[3] 又有什么优势呢?
- 只为就绪的TCP连接创建线程, 而不必为每个TCP连接创建一个线程; 这样就可以减少内存的消耗.
- 线程的性能与IO复用的性能基本一样, 所以可以使用线程处理连接来代替IO复用.
可能频繁创建线程会导致性能下降, 但是我们也可以采用线程池[4]来解决这个问题, 所以这里的线程函数可以称为工作线程.
小结
- 清楚 EPOLLONESHOT 事件