进程池和线程池1, 进程池1.1 第一版1.1.1 设计逻辑1.1.2 socketpair代码示例1.2.3 Makefile1.1.4 CODEheadMainpoolworkerlocalSockettcpInitepollclient1.2 第二版1.2.1 文件的传输1.2.2 粘包问题1.2.3 发送大文件问题:问题:1.2.4 进度条1.2.5 零拷贝mmapsendfile1.3 第三版1.3.1 一个问题1.3.2 有序退出回顾:
假设: 我们结合学过的文件操作、网络通信、以及进程和线程的知识, 实现一个基本的文件下载服务器模型, 我们需要做那些准备工作, 或者说我们怎么设计整个数据通信逻辑。
设计一个服务器中, 在大多数情况下, 都会有很多连接的频繁接入和断开, 如果在通信模型中, 我们让一个进程既处理连接接入,又处理业务逻辑, 这样设计在当下应用领域无疑是很低效的。(它既无法做到有效解耦, 增加代码书写的麻烦, 增加并行逻辑设计困难, 又无法有效利用多核性能, 伴随性能瓶颈)
xxxxxxxxxx21// 当然, 如果是单核心系统, 或者基于单个进程或者进程设计的基于事件驱动(基于select或者epoll管控文件对象, 监听事件, 分发业务逻辑)的服务器模型也是有其非常重要的独特之处的。(书写代码麻烦, 内存利用性能有一定优势, 在并发量比较低低的服务器中调度少性能高)2// 当然现在更好的设计是: 我们下面的池化+事件驱动的设计, 让主进程/线程只负责基于事件驱动分发任务, 池中进程/线程执行具体的业务代码。一个良好的架构需要满足许多侧面的要求,其中最基本的要求是可维护性和性能:
可维护性: 是指应用程序对开发者应该足够友好,开发和维护的程序员可以很快速的就能理解程序架构并进行后续开发。为了提供可维护性,项目各个部分的功能应当彼此分离。
性能: 是指应用程序应当充分利用操作系统资源,并且减少不要资源消耗。在多进程程序中, 一种非常浪费资源的操作就是创建和销毁进程。
所以我们无论从代码开发难易上还是性能提升上, 都有必要利用多进程或者多线程来实现业务逻辑和任务管控逻辑分离。
我么可以维护一个主进程只负责接收用户请求, 并把接收到的用户请求分配到不同进程上来处理。但是如果我们对任务进程不做任何限制和管理, 随着任务的到达开始一个进程处理任务, 任务处理结束之后结束这个进程, 这样处理是极其不好的, 因为进程的频繁创建和销毁是一份很大的软硬件开销。
而借用池化思想可以有效的避免这个问题(很多地方都有池化设计) , 我们维护一个进程的池子, 包含多个进程, 当有任务到来, 把任务交给空闲的进程执行, 当任务执行完毕, 并且没有任务可执行, 让进程休眠。这种池化的思想可以显著减少创建和销毁进程的软硬件开销,进而提高程序的执行效率。
主进程负责接收用户请求, 获得连接的文件描述符, 并且维护所有进程池中进程的状态, 以方便把文件描述符对象交给空闲的池中进程来和客户端直接交互。
进程池中进程, 在被主进程唤醒, 接收到对应的连接文件描述符对象, 按照需求读取磁盘文件, 并把读取的磁盘文件发送给客户端。
客户端建立连接, 接收返回的文件。
进程池和线程池
以上述服务模型为例, 我们把进程池换成线程池, 逻辑上也是相同的, 那么在我们设计的时候, 怎么确定使用进程池设计还是线程池设计。
进程池设计:
每个进程有独立的内存空间, 增加进程间的隔离性。一个进程崩溃不会影响到其它进程。
进程间存在隔离性, 这种解耦促使业务逻辑方便书写。
进程的创建和销毁比线程开销大, 占用的内存空间也比线程大。
上下文的调度切换时间长。
适合并发量低(IO请求数少), 业务复杂(CPU密集), 任务执行事件长的系统设计。
线程池设计:
线程之间共享资源, 隔离新差, 一个线程极容易影响到另一个线程(数据同步和一致性)。
但是隔离性差, 使得线程间通信比进程间通信要更方便。
线程较轻量,创建和销毁的开销较小。
适合并发量高(I/O密集型),内存使用要求高, 业务简单, 可以大量快速、轻量级任务处理的场景。
head.h
头文件的引入
函数定义
结构体和类型定义
main.c
调用pool初始化进程池; 并让pool启动线程池中线程worker
调用tcpInit初始化socket对IP&端口监听
调用epoll监听客户端连接进来, 当连接进来, 调用pool把连接交给进程池中进程worker处理, 修改进程状态为忙
调用epoll监听和进程池中线程, 等待进程池中线程处理完客户端请求, 通知main, 把其状态改为空闲
pool.c
根据main的要求, 启动对应数量的进程worker
把启动的进程及其状态记录到数组中, 以供main监控worker结束任务后, 向main发起的通信(由忙置为闲的通信)
当main有客户端连接过来, 通知pool, pool从空闲的进程中选取一个进程执行和客户端通信
worker.c
被pool初始化, 并启动
等待接收main获取的客户端连接
接收到main让pool通知过来的客户端连接, 拿到连接, 与指定的客户端通信, 通信完毕关闭客户端连接
通信结束, 向main发信息, 告诉其自己应该由忙状态更改为闲状态
等待main分发过来新的客户端
tcpInit.c
初始化ip和端口的监控
避免大量的tcp连接代码写在main中, 为了解耦而存在
epoll.c
epoll的操作相关
避免大量epoll添加文件描述符监听代码在main中, 为了解耦而存在
localSocket.c
当main监听到一个客户端连接, 需要把这个客户端连接对象交给工作的worker, 这涉及到进程间通信
这个进程间通信不是一个简单的问题, 因为这个进程间通信不是传递字符串,数字等简单的东西,而是要传递一个文件描述符, 而这个传送的文件描述符还要具有共享文件对象的能力
这个地方就要用到一个特殊的功能强大的本地通信socket, 用于main和worker间进行一个可以最终共享文件对象的文件描述符的传递
一般我们使用它的目的: 为了在两个进程之间, 传输一个文件对象的描述信息(可以让两个进程共享一个文件对象), 而不是单单只传输一个文件描述符数组的下标.
socketpair()函数用于创建一对互相连接的全双工通信socket。相比较普通的用于网络间不同主机通信的socket; socketpair函数创建的socket主要用于在同一台机器上的进程间通信(我们可以称其为本地socket)。 (man socketpair)
xxxxxxxxxx10123// create a pair of connected sockets4int socketpair(5int domain, // 指定socket使用的协议族, 本地通信我们使用: AF_LOCAL6int type, // 指定socket的类型: SOCK_STREAM(TCP), SOCK_DGRAM(UDP)7int protocol, // 指定协议, 默认设置0即可8int sv[2] // 用于返回两个连接的socket描述符(等价与socket的fd), 父子进程可以通过这个文件描述符进行通信(任何发送到sv[0]的数据都可以从sv[1]读取,反之任何发送到sv[1]的数据都可以从sv[0]读取。)9);10// 返回值: 成功返回0, 失败返回-1
全双工通信: (对于socketpair函数构建int sv[2]文件描述符数组)
xxxxxxxxxx21// 不同与可以进程间通信的匿名管道, 数据都必须是从pipe[1]写入管道,然后从pipe[0]读出来。2// socketpair的创建的通信端点, 可以让一个进程从fd[1]写,另一进程从fd[0]读; 也可以让一个进程从fd[0]写,另一进程从fd[1]读。
如果你想通过socketpair函数实现两个本地进程间的文件对象描述符的传输,除了需要socketpair函数创建通信的端点, 还需要借助sendmsg函数和recvmsg函数来实现具体的数据传输。 (man sendmsg) (man recvmsg)
xxxxxxxxxx9123// send a message on a socket4ssize_t sendmsg(5int sockfd, // socket文件描述符(对应上面socketpair中创建的文件描述符sv[]数组)6const struct msghdr *msg, // 要发送的详细信息7int flags // 发送行为, 默认08);9// 返回值: 成功返回一个ssize_t类型的值,表示发送的字节数。失败-1。xxxxxxxxxx9123// receive a message from a socket4ssize_t recvmsg(5int sockfd, // socket文件描述符(对应上面socketpair中创建的文件描述符sv[]数组)6struct msghdr *msg, // 要接收的详细信息7int flags // 接收行为, 默认08);9// 返回值: 成功返回一个ssize_t类型的值,表示接收的字节数。失败-1。xxxxxxxxxx91struct msghdr {2void *msg_name;//记录消息地址, 填充NULL交给系统处理3socklen_t msg_namelen;//地址长度, msg_name为NULL时系统自动填充4struct iovec *msg_iov;//iovec类型的数组, 每个iovec类型都是一份数据 (即:这个数组可以携带多份数据)(比较灵活)5size_t msg_iovlen;//上个参数iovec数组中的长度6void *msg_control;//(本质是一个cmsghdr类型指针) 用于包含控制信息(传输文件对象就要用到它)7size_t msg_controllen;//控制信息缓冲区的长度 (比较复杂)8int msg_flags;//消息的标志, 系统填充9};xxxxxxxxxx41struct iovec{2void *iov_base;//一个数据的指针3size_t iov_len;//数据的长度(字节)4}xxxxxxxxxx161struct cmsghdr {2socklen_t cmsg_len;//数据长度(字节): 头部信息(cmsg_len/cmsg_level/cmsg_type) + cmsg_data长度: 需要计算3int cmsg_level;//协议类型: (socket使用)SOL_SOCKET4int cmsg_type;//消息的类型: (传输文件描述符) SCM_RIGHTS5unsigned char cmsg_data[]//存放具体数据6};7ps1: cmsg_data数组的长度用于存放额外的数据,可以根据需求的变化而变化(比如现在我们使用它, 准备存放文件描述符)8ps2: 我们可以通过CMSG_LEN这个宏用于计算包括cmsg_data在内的完整cmsghdr结构的长度。而使用CMSG_LEN的时候, 其参数只需要传入cmsg_data的长度9// 比如: cmsg_data存储文件描述符(文件描述符int类型), 通过CMSG_LEN(sizeof(int))得到的长度就是整个cmsghdr结构体的长度(cmsg_len + cmsg_level + cmsg_type + cmsg_data)10ps3: CMSG_DATA这个宏用于获取指向cmsghdr结构中数据部分cmsg_data的指针11// 比如: 如果我们准备用cmsghdr传输文件描述符, 就要先通过CMSG_DATA获取指向cmsg_data部分起始位置的指针12// 文件描述符指针: *netfd;13// struct cmsghdr *pcms = (struct cmsghdr *) malloc( CMSG_LEN(sizeof(int)) )14// void *addr = CMSG_DATA(pcms);15// int * p_fd = (int *)addr;16// *p_fd = *netfd
xxxxxxxxxx73123int main(int argc,char*argv[])4{5int fd[2];6socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);78if(fork() == 0){9int fd_txt = open("1.txt", O_WRONLY);10close(fd[0]);11int socket_fd = fd[1];1213struct msghdr hdr;14bzero(&hdr, sizeof(hdr));1516char *str = "hello";17struct iovec vec[1];18vec[0].iov_base = str;19vec[0].iov_len = strlen(str);2021hdr.msg_iov = vec;22hdr.msg_iovlen = 1;2324struct cmsghdr *pcms = (struct cmsghdr *)malloc(CMSG_LEN(sizeof(int)));25pcms->cmsg_len = CMSG_LEN(sizeof(int));26pcms->cmsg_level = SOL_SOCKET;27pcms->cmsg_type = SCM_RIGHTS;28void *addr = CMSG_DATA(pcms);29int *p_fd = (int *)addr;30*p_fd = fd_txt;3132hdr.msg_control = pcms;33hdr.msg_controllen = CMSG_LEN(sizeof(int));3435sendmsg(socket_fd,&hdr, 0);3637printf("son 1.txt fd: %d \n", fd_txt);38}else{39close(fd[1]);40int socket_fd = fd[0];4142struct msghdr hdr;43bzero(&hdr, sizeof(hdr));4445char buf[60] = {0};46struct iovec vec[1];47vec[0].iov_base = buf;48vec[0].iov_len = sizeof(buf);4950hdr.msg_iov = vec;51hdr.msg_iovlen = 1;5253struct cmsghdr *pcms = (struct cmsghdr *)malloc(CMSG_LEN(sizeof(int)));54pcms->cmsg_len = CMSG_LEN(sizeof(int));55pcms->cmsg_level = SOL_SOCKET;56pcms->cmsg_type = SCM_RIGHTS;5758hdr.msg_control = pcms;59hdr.msg_controllen = CMSG_LEN(sizeof(int));6061recvmsg(socket_fd,&hdr, 0);6263void *addr = CMSG_DATA(pcms);64int *fd = (int *)addr;6566wait(NULL);67printf("main 1.txt fd: %d \n", *fd);68printf("main str :%s \n", buf);6970write(*fd, "world", 5);71}72return 0;73}
在一个复杂项目中, 一般情况下我们需要通过多个.c文件, 相互协作, 共同编译出一个可执行文件运行, 那么就需要稍稍修改makefile的书写规则
xxxxxxxxxx191# makefile2# 定义一个srcs变量, 来代指: 使用wildcard函数获取当前目录下的所有.c文件3srcs:=$(wildcard *.c)4# 定义一个objs变量, 来代指: 使用patsubst函数讲srcs中所有的.c文件拓展名替换成.o文件5objs:=$(patsubst %.c,%.o, $(srcs))67# 编译 ' -c $^ '(源依赖项.c文件) 输出到 ' -o $@ '(目标.o)文件8%.o:%.c9gcc -c $^ -o $@ -g10# mian文件依赖于所有的objs文件11# 把所有依赖项($^) 指定输出到当前目标(-o $@)(即:mian)12main:$(objs)13gcc $^ -o $@ -lpthread1415# 清理objs文件 清理mian文件16clean:17$(RM) $(objs) main1819rebuild: clean main
xxxxxxxxxx5412// 定义:一个枚举类型, 分别代表进程`忙`和`空闲`3enum {4BUSY,5FREE6};7// 定义:用来追踪进程池中, 进程的信息和状态8typedef struct worker_status_s{9int pid; // 进程ID10int status; // 进程的状态: 忙 or 空闲11int socket_local_fd; // socket本地通信文件描述符: 用于main和子进程通信12} worker_status_t;1314// 根据指定数量初始化进程池15int initWorkerArr(16worker_status_t *p, // 记录进程池中进程状态的数组(传入传出参数)17int num // 进程池中:要创建的进程个数18);1920// 根据端口和IP构建服务端的网络监听21int initTcpSocket(22int * socketfd,// socket的文件描述符(传入传出参数)23char *ip, // ip地址24char *port // 端口25);26// 给epoll添加要监听的文件描述符27int epoll_addfd(28int epollfd, // epoll的文件描述符29int filefd // 要监听的文件描述符30);31// 把监听到的一个连接交给进程池中某个空闲进程32int toProcessPoolDealNet(33int netfd, // accept获取的连接对象的文件描述符id34worker_status_t *p, // 进程池连接数组35int num // 进程池中进程个数36);37// 启动一个工作进程38int startWorker(39int socket_local_fd // 用以和父进程通信的本地socket文件描述符40);41// 工作进程,从本地socket中读取main进程发过来的客户端连接对象42int read_net_fd(43int socket_local_fd, //用以和main进程和工作进程通信的本地socket文件描述符44int *netfd // 客户端连接对象的文件描述符指针45);46// main进程accept获得的客户端连接对象发给工作进程47int write_net_fd(48int socket_local_fd, // mian进程用来和工作进程通信的本地socket文件描述符49int *netfd // main进程accept的客户端连接对象的文件描述符50);51// 根据客户端连接文件对象和客户端通信52int netToClient(53int netfd // 客户端连接对象的文件描述符54);
xxxxxxxxxx5412int main()3{4// 初始化进程池5worker_status_t workerArr[3];6// 进程池中的进程个数: 一倍~二倍cpu (一倍: cpu密集, 二倍: IO密集, 可以实际调整)7initWorkerArr(workerArr, 3);89// 初始化网络监听10int socketfd;11initTcpSocket(&socketfd, "192.168.106.129", "8080");1213// 初始化epoll获得epoll文件描述符14int epollfd = epoll_create(1);15// 添加连接监听16epoll_addfd(epollfd, socketfd);1718// 子进程完成任务, 要通知进程池:把忙状态改为闲状态19// 所以要监听, 子进程向父进程通信的本地socket, 如果本地socket就绪, 说明状态要变为闲20for(int i=0; i<3; i++){21epoll_addfd(epollfd,workerArr[i].socket_local_fd);22}23// 循环获取网络连接24while(1){25// 就绪集合26struct epoll_event ready_set[1024];27// 监听就绪28int ready_num = epoll_wait(epollfd, ready_set, 1024, -1);2930// 遍历就绪集合31for(int i=0; i<ready_num; i++){32if(ready_set[i].data.fd == socketfd){33// socket监听读就绪, 有连接进来34int netfd = accept(socketfd, NULL, NULL);35// 把监听到的连接交给进程池36toProcessPoolDealNet(netfd, workerArr, 3);37// 主进程不需要再维护连接(已经交给子进程了)38// 关闭连接39close(netfd);40}else {41// 某个字进程完成任务,子进程通过本地socket发回信息42// 导致epoll监听对应本地socket就绪43for(int j=0; j<3; j++){44if(workerArr[j].socket_local_fd = ready_set[i].data.fd){45// workerArr[j]的子进程发来通信46// 由忙状态-》 闲状态47workerArr[j].status = FREE;48}49}50}51}52}53return 0;54}
xxxxxxxxxx52123// 根据指定数量初始化进程池4// 参数1: 追踪进程池中进程状态的数组(传入传出参数)5// 参数2: 进程池中进程个数6int initWorkerArr(worker_status_t *p, int num){78for(int i=0; i<num; i++){9// 使用socketpair初始化本地socket通信两端10// 创建一个用于和子进程通信, 可以传输文件对象的本地socket11int socket_fd[2];12socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fd);1314int pid = fork();15if(pid == 0){16// 子进程17// 关闭socket_fd[1], 保留socket_fd[0], 用于和主进程通信18close(socket_fd[1]);19// 启动当前子进程20startWorker(socket_fd[0]);21}else{22// 父进程: 保存子进程pid,子进程状态, 和子进程通信的本地socket文件描述符23p[i].pid = pid;24p[i].status = FREE;25p[i].socket_local_fd = socket_fd[1];26// 保留socket_fd[1], 关闭socket_fd[0]27// 也可以:关闭socket_fd[1], 保留socket_fd[0]28// 和子进程相对应的凑成一对即可29close(socket_fd[0]);30}31}32return 0;33}3435// 把监听到的一个连接交给进程池中某个空闲进程36// 参数1:accept获取的网络连接对象的文件描述符37// 参数2:进程池数组38// 参数3:进程池大小39int toProcessPoolDealNet( int netfd, worker_status_t *p, int num){40// 遍历进程池, 寻找空闲进程41for(int i=0; i<num; i++){42if(p[i].status == FREE){43// 把main进程accpet获得的客户端连接文件描述符对象44// 通过本地socket 发送给工作进程45write_net_fd(p[i].socket_local_fd, &netfd);4647// 把当前进程置为忙状态48p[i].status = BUSY;49break;50}51}52}
xxxxxxxxxx36123int startWorker(int socket_local_fd){4while(1){5// 读取主进程通过socketpair传过来的客户端连接对象的文件描述符6// 客户端连接的对象的(主进程accept的获取)7// 通过封装的read_net_fd()读取8int netfd;9read_net_fd(socket_local_fd, &netfd);10printf("worker netfd: %d \n", netfd);1112// 根据获得的客户端连接对象的文件描述符和客户端通信13netToClient(netfd);1415// 关闭和客户端连接16close(netfd);1718// 进程完成任务,通过本地socket,发信息送给主进程(随便发点什么),19// 表示连接通信完成,促使主进程修改忙状态位闲状态20pid_t pid = getpid();21printf("server pid : %d \n", pid);22send(socket_local_fd, &pid, sizeof(pid), 0);23}24return 0;25}2627// 通过客户端的连接的文件描述符和客户端通信28// 参数一: 和客户端连接的文件描述符29int netToClient(int netfd){3031char *str = "hello";32send(netfd, str, strlen(str), 0);33sleep(20);3435return 0;36}
xxxxxxxxxx73123// 工作线程用来读取main线程accept的客户端连接对象4// 参数一: 工进程程用来和main进程通信的特制本地socket5// 参数二: 用来存储从本地socket中读到的客户端连接对象的文件描述符6int read_net_fd(int socket_local_fd, int *netfd){78struct msghdr hdr;9bzero(&hdr, sizeof(hdr));1011struct iovec vec[1];12int flag = 1;13vec[0].iov_base = &flag;14vec[0].iov_len = sizeof(int);1516hdr.msg_iov = vec;17hdr.msg_iovlen = 1;1819struct cmsghdr *pcmsg =20(struct cmsghdr *)malloc(21CMSG_LEN(sizeof(int)));22pcmsg->cmsg_len = CMSG_LEN(sizeof(int));23pcmsg->cmsg_level = SOL_SOCKET;24pcmsg->cmsg_type = SCM_RIGHTS;2526hdr.msg_control = pcmsg;27hdr.msg_controllen = CMSG_LEN(sizeof(int));2829int ret = recvmsg(socket_local_fd, &hdr, 0);30ERROR_CHECK(ret, -1, "recvmsg");3132void *addr = CMSG_DATA(pcmsg);33int *p_fd = (int *)addr;34*netfd = *p_fd;3536return 0;37}3839// main进程accept获得的客户端连接对象发送给工作进程40// 参数一: main进程用来和工作进程通信的本地socket41// 参数二: main进程accept获得到的客户端连接文件对象的文件描述符42int write_net_fd(int socket_local_fd, int* netfd){4344struct msghdr hdr;45bzero(&hdr, sizeof(hdr));4647struct iovec vec[1];48int flag = 1;49vec[0].iov_base = &flag;50vec[0].iov_len = sizeof(int);5152hdr.msg_iov = vec;53hdr.msg_iovlen = 1;5455struct cmsghdr *pcmsg =56(struct cmsghdr *)malloc(57CMSG_LEN(sizeof(int)));58pcmsg->cmsg_len = CMSG_LEN(sizeof(int));59pcmsg->cmsg_level = SOL_SOCKET;60pcmsg->cmsg_type = SCM_RIGHTS;6162void *addr = CMSG_DATA(pcmsg);63int *p_int = (int *)addr;64*p_int = *netfd;6566hdr.msg_control = pcmsg;67hdr.msg_controllen = CMSG_LEN(sizeof(int));6869int ret = sendmsg(socket_local_fd, &hdr, 0);70ERROR_CHECK(ret, -1, "sendmsg");7172return 0;73}
xxxxxxxxxx2612// 根据端口和IP构建服务端的网络监听3// 参数1: socket的文件描述符(传入传出)4// 参数2: ip地址5// 参数3: 端口6int initTcpSocket(int * socketfd, char *ip, char *port){78// 创建socket文件对象9*socketfd = socket(AF_INET, SOCK_STREAM, 0);1011// 解除TIME_WAIT等待时:导致端口占用问题12int reuse = 1;13setsockopt(*socketfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));1415// 构建sockadd_in16struct sockaddr_in addr;17addr.sin_family = AF_INET;18addr.sin_addr.s_addr = inet_addr(ip);19addr.sin_port = htons(atoi(port));2021// 绑定端口22bind(*socketfd, (struct sockaddr *) &addr, sizeof(addr));23// 开始监听24listen(*socketfd, 10);25return 0;26}
xxxxxxxxxx13123// 添加epoll监听4// 参数1: epoll的文件描述符id5// 参数2: 要添加到epoll监控的文件的文件描述符6int epoll_addfd(int epollfd, int filefd){7struct epoll_event event;8event.data.fd = filefd;9event.events = EPOLLIN;10epoll_ctl(epollfd,EPOLL_CTL_ADD, filefd, &event);1112return 0;13}
xxxxxxxxxx24123int main(int argc,char*argv[])4{5char *ip = "192.168.106.129";6char *port = "8080";78int socketfd = socket(AF_INET, SOCK_STREAM, 0);910struct sockaddr_in addr;11addr.sin_family = AF_INET;12addr.sin_addr.s_addr = inet_addr(ip);13addr.sin_port = htons(atoi(port));1415connect(socketfd, (struct sockaddr *)&addr, sizeof(addr));1617char buf[60] = {0};18int res = recv(socketfd, buf, sizeof(buf), 0);19ERROR_CHECK(res, -1, "recv");20printf("buf: %s \n", buf);2122close(socketfd);23return 0;24}
在上面过程中, 我们解决了进程间共享文件对象的问题, 并且实现了服务器的主进程接收客户端连接请求, 并把客户端连接交给进程池中进程具体和客户端进行交互的功能。
假设在上一版本的基础上, 如果客户端的请求是想获得一份在服务端的文件, 我们该把前面向客户端发送简单的字符信息, 变为向客户端传输文件那? 客户端又怎么解决接收文件的问题?
客户端下载文件/服务器向客户端传输文件: 以一个小文件(eg:1000字节)为例
EgCode: 服务器 (修改worker: netToClient -> sendFile)
xxxxxxxxxx191// 向客户端发送文件2// 参数一: 客户端连接的文件对象3int sendFile(int netFd){4// 发送文件名5char *file_name = "file.txt";6send(netFd, file_name, strlen(file_name), 0);78// 打开文件9int file_fd = open(file_name, O_RDONLY);1011// 读取文件数据12char buf[1000] = {0};13ssize_t sret = read(file_fd, buf, sizeof(buf));1415// 发送给客户端16send(netFd, buf, sret, 0);1718return 0;19}EgCode: 客户端( client调用downloadFile )
xxxxxxxxxx181// client: 接收文件2//下载文件3int downloadFile(int netFd){4// 读取文件的名字5char buf_name[60] = {0};6recv(netFd, buf_name, sizeof(buf_name), 0);78// 创建文件9int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);1011// 读取文件内容12char buf[1000] = {0};13ssize_t res = recv(netFd,buf, sizeof(buf),0);14// 写入文件15write(file_fd, buf, res);1617return 0;18}
在上面示例中, 当我们先调用send向客户端发送文件名, 之后又调用send向客户端发送从文件读取的数据, 连续两次发送, 而我们发送的数据是没有设定的数据边界的(因为TCP的接收缓冲区将多个发送的数据序列视为连续的字节流), 所以当客户端读取的时候, 有可能一次recv读取了两次发送的内容. 并且把文件名+文件内容作为文件名创建一个文件. 这就是说为的粘包问题. (UDP没有粘包问题, 因为UDP发送数据, 并不会像TCP协议那样在TCP层/传输层对数据进行拆分和重组, UDP的拆分重组行为是IP层/网络层进行的, UDP层/传输层无感知, UDP层/传输层只会觉得每一个UDP报文段都是完整的.)
所以我们需要做的事情就是厘定数据传输的边界: 参考我们前面的管道传输文件的示例, 我们可以进行如下改造.
EgCode : worker
xxxxxxxxxx351// 发送信息, 指明长度2typedef struct train_s {3int len;4char buf[1000];5}train_t;67// 向客户端发送文件8// 参数一: 客户端连接的文件对象9int sendFile(int netFd){10// 文件名11char *file_name = "file.txt";12// 初始化文件名信息13train_t train;14bzero(&train, 0);15train.len = strlen(file_name);16memcpy(train.buf, file_name, train.len);1718// 发送文件名长度19send(netFd, &train.len, sizeof(int), 0);20// 根据指定长度发送文件名21send(netFd, train.buf, train.len, 0);2223// 打开文件24int file_fd = open(file_name, O_RDONLY);2526bzero(&train, 0);27// 读取文件数据28ssize_t sret = read(file_fd, train.buf, sizeof(train.buf));29train.len = sret;3031// 发送给客户端32send(netFd, train.buf, train.len, 0);3334return 0;35}EgCode: client
xxxxxxxxxx221//下载文件2int downloadFile(int netFd){3// 读取文件名字的长度4int file_name_len;5recv(netFd, &file_name_len, sizeof(int), 0);6printf("name_len: %d \n", file_name_len);78// 根据文件名字的长度:读取文件的名字9char buf_name[60] = {0};10recv(netFd, buf_name, file_name_len, 0);1112// 创建文件13int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);1415// 读取文件内容16char buf[1000] = {0};17ssize_t res = recv(netFd,buf, sizeof(buf),0);18// 写入文件19write(file_fd, buf, res);2021return 0;22}
假设我们发送一个大文件: 代码改造如下
EgCode: worker
xxxxxxxxxx421typedef struct train_s {2int len;3char buf[1000];4}train_t;56// 向客户端发送文件7// 参数一: 客户端连接的文件对象8int sendFile(int netFd){9// 文件名10char *file_name = "file.txt";11// 初始化文件名信息12train_t train;13bzero(&train, 0);14train.len = strlen(file_name);15memcpy(train.buf, file_name, train.len);1617// 发送文件名长度18send(netFd, &train.len, sizeof(int), 0);19// 根据指定长度发送文件名20send(netFd, train.buf, train.len, 0);2122// 打开文件23int file_fd = open(file_name, O_RDONLY);2425while(1){26bzero(&train, sizeof(train));27// 读取文件数据28ssize_t sret = read(file_fd, train.buf, sizeof(train.buf));29train.len = sret;30if(sret == 0){31// 文件读完32break;33}34// 发送给客户端 -----------------------> 出问题, 半包问题 (会发送半包)35int net_res = send(netFd, &train, train.len+sizeof(train.len), 0);36if(net_res == -1){37// 发送失败, 对端断开38break;39}40}41return 0;42}EgCode: client
xxxxxxxxxx331//下载文件2int downloadFile(int netFd){3// 读取文件名字的长度4int file_name_len;5recv(netFd, &file_name_len, sizeof(int), 0);6printf("name_len: %d \n", file_name_len);78// 根据文件名字的长度:读取文件的名字9char buf_name[60] = {0};10recv(netFd, buf_name, file_name_len, 0);1112// 创建文件13int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);1415// 循环读取16while(1){17// 读取长度18int len = 0;19recv(netFd, &len, sizeof(int), 0);20// len值出错会: -----------------------> 出问题, 半包问题21if(len == 0){22// 对方发送完毕23break;24}25// 读取文件内容26char buf[1000] = {0};27ssize_t res = recv(netFd,buf, len,0);28// 写入文件29write(file_fd, buf, res);30}31return 0;32}33
在上面发送的过程中, 我们会发现, 如果一个文件比较大, 偶尔会在客户端接收文件的时候, 产生接收错误. 这是因为, 我们的信息发送行为并不是由send函数控制(send函数本身只是把要发送的数据, 交给操作系统), 具体什么时候真正发送数据, 是由操作系统决定的. 操作系统, 有可能在发送数据的时候, 某个train只发送了一半, 然后被客户端读取, 在一段时之后, 操作系统发送了另一半train给客户端, 客户端先读取数据长度的时候, 出现错误, 导致最终数据读取错误. 这种不可控的行为我们称之为半包问题.
所以在recv函数中, 提供了关于接收行为的标志位中提供了MSG_WAITALL字段, 用于控制recv读取指定的len长度的数据才返回. 进而解决半包问题.
xxxxxxxxxx9123ssize_t recv(4int sockfd,5void *buf,6size_t len,7int flags // 定接收行为的标志位:MSG_WAITALL(等待所有请求的数据才返回)...大多数情况下,flags设置为0。8);9// 返回值: 成功时返回实际读取的字节数。如果连接已经关闭返回0(对方close: 四次挥手)。读取失败返回-1
EgCode: client
xxxxxxxxxx251//下载文件2int downloadFile(int netFd){3int file_name_len;4recv(netFd, &file_name_len, sizeof(int), MSG_WAITALL);56char buf_name[60] = {0};7recv(netFd, buf_name, file_name_len, MSG_WAITALL);89int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);1011while(1){12int len = 0;13recv(netFd, &len, sizeof(int), MSG_WAITALL);14if(len == 0){15// 对方发送完毕16break;17}18// 读取文件内容19char buf[1000] = {0};20ssize_t res = recv(netFd,buf, len,MSG_WAITALL);21// 写入文件22write(file_fd, buf, res);23}24return 0;25}
在发送大文件的时候, 客户端有可能在发送的时候提前终止, 这会导致发送端/写端(send)因为抛出SIGPIPE导致进程终止.
EgCode: worker
xxxxxxxxxx391typedef struct train_s {2int len;3char buf[1000];4}train_t;56void fun(int num){7printf("sigpeipe \n");8}9int sendFile(int netFd){10// TODO: 注册信号11signal(SIGPIPE, fun);12char *file_name = "file.txt";13int file_fd = open(file_name, O_RDONLY);14train_t train;15bzero(&train, 0);1617// 发送文件长度和名字18train.len = strlen(file_name);19memcpy(train.buf, file_name, train.len);20send(netFd, &train,sizeof(int)+train.len, 0);2122while(1){23bzero(&train, 0);24ssize_t sret = read(file_fd, train.buf, sizeof(train.buf));25train.len = sret;26if(sret == 0){27// 文件读完28break;29}30int net_res = send(netFd, &train, train.len+sizeof(train.len), 0);31sleep(1); // TODO:睡1秒发一次32printf("net_res :%d \n", net_res);33if(net_res == -1){34// 发送失败, 对端断开35break;36}37}38return 0;39}改进
xxxxxxxxxx9123ssize_t send(4int sockfd,5const void *buf,6size_t len,7int flags// 额外选项:MSG_NOSIGNAL防止发送时由于连接断开而引发的SIGPIPE信号 ...大多数情况下,flags参数设置为0。8);9// 返回值: 成功返回实际发送的字节数。失败返回-1xxxxxxxxxx321typedef struct train_s {2int len;3char buf[1000];4}train_t;56int sendFile(int netFd){7char *file_name = "file.txt";8int file_fd = open(file_name, O_RDONLY);9train_t train;10bzero(&train, 0);1112// 发送文件长度和名字13train.len = strlen(file_name);14memcpy(train.buf, file_name, train.len);15send(netFd, &train,sizeof(int)+train.len, MSG_NOSIGNAL); // 设置:MSG_NOSIGNAL1617while(1){18bzero(&train, 0);19ssize_t sret = read(file_fd, train.buf, sizeof(train.buf));20train.len = sret;21if(sret == 0){22// 文件读完23break;24}25int net_res = send(netFd, &train, train.len+sizeof(train.len), MSG_NOSIGNAL);// 设置:MSG_NOSIGNAL26if(net_res == -1){27// 发送失败, 对端断开28break;29}30}31return 0;32}
如果我们想模仿日常下载文件的时候, 进度条显示的效果. 我们可以在文件传输之前, 先传输文件大小给客户端, 在客户端不断接收文件的时候, 根据已经接收的文件的大小/总文件的大小, 显示进度条.
我们需要用到fstat函数获得一个文件的状态信息 (man fstat)
xxxxxxxxxx91234//get file status5int fstat(6int fd, // 文件描述符7struct stat *statbuf // 存储文件状态的指针8);9// 返回值:成功返回0, 失败-1xxxxxxxxxx41struct stat{2off_t st_size; //文件的大小,以字节为单位3// .....很多参数(文件所属用户/组,文件块数, 文件修改时间.....)4}EgCode: worker
xxxxxxxxxx391typedef struct train_s {2int len;3char buf[1000];4}train_t;56int sendFile(int netFd){7char *file_name = "file.txt";8int file_fd = open(file_name, O_RDONLY);9train_t train;10bzero(&train, 0);1112// 获得文件信息13struct stat stat_file;14fstat(file_fd, &stat_file);15// 发送文件长度16send(netFd, &stat_file.st_size, sizeof(off_t), MSG_NOSIGNAL);1718// 发送文件长度和名字19bzero(&train, 0);20train.len = strlen(file_name);21memcpy(train.buf, file_name, train.len);22send(netFd, &train,sizeof(int)+train.len, MSG_NOSIGNAL);2324while(1){25bzero(&train, 0);26ssize_t sret = read(file_fd, train.buf, sizeof(train.buf));27train.len = sret;28if(sret == 0){29// 文件读完30break;31}32int net_res = send(netFd, &train, train.len+sizeof(train.len), MSG_NOSIGNAL);33if(net_res == -1){34// 发送失败, 对端断开35break;36}37}38return 0;39}EgCode: client
xxxxxxxxxx421int downloadFile(int netFd){23// 读取文件长度4off_t file_size = 0;5recv(netFd, &file_size, sizeof(off_t), MSG_WAITALL);6printf("file_size : %ld \n", file_size);78int file_name_len;9recv(netFd, &file_name_len, sizeof(int), MSG_WAITALL);10char buf_name[60] = {0};11recv(netFd, buf_name, file_name_len, MSG_WAITALL);12// 创建文件13int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);1415// 数据传输标记:记录接收数据量16off_t cursize = 0; // 当前接收17off_t last_update_size = 0; // 每更新一次百分比增加1819while(1){20int len = 0;21recv(netFd, &len, sizeof(int), MSG_WAITALL);22if(len == 0){23// 对方发送完毕24break;25}26char buf[1000] = {0};27ssize_t res = recv(netFd,buf, len,MSG_WAITALL);28write(file_fd, buf, res);2930// 记录当前接收的文件大小31cursize += len;32// 计算相比上一次打印, 增加的百分比33double num = (double)cursize*100/file_size - (double)last_update_size*100/file_size;34if(num > 1){35// 进度增加了百分之一36last_update_size = cursize;37// 打印进度条38printf("now: %.2f \n", (double)last_update_size*100/file_size);39}40}41return 0;42}
以上述代码为例, 在数据传输过程中, 我们的服务端需要先从读取文件到内核态, 然后把内核态数据拷贝到用户态, 再从用户态拷贝到内核态让系统发送数据, 如果我们能避免数据从内核态和用户态的来回拷贝, 当需要发送数据的时候, 直接从磁盘读取到的数据, 在内核态直接转给系统发送, 从逻辑上将显著提高数据传输效率. 这就是所谓的零拷贝问题.
mmap函数用于创建一个新的映射在进程的用户态空间中(分配虚拟的空间未作数据加载)。当我们真正需要使用和访问这个数据的时候, 假设这些数据被socket的send函数调用发送给客户端, 那么内核在执行send发送数据的行为的时候, 是把加载到用户态的文件数据拷贝到内核, 避免了像先read数据那样(先把数据从内核空间拷贝到用户空间), 然后再send的时候(再把用户空间数据拷贝到内核态空间)的两次拷贝, 也就是说这是一次数据拷贝和两次数据拷贝的问题
xxxxxxxxxx11123void *mmap(4void *addr, // 期望映射的起始地址。通常设置为NULL5size_t length, // 映射的长度。字节为单位6int prot, // 控制映射区权限: PROT_READ/页面可被读取,PROT_WRITE/页面可被写入...7int flags, // 映射选项: MAP_SHARED/对映射区域的修改会影响到底层文件,且对其他映射了同一文件的进程可见; MAP_PRIVATE/创建一个写时拷贝的私有映射。对映射区域的修改不会影响原文件,也不对其他映射了同一文件的进程可见8int fd, //被映射文件的文件描述符9off_t offset // 文件映射的起始点10);11// 成功:返回指向映射区域起始地址的指针。失败:返回MAP_FAILE
EgCode: worker
xxxxxxxxxx361typedef struct train_s {2int len;3char buf[1000];4}train_t;56int sendFile(int netFd){7char *file_name = "file.txt";8// 需要O_RDWR,避免mmap权限不足9int file_fd = open(file_name, O_RDWR);10train_t train;11bzero(&train, 0);1213// 获得文件信息14struct stat stat_file;15fstat(file_fd, &stat_file);16// 发送文件长度17send(netFd, &stat_file.st_size, sizeof(off_t), MSG_NOSIGNAL);1819// 发送文件长度和名字20bzero(&train, 0);21train.len = strlen(file_name);22memcpy(train.buf, file_name, train.len);23send(netFd, &train,sizeof(int)+train.len, MSG_NOSIGNAL);2425// mmap26char *p = (char *)mmap(NULL, stat_file.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, file_fd, 0);27ERROR_CHECK(p, MAP_FAILED, "mmap");28send(netFd, p, stat_file.st_size, MSG_NOSIGNAL);2930printf("over \n");31// nummap32munmap(p, stat_file.st_size);3334close(file_fd);35return 0;36}EgCode: client: 版本一, 进度条接收
xxxxxxxxxx421int downloadFile(int netFd){23// 读取文件长度4off_t file_size = 0;5recv(netFd, &file_size, sizeof(off_t), MSG_WAITALL);6printf("file_size : %ld \n", file_size);78int file_name_len;9recv(netFd, &file_name_len, sizeof(int), MSG_WAITALL);10char buf_name[60] = {0};11recv(netFd, buf_name, file_name_len, MSG_WAITALL);1213int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);1415// 数据传输标记:记录接收数据量16off_t cursize = 0; // 当前接收17off_t last_update_size = 0; // 每更新一次百分比增加1819sleep(10);20while(1){21char buf[1000] = {0};22ssize_t res = recv(netFd,buf, sizeof(buf),MSG_WAITALL);23if(res == 0){24printf("file over \n");25break;26}27write(file_fd, buf, res);2829// 记录当前接收的文件大小30cursize += res;31// 计算相比上一次打印, 增加的百分比32double num = (double)cursize*100/file_size33- (double)last_update_size*100/file_size;34if(num > 1){35// 进度增加了百分之一36last_update_size = cursize;37// 打印进度条38printf("now: %.2f \n", (double)last_update_size*100/file_size);39}40}41return 0;42}EgCode: client: 版本二: mmap接收
xxxxxxxxxx251int downloadFile(int netFd){23// 读取文件长度4off_t file_size = 0;5recv(netFd, &file_size, sizeof(off_t), MSG_WAITALL);6printf("file_size : %ld \n", file_size);78int file_name_len;9recv(netFd, &file_name_len, sizeof(int), MSG_WAITALL);10char buf_name[60] = {0};11recv(netFd, buf_name, file_name_len, MSG_WAITALL);1213int file_fd = open(buf_name, O_RDWR|O_CREAT, 0666);14ftruncate(file_fd, file_size);// 必要 ----TODO1516char *p =(char *)mmap(NULL, file_size, PROT_READ|PROT_WRITE, MAP_SHARED, file_fd, 0);17ERROR_CHECK(p, MAP_FAILED, "mmap");1819recv(netFd, p, file_size, MSG_WAITALL);2021munmap(p, file_size);22close(file_fd);2324return 0;25}
对于sendfile函数(新版的), 它存在的本质意义, 是它能直接在内核空间内传输数据(当socket发送信息到网卡上时, 不再从socket的发送缓冲区发送给网卡, 而是让读取文件的缓冲区发送给网卡)(也就是说: 磁盘文件->内核文件缓冲区->网卡, 不再经过socket的发送缓冲区), 这样当我们需要send发送的数据的时候, 相比较mmap又少了一次拷贝.
EgCode: worker
xxxxxxxxxx311typedef struct train_s {2int len;3char buf[1000];4}train_t;56int sendFile(int netFd){7char *file_name = "file.txt";8// 需要O_RDWR,避免mmap权限不足9int file_fd = open(file_name, O_RDWR);10train_t train;11bzero(&train, 0);1213// 获得文件信息14struct stat stat_file;15fstat(file_fd, &stat_file);16// 发送文件长度17send(netFd, &stat_file.st_size, sizeof(off_t), MSG_NOSIGNAL);1819// 发送文件长度和名字20bzero(&train, 0);21train.len = strlen(file_name);22memcpy(train.buf, file_name, train.len);23send(netFd, &train,sizeof(int)+train.len, MSG_NOSIGNAL);2425// #include <sys/sendfile.h> : 头文件26sendfile(netFd, file_fd,NULL, stat_file.st_size);27printf("over \n");2829close(file_fd);30return 0;31}
当我们在shell窗口上按下ctrl+c: 这是给当前会话的前台进程组的所有进程, 发送信号
(一个会话: 包含一个前台进程组, 多个后台进程组 )
( 进程组: 一组进程: fork()->产生子进程 -> 和父进程在同一个进程组 )
假设我们通过kill -2 pid: 这种行为仅仅是发送信号给指定的pid进程
当我们关闭服务端进程的时候, 我们试图直接ctrl+c可以结束进程, 但仅仅以当前的代码逻辑, 这不是一个良好的退出方式.
EgCode: worker.c
xxxxxxxxxx311int startWorker(int socket_local_fd){2// 脱离当前进程组-> 不再是前台进程组(前台进程组只有一个)3printf("group id: %d \n", getpgrp());4setpgid(0, 0);5printf("group id: %d \n", getpgrp());6while(1){7// 读取主进程通过socketpair传过来的客户端连接对象的文件描述符8// 客户端连接的对象的(主进程accept的获取)9// 通过封装的read_net_fd()读取10int netfd;11read_net_fd(socket_local_fd, &netfd);12if(netfd == 0){13// 读取数据成功, 但是拿到的netfd为0;14// 要么对端发送了错误的0, 要么对端关闭数据发送socket15// 假设main进程关闭传输(比如ctrl+c关闭了前台进程组:现在只有父进程), 读到数据016// 如果对端关闭, 我们假设子进程还有任务没有结束,我们用睡眠代指做事情在17sleep(100);18}1920sendFile(netfd);21// 关闭和客户端连接22close(netfd);2324// 进程完成任务,通过本地socket,发信息送给主进程(随便发点什么),25// 表示连接通信完成,促使主进程修改忙状态位闲状态26pid_t pid = getpid();27printf("server pid : %d \n", pid);28send(socket_local_fd, &pid, sizeof(pid), 0);29}30return 0;31}我们会发现在上述逻辑中(可以通过ps -elf查看), 主进程main的关闭, 子进程已经存活
假设我们希望主进程main的退出, 也能导致子进程退出, 我们可以修改代码逻辑实现.
我们可以监听信号, 当信号触发, 让main进程向子进程发送关闭进程的信息, 子进程收到信息之后, 关闭子进程, 然后主进程等待子进程退出之后再退出.
EgCode: main.c
xxxxxxxxxx681int exit_pipe_fd[2];2void func(int num){3// 向管道中写入内容4write(exit_pipe_fd[1], "1", 1);5}6int main()7{8//初始化管道: 自读(select)自写(信号)管道9pipe(exit_pipe_fd);10// 注册信号11signal(2, func);121314worker_status_t workerArr[3];15initWorkerArr(workerArr, 3);1617int socketfd;18initTcpSocket(&socketfd, "192.168.106.129", "8080");1920int epollfd = epoll_create(1);21epoll_addfd(epollfd, socketfd);2223// 监听管道, 目的是发现是否有2号信号触发写管道24epoll_addfd(epollfd, exit_pipe_fd[0]);252627for(int i=0; i<3; i++){28epoll_addfd(epollfd,workerArr[i].socket_local_fd);29}30while(1){31struct epoll_event ready_set[1024];32int ready_num = epoll_wait(epollfd, ready_set, 1024, -1);3334// 遍历就绪集合35for(int i=0; i<ready_num; i++){36if(ready_set[i].data.fd == exit_pipe_fd[0]){37// 判断是否是因为信号出现, 触发写管道导致epoll就绪38//确定是 -> 向所有子进程发信息, 通知清理资源结束3940for(int i=0;i<3;i++){41// 用-1代指关闭子进程42int close_tag = -1;43write_net_fd(workerArr[i].socket_local_fd, &close_tag);44}4546// 等待所有子进程结束47for(int i=0; i<3; i++){48wait(NULL);49}50// 自己退出51printf("子进程皆以结束, 主进程退出 \n");52exit(0);53}else if(ready_set[i].data.fd == socketfd){54int netfd = accept(socketfd, NULL, NULL);55toProcessPoolDealNet(netfd, workerArr, 3);56close(netfd);57}else {58for(int i=0; i<3; i++){59if(workerArr[i].socket_local_fd = ready_set[i].data.fd){60workerArr[i].status = FREE;61}62}6364}65}66}67return 0;68}EgCode: localSocket.c
xxxxxxxxxx851// 工作线程用来读取main线程accept的客户端连接对象2int read_net_fd(int socket_local_fd, int *netfd){34struct msghdr hdr;5bzero(&hdr, sizeof(hdr));67struct iovec vec[1];8int flag = 1;9vec[0].iov_base = &flag;10vec[0].iov_len = sizeof(int);1112hdr.msg_iov = vec;13hdr.msg_iovlen = 1;1415struct cmsghdr *pcmsg =16(struct cmsghdr *)malloc(17CMSG_LEN(sizeof(int)));18pcmsg->cmsg_len = CMSG_LEN(sizeof(int));19pcmsg->cmsg_level = SOL_SOCKET;20pcmsg->cmsg_type = SCM_RIGHTS;2122hdr.msg_control = pcmsg;23hdr.msg_controllen = CMSG_LEN(sizeof(int));2425int ret = recvmsg(socket_local_fd, &hdr, 0);26ERROR_CHECK(ret, -1, "recvmsg");2728if(*(int *)hdr.msg_iov[0].iov_base == -1){29// 收到对方正文中的-130// 代表main想关闭子进程31// 让netfd = -1 (netfd本不可能产生-1)(产生了-1, 当调用read_net_fd的逻辑明白,意味着关闭子进程)32*netfd = -1;33return 0;34}3536void *addr = CMSG_DATA(pcmsg);37int *p_fd = (int *)addr;38*netfd = *p_fd;3940return 0;41}424344// main进程accept获得的客户端连接对象发送给工作进程45int write_net_fd(int socket_local_fd, int* netfd){4647// 首先要知道, 如果我们使用endmsg传送一个-1的文件描述符48// 这是不合法的, 这回导致sendmsg和recvmsg数显异常49struct msghdr hdr;50bzero(&hdr, sizeof(hdr));5152struct iovec vec[1];53int flag = 1;54// 利用正文vec55if(*netfd == -1){56// 在正文中设置-1, 让对端收到57flag = -1;58*netfd = 1;59}6061vec[0].iov_base = &flag;62vec[0].iov_len = sizeof(int);6364hdr.msg_iov = vec;65hdr.msg_iovlen = 1;6667struct cmsghdr *pcmsg =68(struct cmsghdr *)malloc(69CMSG_LEN(sizeof(int)));70pcmsg->cmsg_len = CMSG_LEN(sizeof(int));71pcmsg->cmsg_level = SOL_SOCKET;72pcmsg->cmsg_type = SCM_RIGHTS;7374void *addr = CMSG_DATA(pcmsg);75int *p_int = (int *)addr;76*p_int = *netfd;7778hdr.msg_control = pcmsg;79hdr.msg_controllen = CMSG_LEN(sizeof(int));8081int ret = sendmsg(socket_local_fd, &hdr, 0);82ERROR_CHECK(ret, -1, "sendmsg");8384return 0;85}EgCode: worker.c
xxxxxxxxxx341int startWorker(int socket_local_fd){2// 脱离当前进程组-> 不再是前台进程组(前台进程组只有一个)3printf("group id: %d \n", getpgrp());4setpgid(0, 0);5printf("group id: %d \n", getpgrp());67while(1){89int netfd;10// 读取main传过来的文件描述符11read_net_fd(socket_local_fd, &netfd);1213if(netfd == -1){14// 对方发送了要求清理资源, 退出进程的通知15printf("子进程 %d 清理资源退出进程 \n", getpid());16exit(0);17}18if(netfd == 0){19// 读取数据成功, 但是拿到的netfd为0;20// 要么对端发送了错误的0, 要么对端关闭数据发送socket21// 假设main进程关闭传输(比如ctrl+c关闭了前台进程组:现在只有父进程), 读到数据022// 如果对端关闭, 我们假设子进程还有任务没有结束,我们用睡眠代指做事情在23sleep(100);24}2526sendFile(netfd);2728// 关闭和客户端连接29close(netfd);30// 通知main->由忙变闲31send(socket_local_fd, &pid, sizeof(pid), 0);32}33return 0;34}
0, 在公司中工作的一般/标准顺序: 接到需求 -> 明确/讨论需求 -> 设计业务逻辑文档 -> 设计代码逻辑文档 (接口)-> 照着文档实现代码 -> 有bug改bug -> 测试(自测) -> 提交测试(测试人员测) -> 改bug
1, 进程的设计逻辑:
a. 主进程逻辑: 启动子进程, 等待客户端连接
一旦有客户端连接过来 -> accept -> 穿给闲状态的子进程 -> 进程共享文件对象(本地socket: socketpair)
b, 读取任务/客户端连接对象:
给客户端交互 (发文件) -> 大文件(粘包-> 设置边界/模拟一个协议/小火车, 半包/MSG_WAITALL)
读取数据的时候关闭了连接 -> 导致服务器抛出信号SIGPIPE -> 给服务器的send: MSG_NOSIGNAL
子进程向客户端发完数据 -> 通知main进程 -> 由忙状态变为闲状态
2, 优化:
a, 进度条: 为了写代码而优化
b, 复制数据的问题:
mmap: 优化传输, 优化了一次拷贝.
sendfile : 相比较传统read/send, 优化了两次拷贝
(有些地方也称sendfile叫零拷贝 )
c, 有序退出:
不想暴力退出 -> 坏处, 不太好
有序退出 -> 指的是通知进程可以退出, 让进程自己选择合适的事件退出 (给进程处理资源的机会)
捕捉信号-> 通过管道发给main进程-> 通过本地socket发给子进程 -> 子进程选择合适的时机退出 -> 主进程等到所有子进程退出之后, 自己也退出.