线程池1.1, 第一版1.1.1 设计逻辑1.1.2 CODEheadmianpooltcpInitepollworkertransFilequeue1.2 第二版1.2.1 线程池的退出: 异步拉起同步1.2.2.1 CODEmain1.2.2 线程退出: pthread_cancel1.2.2.1 CODEmain1.2.2.2 CODE: 改进方案worker1.2.3 更好的退出方式: 标记位退出1.2.3.1 CODEheadmainworker
在前面我们谈到过, 进程池设计和线程池设计的一些适用场景。
进程池设计:
每个进程有独立的内存空间, 增加进程间的隔离性。一个进程崩溃不会影响到其它进程。
进程间存在隔离性, 这种解耦促使业务逻辑方便书写。
进程的创建和销毁比线程开销大, 占用的内存空间也比线程大。
上下文的调度切换时间长。
适合并发量低(IO请求数少), 业务复杂(CPU密集), 任务执行事件长的系统设计。
线程池设计:
线程之间共享资源, 隔离新差, 一个线程极容易影响到另一个线程(数据同步和一致性)。
但是隔离性差, 使得线程间通信比进程间通信要更方便。
线程较轻量,创建和销毁的开销较小。
适合并发量高(I/O密集型),内存使用要求高, 业务简单, 可以大量快速、轻量级任务处理的场景。
那么当我们真正在设计线程池的时候又该怎么实现那? 其实线程池的设计本质逻辑上和进程池并无太多异常, 只不过我们在线程间通信的是时候, 要远比进程池要简单的多.
head.h
xxxxxxxxxx
34123
456
7// 定义线程池
8typedef struct pool_s{
9// 所有子线程id
10pthread_t *threadIds;
11// 子线程的数量
12int threadNum;
13// 任务队列
14queue_t queue;
15// 锁
16pthread_mutex_t pool_lock;
17// 条件遍历
18pthread_cond_t cond;
19}pool_t;
20
21// 根据指定数量创建线程池
22int initPool(pool_t *pPool, int num);
23// 定义线程的入口函数
24void *threadMain(void *p);
25
26// 初始化连接
27int initTcpSocket(int *socketfd, char *ip, char *port);
28
29// 添加epoll监听
30int epoll_addfd(int epollfd, int filefd);
31
32// 传送文件
33int sendFile(int net_fd);
34
mian.c
xxxxxxxxxx
3412
3int main(int argc,char*argv[])
4{
5pool_t pool;
6initPool(&pool, 3);
7
8int socket_fd;
9initTcpSocket(&socket_fd, "192.168.106.129", "8080");
10
11int epoll_fd = epoll_create(1);
12epoll_addfd(epoll_fd, socket_fd);
13
14while(1){
15
16struct epoll_event list[1024];
17int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);
18
19for(int i=0; i<epoll_num; i++){
20
21if(list[i].data.fd == socket_fd){
22// 有连接进来
23int net_fd = accept(socket_fd, NULL, NULL);
24
25pthread_mutex_lock( &pool.pool_lock);
26enQueue(&pool.queue, net_fd);
27pthread_cond_signal(&pool.cond);
28pthread_mutex_unlock(&pool.pool_lock);
29}
30}
31}
32
33return 0;
34}
pool.c
xxxxxxxxxx
2112
3int initPool(pool_t *pPool, int num){
4// 给子线程id开辟空间
5pPool->threadIds = (pthread_t *)calloc(num, sizeof(pthread_t));
6// 创建线程
7for(int i=0; i<num; i++){
8pthread_create(&pPool->threadIds[i], NULL, threadMain, pPool);
9}
10
11// 记录子线程个数
12pPool->threadNum = num;
13// 初始化任务队列
14bzero(&pPool->queue, sizeof(queue_t));
15// 初始化锁
16pthread_mutex_init(&pPool->pool_lock, NULL);
17// 初始化条件变量
18pthread_cond_init(&pPool->cond, NULL);
19
20return 0;
21}
tcpInit.c
xxxxxxxxxx
1712
3int initTcpSocket(int * socketfd, char *ip, char *port){
4*socketfd = socket(AF_INET, SOCK_STREAM, 0);
5
6int reuse = 1;
7setsockopt(*socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
8
9struct sockaddr_in addr;
10addr.sin_family = AF_INET;
11addr.sin_addr.s_addr = inet_addr(ip);
12addr.sin_port = htons(atoi(port));
13
14bind(*socketfd, (struct sockaddr *) &addr, sizeof(addr));
15listen(*socketfd, 10);
16return 0;
17}
epoll.c
xxxxxxxxxx
1012
3int epoll_addfd(int epollfd, int filefd){
4struct epoll_event event;
5event.data.fd = filefd;
6event.events = EPOLLIN;
7epoll_ctl(epollfd,EPOLL_CTL_ADD, filefd, &event);
8
9return 0;
10}
worker.c
xxxxxxxxxx
2312
3void *threadMain(void *p){
4pool_t *pPool = (pool_t *)p;
5
6while(1){
7
8pthread_mutex_lock(&pPool->pool_lock );
9int net_fd;
10
11while(pPool->queue.size <= 0){
12pthread_cond_wait(&pPool->cond, &pPool->pool_lock);
13}
14net_fd = pPool->queue.head->net_fd;
15
16deQueue(&pPool->queue);
17pthread_mutex_unlock(&pPool->pool_lock);
18
19sendFile(net_fd);
20close(net_fd);
21}
22return NULL;
23}
transFile.c
xxxxxxxxxx
3212
3typedef struct train_s {
4int len;
5char buf[1000];
6} train_t;
7
8int sendFile(int netFd){
9char *file_name = "file.txt";
10// 需要O_RDWR,避免mmap权限不足
11int file_fd = open(file_name, O_RDWR);
12train_t train;
13bzero(&train, 0);
14
15// 获得文件信息
16struct stat stat_file;
17fstat(file_fd, &stat_file);
18// 发送文件长度
19send(netFd, &stat_file.st_size, sizeof(off_t), MSG_NOSIGNAL);
20
21// 发送文件长度和名字
22bzero(&train, 0);
23train.len = strlen(file_name);
24memcpy(train.buf, file_name, train.len);
25send(netFd, &train,sizeof(int)+train.len, MSG_NOSIGNAL);
26
27sendfile(netFd, file_fd,NULL, stat_file.st_size);
28printf("over \n");
29
30close(file_fd);
31return 0;
32}
queue.h
xxxxxxxxxx
21123
45
6// 定义队列结点
7typedef struct node_s{
8int net_fd;
9struct node_s *pNext;
10}node_t;
11// 定义队列
12typedef struct queue_s{
13node_t *head;
14node_t *end;
15int size;
16}queue_t;
17
18int enQueue(queue_t *pQueue, int net_fd);
19int deQueue(queue_t *pQueue);
20
21queue.c
xxxxxxxxxx
3912
3int enQueue(queue_t *pQueue, int net_fd){
4
5// 构建新节点
6node_t *pNew = (node_t *)calloc(1, sizeof(node_t));
7pNew->net_fd = net_fd;
8
9if(pQueue->size == 0){
10// 队列为空
11pQueue->head = pNew;
12pQueue->end = pNew;
13}else{
14pQueue->end->pNext = pNew;
15pQueue->end = pNew;
16}
17
18pQueue->size++;
19
20return 0;
21}
22
23int deQueue(queue_t *pQueue){
24if(pQueue->size == 0){
25return -1;
26}
27
28node_t *p = pQueue->head ;
29pQueue->head = p->pNext ;
30
31if(pQueue->size == 1){
32pQueue->end = NULL;
33}
34
35pQueue->size--;
36free(p);
37
38return 0;
39}
我们不适合在多线程中使用信号(不适合), 因为信号是基于进程设计的, 我们无法确定代码在实际运行过程中, 到底进程中的那个线程收到并且执行了这个信号的处理逻辑, 这可能会导致一些异常问题. 总的来说, 信号的设计和线程的设计从某种程度上是不具有良好的相互适配性.
为了解决上面的问题, 我们可以通过异步拉起同步的方式:
xxxxxxxxxx
11// 既然多线程的程序, 不适合处理信号, 那可以让一个专门的进程(只有一个主线程)的进程, 获取信号, 当这个进程获取到信号之后, 再通过进程间通信的方式, 给最终想根据信号做某些操作的多线程进程发送信息(非信号信息).
main.c
xxxxxxxxxx
6512
3int pipe_fd[2];
4void func(int num){
5// 信号触发: 写管道, 随便写点什么
6write(pipe_fd[1], "1", 1);
7return ;
8}
9int main(int argc,char*argv[])
10{
11pipe(pipe_fd);
12if(fork() != 0){
13// 父进程
14// 设置对2号信号的监听
15signal(2, func);
16wait(NULL);
17
18close(pipe_fd[0]);
19close(pipe_fd[1]);
20exit(0);
21}
22// 子进程
23close(pipe_fd[1]);
24
25pool_t pool;
26initPool(&pool, 3);
27
28int socket_fd;
29initTcpSocket(&socket_fd, "192.168.106.129", "8080");
30
31int epoll_fd = epoll_create(1);
32epoll_addfd(epoll_fd, socket_fd);
33
34// 监听管道: 是否有信号
35epoll_addfd(epoll_fd, pipe_fd[0]);
36
37while(1){
38
39struct epoll_event list[1024];
40int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);
41
42for(int i=0; i<epoll_num; i++){
43
44if(list[i].data.fd == socket_fd){
45// 有连接进来
46int net_fd = accept(socket_fd, NULL, NULL);
47
48pthread_mutex_lock( &pool.pool_lock);
49enQueue(&pool.queue, net_fd);
50pthread_cond_signal(&pool.cond);
51pthread_mutex_unlock(&pool.pool_lock);
52}
53
54// 判断是否有信号触发
55if(list[i].data.fd == pipe_fd[0]){
56printf("信号触发 \n");
57
58char buf[60] = {0};
59read(pipe_fd[0], buf, sizeof(buf));
60}
61}
62}
63
64return 0;
65}
当主线程监听到父进程发过来的信号触发信息, 之后, 我们可以让主线从, 通过pthread_cancel函数,让子线程被动退出.
mian.c
xxxxxxxxxx
6512
3int pipe_fd[2];
4void func(int num){
5write(pipe_fd[1], "1", 1);
6return ;
7}
8int main(int argc,char*argv[])
9{
10pipe(pipe_fd);
11if(fork() != 0){
12signal(2, func);
13wait(NULL);
14
15close(pipe_fd[0]);
16close(pipe_fd[1]);
17exit(0);
18}
19close(pipe_fd[1]);
20
21pool_t pool;
22initPool(&pool, 3);
23
24int socket_fd;
25initTcpSocket(&socket_fd, "192.168.106.129", "8080");
26
27int epoll_fd = epoll_create(1);
28epoll_addfd(epoll_fd, socket_fd);
29epoll_addfd(epoll_fd, pipe_fd[0]);
30while(1){
31
32struct epoll_event list[1024];
33int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);
34
35for(int i=0; i<epoll_num; i++){
36
37if(list[i].data.fd == socket_fd){
38int net_fd = accept(socket_fd, NULL, NULL);
39pthread_mutex_lock( &pool.pool_lock);
40enQueue(&pool.queue, net_fd);
41pthread_cond_signal(&pool.cond);
42pthread_mutex_unlock(&pool.pool_lock);
43}
44
45// 判断是否有信号触发
46if(list[i].data.fd == pipe_fd[0]){
47printf("信号触发 \n");
48char buf[60] = {0};
49read(pipe_fd[0], buf, sizeof(buf));
50
51// 通过pthread_cancel, 取消所有子线程
52for(int i=0; i<pool.threadNum; i++){
53pthread_cancel(pool.threadIds[i]);
54}
55
56for(int i=0; i<pool.threadNum; i++){
57pthread_join(pool.threadIds[i], NULL);
58}
59exit(0);
60}
61}
62}
63
64return 0;
65}
但是当我们真正在代码执行的时候, (ps -elLf)我们发现好像子线程并没有完全退出
xxxxxxxxxx
21// 原因是因为, 无论pthread_cond_wait是被pthread_cond_signal唤醒或者广播唤醒, 或者被进程要求被动退出唤醒, 只要被唤醒第一件事情就是获取锁
2// 又因为pthread_cond_wait是取消点函数, 这回导致进程带锁死亡, 导致别的进程醒来无法获取锁, 产生死锁
在上面的问题的基础上, 我们可以优化资源清理行为. 我们知道pthread_cancel函数导致线程退出, 会自动执行被退出线程的清理函数, 所以我们可以在加锁的时候, 增加清理函数, 防止死锁
worker.c
xxxxxxxxxx
3112
3void cleanLock(void *arg){
4pool_t *pPool = (pool_t *) arg;
5
6pthread_mutex_unlock(&pPool->pool_lock);
7}
8void *threadMain(void *p){
9pool_t *pPool = (pool_t *)p;
10
11while(1){
12
13int net_fd;
14pthread_mutex_lock(&pPool->pool_lock );
15// 调用清理函数
16pthread_cleanup_push(cleanLock, pPool);
17
18while(pPool->queue.size <= 0){
19pthread_cond_wait(&pPool->cond, &pPool->pool_lock);
20}
21net_fd = pPool->queue.head->net_fd;
22
23deQueue(&pPool->queue);
24//pthread_mutex_unlock(&pPool->pool_lock);
25pthread_cleanup_pop(1);
26
27sendFile(net_fd);
28close(net_fd);
29}
30return NULL;
31}
虽然我们上面想了一些办法, 可以使线程退出, 但是上面的退出方式本质上是一种被动退出的方式, 某种程度上这不是一个良好的退出方式, 因为子线程没有办法到底在那个代码运行节点退出, 这可能导致有些资源没有正常释放或者某些必要的逻辑进行了一半, 就使得线程退出.
所以在上述逻辑的基础上, 我们可以取消被动退出的方式, 转而使用一种, 以标志位标记的主动退出模式.
head.h: 增加退出标记位
xxxxxxxxxx
36123
456
7// 定义线程池
8typedef struct pool_s{
9// 所有子线程id
10pthread_t *threadIds;
11// 子线程的数量
12int threadNum;
13// 任务队列
14queue_t queue;
15// 锁
16pthread_mutex_t pool_lock;
17// 条件遍历
18pthread_cond_t cond;
19// 退出标记
20int exitFlag;
21}pool_t;
22
23// 根据指定数量创建线程池
24int initPool(pool_t *pPool, int num);
25// 定义线程的入口函数
26void *threadMain(void *p);
27
28// 初始化连接
29int initTcpSocket(int *socketfd, char *ip, char *port);
30
31// 添加epoll监听
32int epoll_addfd(int epollfd, int filefd);
33
34// 传送文件
35int sendFile(int net_fd);
36
main.c
xxxxxxxxxx
661
2int pipe_fd[2];
3void func(int num){
4write(pipe_fd[1], "1", 1);
5return ;
6}
7int main(int argc,char*argv[])
8{
9pipe(pipe_fd);
10if(fork() != 0){
11signal(2, func);
12wait(NULL);
13
14close(pipe_fd[0]);
15close(pipe_fd[1]);
16exit(0);
17}
18setpgid(0, 0);
19close(pipe_fd[1]);
20
21pool_t pool;
22initPool(&pool, 3);
23
24int socket_fd;
25initTcpSocket(&socket_fd, "192.168.106.129", "8080");
26
27int epoll_fd = epoll_create(1);
28epoll_addfd(epoll_fd, socket_fd);
29epoll_addfd(epoll_fd, pipe_fd[0]);
30while(1){
31
32struct epoll_event list[1024];
33int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);
34
35for(int i=0; i<epoll_num; i++){
36
37if(list[i].data.fd == socket_fd){
38int net_fd = accept(socket_fd, NULL, NULL);
39pthread_mutex_lock( &pool.pool_lock);
40enQueue(&pool.queue, net_fd);
41pthread_cond_signal(&pool.cond);
42pthread_mutex_unlock(&pool.pool_lock);
43}
44
45// 判断是否有信号触发
46if(list[i].data.fd == pipe_fd[0]){
47printf("信号触发 \n");
48char buf[60] = {0};
49read(pipe_fd[0], buf, sizeof(buf));
50
51// 修改标志位
52pthread_mutex_lock(&pool.pool_lock);
53pool.exitFlag = 1;
54pthread_cond_broadcast(&pool.cond);
55pthread_mutex_unlock(&pool.pool_lock);
56
57for(int i=0; i<pool.threadNum; i++){
58pthread_join(pool.threadIds[i], NULL);
59}
60exit(0);
61}
62}
63}
64
65return 0;
66}
worker.c
xxxxxxxxxx
2812
3void *threadMain(void *p){
4pool_t *pPool = (pool_t *)p;
5
6while(1){
7
8int net_fd;
9pthread_mutex_lock(&pPool->pool_lock );
10
11while(pPool->queue.size <= 0 && pPool->exitFlag ==0 ){
12pthread_cond_wait(&pPool->cond, &pPool->pool_lock);
13}
14if(pPool->exitFlag == 1){
15printf("son thread exit \n");
16pthread_mutex_unlock(&pPool->pool_lock);
17pthread_exit(NULL);
18}
19net_fd = pPool->queue.head->net_fd;
20
21deQueue(&pPool->queue);
22pthread_mutex_unlock(&pPool->pool_lock);
23
24sendFile(net_fd);
25close(net_fd);
26}
27return NULL;
28}