线程池1.1, 第一版1.1.1 设计逻辑1.1.2 CODEheadmianpooltcpInitepollworkertransFilequeue1.2 第二版1.2.1 线程池的退出: 异步拉起同步1.2.2.1 CODEmain1.2.2 线程退出: pthread_cancel1.2.2.1 CODEmain1.2.2.2 CODE: 改进方案worker1.2.3 更好的退出方式: 标记位退出1.2.3.1 CODEheadmainworker
在前面我们谈到过, 进程池设计和线程池设计的一些适用场景。
进程池设计:
每个进程有独立的内存空间, 增加进程间的隔离性。一个进程崩溃不会影响到其它进程。
进程间存在隔离性, 这种解耦促使业务逻辑方便书写。
进程的创建和销毁比线程开销大, 占用的内存空间也比线程大。
上下文的调度切换时间长。
适合并发量低(IO请求数少), 业务复杂(CPU密集), 任务执行事件长的系统设计。
线程池设计:
线程之间共享资源, 隔离新差, 一个线程极容易影响到另一个线程(数据同步和一致性)。
但是隔离性差, 使得线程间通信比进程间通信要更方便。
线程较轻量,创建和销毁的开销较小。
适合并发量高(I/O密集型),内存使用要求高, 业务简单, 可以大量快速、轻量级任务处理的场景。
那么当我们真正在设计线程池的时候又该怎么实现那? 其实线程池的设计本质逻辑上和进程池并无太多异常, 只不过我们在线程间通信的是时候, 要远比进程池要简单的多.
head.h
xxxxxxxxxx341234567// 定义线程池8typedef struct pool_s{9// 所有子线程id10pthread_t *threadIds;11// 子线程的数量12int threadNum;13// 任务队列14queue_t queue;15// 锁16pthread_mutex_t pool_lock;17// 条件遍历18pthread_cond_t cond;19}pool_t;2021// 根据指定数量创建线程池22int initPool(pool_t *pPool, int num);23// 定义线程的入口函数24void *threadMain(void *p);2526// 初始化连接27int initTcpSocket(int *socketfd, char *ip, char *port);2829// 添加epoll监听30int epoll_addfd(int epollfd, int filefd);3132// 传送文件33int sendFile(int net_fd);34
mian.c
xxxxxxxxxx34123int main(int argc,char*argv[])4{5pool_t pool;6initPool(&pool, 3);78int socket_fd;9initTcpSocket(&socket_fd, "192.168.106.129", "8080");1011int epoll_fd = epoll_create(1);12epoll_addfd(epoll_fd, socket_fd);1314while(1){1516struct epoll_event list[1024];17int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);1819for(int i=0; i<epoll_num; i++){2021if(list[i].data.fd == socket_fd){22// 有连接进来23int net_fd = accept(socket_fd, NULL, NULL);2425pthread_mutex_lock( &pool.pool_lock);26enQueue(&pool.queue, net_fd);27pthread_cond_signal(&pool.cond);28pthread_mutex_unlock(&pool.pool_lock);29}30}31}3233return 0;34}
pool.c
xxxxxxxxxx21123int initPool(pool_t *pPool, int num){4// 给子线程id开辟空间5pPool->threadIds = (pthread_t *)calloc(num, sizeof(pthread_t));6// 创建线程7for(int i=0; i<num; i++){8pthread_create(&pPool->threadIds[i], NULL, threadMain, pPool);9}1011// 记录子线程个数12pPool->threadNum = num;13// 初始化任务队列14bzero(&pPool->queue, sizeof(queue_t));15// 初始化锁16pthread_mutex_init(&pPool->pool_lock, NULL);17// 初始化条件变量18pthread_cond_init(&pPool->cond, NULL);1920return 0;21}
tcpInit.c
xxxxxxxxxx17123int initTcpSocket(int * socketfd, char *ip, char *port){4*socketfd = socket(AF_INET, SOCK_STREAM, 0);56int reuse = 1;7setsockopt(*socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));89struct sockaddr_in addr;10addr.sin_family = AF_INET;11addr.sin_addr.s_addr = inet_addr(ip);12addr.sin_port = htons(atoi(port));1314bind(*socketfd, (struct sockaddr *) &addr, sizeof(addr));15listen(*socketfd, 10);16return 0;17}
epoll.c
xxxxxxxxxx10123int epoll_addfd(int epollfd, int filefd){4struct epoll_event event;5event.data.fd = filefd;6event.events = EPOLLIN;7epoll_ctl(epollfd,EPOLL_CTL_ADD, filefd, &event);89return 0;10}
worker.c
xxxxxxxxxx23123void *threadMain(void *p){4pool_t *pPool = (pool_t *)p;56while(1){78pthread_mutex_lock(&pPool->pool_lock );9int net_fd;1011while(pPool->queue.size <= 0){12pthread_cond_wait(&pPool->cond, &pPool->pool_lock);13}14net_fd = pPool->queue.head->net_fd;1516deQueue(&pPool->queue);17pthread_mutex_unlock(&pPool->pool_lock);1819sendFile(net_fd);20close(net_fd);21}22return NULL;23}
transFile.c
xxxxxxxxxx32123typedef struct train_s {4int len;5char buf[1000];6} train_t;78int sendFile(int netFd){9char *file_name = "file.txt";10// 需要O_RDWR,避免mmap权限不足11int file_fd = open(file_name, O_RDWR);12train_t train;13bzero(&train, 0);1415// 获得文件信息16struct stat stat_file;17fstat(file_fd, &stat_file);18// 发送文件长度19send(netFd, &stat_file.st_size, sizeof(off_t), MSG_NOSIGNAL);2021// 发送文件长度和名字22bzero(&train, 0);23train.len = strlen(file_name);24memcpy(train.buf, file_name, train.len);25send(netFd, &train,sizeof(int)+train.len, MSG_NOSIGNAL);2627sendfile(netFd, file_fd,NULL, stat_file.st_size);28printf("over \n");2930close(file_fd);31return 0;32}
queue.h
xxxxxxxxxx21123456// 定义队列结点7typedef struct node_s{8int net_fd;9struct node_s *pNext;10}node_t;11// 定义队列12typedef struct queue_s{13node_t *head;14node_t *end;15int size;16}queue_t;1718int enQueue(queue_t *pQueue, int net_fd);19int deQueue(queue_t *pQueue);2021queue.c
xxxxxxxxxx39123int enQueue(queue_t *pQueue, int net_fd){45// 构建新节点6node_t *pNew = (node_t *)calloc(1, sizeof(node_t));7pNew->net_fd = net_fd;89if(pQueue->size == 0){10// 队列为空11pQueue->head = pNew;12pQueue->end = pNew;13}else{14pQueue->end->pNext = pNew;15pQueue->end = pNew;16}1718pQueue->size++;1920return 0;21}2223int deQueue(queue_t *pQueue){24if(pQueue->size == 0){25return -1;26}2728node_t *p = pQueue->head ;29pQueue->head = p->pNext ;3031if(pQueue->size == 1){32pQueue->end = NULL;33}3435pQueue->size--;36free(p);3738return 0;39}
我们不适合在多线程中使用信号(不适合), 因为信号是基于进程设计的, 我们无法确定代码在实际运行过程中, 到底进程中的那个线程收到并且执行了这个信号的处理逻辑, 这可能会导致一些异常问题. 总的来说, 信号的设计和线程的设计从某种程度上是不具有良好的相互适配性.
为了解决上面的问题, 我们可以通过异步拉起同步的方式:
xxxxxxxxxx11// 既然多线程的程序, 不适合处理信号, 那可以让一个专门的进程(只有一个主线程)的进程, 获取信号, 当这个进程获取到信号之后, 再通过进程间通信的方式, 给最终想根据信号做某些操作的多线程进程发送信息(非信号信息).
main.c
xxxxxxxxxx65123int pipe_fd[2];4void func(int num){5// 信号触发: 写管道, 随便写点什么6write(pipe_fd[1], "1", 1);7return ;8}9int main(int argc,char*argv[])10{11pipe(pipe_fd);12if(fork() != 0){13// 父进程14// 设置对2号信号的监听15signal(2, func);16wait(NULL);1718close(pipe_fd[0]);19close(pipe_fd[1]);20exit(0);21}22// 子进程23close(pipe_fd[1]);2425pool_t pool;26initPool(&pool, 3);2728int socket_fd;29initTcpSocket(&socket_fd, "192.168.106.129", "8080");3031int epoll_fd = epoll_create(1);32epoll_addfd(epoll_fd, socket_fd);3334// 监听管道: 是否有信号35epoll_addfd(epoll_fd, pipe_fd[0]);3637while(1){3839struct epoll_event list[1024];40int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);4142for(int i=0; i<epoll_num; i++){4344if(list[i].data.fd == socket_fd){45// 有连接进来46int net_fd = accept(socket_fd, NULL, NULL);4748pthread_mutex_lock( &pool.pool_lock);49enQueue(&pool.queue, net_fd);50pthread_cond_signal(&pool.cond);51pthread_mutex_unlock(&pool.pool_lock);52}5354// 判断是否有信号触发55if(list[i].data.fd == pipe_fd[0]){56printf("信号触发 \n");5758char buf[60] = {0};59read(pipe_fd[0], buf, sizeof(buf));60}61}62}6364return 0;65}
当主线程监听到父进程发过来的信号触发信息, 之后, 我们可以让主线从, 通过pthread_cancel函数,让子线程被动退出.
mian.c
xxxxxxxxxx65123int pipe_fd[2];4void func(int num){5write(pipe_fd[1], "1", 1);6return ;7}8int main(int argc,char*argv[])9{10pipe(pipe_fd);11if(fork() != 0){12signal(2, func);13wait(NULL);1415close(pipe_fd[0]);16close(pipe_fd[1]);17exit(0);18}19close(pipe_fd[1]);2021pool_t pool;22initPool(&pool, 3);2324int socket_fd;25initTcpSocket(&socket_fd, "192.168.106.129", "8080");2627int epoll_fd = epoll_create(1);28epoll_addfd(epoll_fd, socket_fd);29epoll_addfd(epoll_fd, pipe_fd[0]);30while(1){3132struct epoll_event list[1024];33int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);3435for(int i=0; i<epoll_num; i++){3637if(list[i].data.fd == socket_fd){38int net_fd = accept(socket_fd, NULL, NULL);39pthread_mutex_lock( &pool.pool_lock);40enQueue(&pool.queue, net_fd);41pthread_cond_signal(&pool.cond);42pthread_mutex_unlock(&pool.pool_lock);43}4445// 判断是否有信号触发46if(list[i].data.fd == pipe_fd[0]){47printf("信号触发 \n");48char buf[60] = {0};49read(pipe_fd[0], buf, sizeof(buf));5051// 通过pthread_cancel, 取消所有子线程52for(int i=0; i<pool.threadNum; i++){53pthread_cancel(pool.threadIds[i]);54}5556for(int i=0; i<pool.threadNum; i++){57pthread_join(pool.threadIds[i], NULL);58}59exit(0);60}61}62}6364return 0;65}但是当我们真正在代码执行的时候, (ps -elLf)我们发现好像子线程并没有完全退出
xxxxxxxxxx21// 原因是因为, 无论pthread_cond_wait是被pthread_cond_signal唤醒或者广播唤醒, 或者被进程要求被动退出唤醒, 只要被唤醒第一件事情就是获取锁2// 又因为pthread_cond_wait是取消点函数, 这回导致进程带锁死亡, 导致别的进程醒来无法获取锁, 产生死锁
在上面的问题的基础上, 我们可以优化资源清理行为. 我们知道pthread_cancel函数导致线程退出, 会自动执行被退出线程的清理函数, 所以我们可以在加锁的时候, 增加清理函数, 防止死锁
worker.c
xxxxxxxxxx31123void cleanLock(void *arg){4pool_t *pPool = (pool_t *) arg;56pthread_mutex_unlock(&pPool->pool_lock);7}8void *threadMain(void *p){9pool_t *pPool = (pool_t *)p;1011while(1){1213int net_fd;14pthread_mutex_lock(&pPool->pool_lock );15// 调用清理函数16pthread_cleanup_push(cleanLock, pPool);1718while(pPool->queue.size <= 0){19pthread_cond_wait(&pPool->cond, &pPool->pool_lock);20}21net_fd = pPool->queue.head->net_fd;2223deQueue(&pPool->queue);24//pthread_mutex_unlock(&pPool->pool_lock);25pthread_cleanup_pop(1);2627sendFile(net_fd);28close(net_fd);29}30return NULL;31}
虽然我们上面想了一些办法, 可以使线程退出, 但是上面的退出方式本质上是一种被动退出的方式, 某种程度上这不是一个良好的退出方式, 因为子线程没有办法到底在那个代码运行节点退出, 这可能导致有些资源没有正常释放或者某些必要的逻辑进行了一半, 就使得线程退出.
所以在上述逻辑的基础上, 我们可以取消被动退出的方式, 转而使用一种, 以标志位标记的主动退出模式.
head.h: 增加退出标记位
xxxxxxxxxx361234567// 定义线程池8typedef struct pool_s{9// 所有子线程id10pthread_t *threadIds;11// 子线程的数量12int threadNum;13// 任务队列14queue_t queue;15// 锁16pthread_mutex_t pool_lock;17// 条件遍历18pthread_cond_t cond;19// 退出标记20int exitFlag;21}pool_t;2223// 根据指定数量创建线程池24int initPool(pool_t *pPool, int num);25// 定义线程的入口函数26void *threadMain(void *p);2728// 初始化连接29int initTcpSocket(int *socketfd, char *ip, char *port);3031// 添加epoll监听32int epoll_addfd(int epollfd, int filefd);3334// 传送文件35int sendFile(int net_fd);36
main.c
xxxxxxxxxx6612int pipe_fd[2];3void func(int num){4write(pipe_fd[1], "1", 1);5return ;6}7int main(int argc,char*argv[])8{9pipe(pipe_fd);10if(fork() != 0){11signal(2, func);12wait(NULL);1314close(pipe_fd[0]);15close(pipe_fd[1]);16exit(0);17}18setpgid(0, 0);19close(pipe_fd[1]);2021pool_t pool;22initPool(&pool, 3);2324int socket_fd;25initTcpSocket(&socket_fd, "192.168.106.129", "8080");2627int epoll_fd = epoll_create(1);28epoll_addfd(epoll_fd, socket_fd);29epoll_addfd(epoll_fd, pipe_fd[0]);30while(1){3132struct epoll_event list[1024];33int epoll_num = epoll_wait(epoll_fd, list, 1024, -1);3435for(int i=0; i<epoll_num; i++){3637if(list[i].data.fd == socket_fd){38int net_fd = accept(socket_fd, NULL, NULL);39pthread_mutex_lock( &pool.pool_lock);40enQueue(&pool.queue, net_fd);41pthread_cond_signal(&pool.cond);42pthread_mutex_unlock(&pool.pool_lock);43}4445// 判断是否有信号触发46if(list[i].data.fd == pipe_fd[0]){47printf("信号触发 \n");48char buf[60] = {0};49read(pipe_fd[0], buf, sizeof(buf));5051// 修改标志位52pthread_mutex_lock(&pool.pool_lock);53pool.exitFlag = 1;54pthread_cond_broadcast(&pool.cond);55pthread_mutex_unlock(&pool.pool_lock);5657for(int i=0; i<pool.threadNum; i++){58pthread_join(pool.threadIds[i], NULL);59}60exit(0);61}62}63}6465return 0;66}
worker.c
xxxxxxxxxx28123void *threadMain(void *p){4pool_t *pPool = (pool_t *)p;56while(1){78int net_fd;9pthread_mutex_lock(&pPool->pool_lock );1011while(pPool->queue.size <= 0 && pPool->exitFlag ==0 ){12pthread_cond_wait(&pPool->cond, &pPool->pool_lock);13}14if(pPool->exitFlag == 1){15printf("son thread exit \n");16pthread_mutex_unlock(&pPool->pool_lock);17pthread_exit(NULL);18}19net_fd = pPool->queue.head->net_fd;2021deQueue(&pPool->queue);22pthread_mutex_unlock(&pPool->pool_lock);2324sendFile(net_fd);25close(net_fd);26}27return NULL;28}