xxxxxxxxxx
351// head.h
2enum workerStatus {
3 FREE,
4 BUSY
5};
6typedef struct {
7 pid_t pid; // 工作进程的pid
8 int status; // 工作进程的状态
9} processData_t;
10// main.c
11int main(int argc, char *argv[]) {
12 //./main 192.168.135.132 5678 10
13 ARGS_CHECK(argc, 4);
14 int workerNum = atoi(argv[3]);
15 processData_t *workerList = (processData_t *)calloc(sizeof(processData_t), workerNum);
16 // workerList记录了所有工作进程的状态
17 makeChild(workerList, workerNum);
18 while (1);
19}
20// worker.c
21int makeChild(processData_t *pProcssData, int processNum) {
22 pid_t pid;
23 for (int i = 0; i < processNum; ++i) {
24 pid = fork();
25 if (pid == 0) {
26 handleEvent();
27 }
28 pProcssData[i].pid = pid;
29 pProcssData[i].status = FREE;
30 }
31 return 0;
32}
33void handleEvent() { // 工作进程目前的工作是死循环
34 while (1);
35}
xxxxxxxxxx
161int tcpInit(char *ip, char *port, int *pSockFd) {
2 *pSockFd = socket(AF_INET, SOCK_STREAM, 0);
3 struct sockaddr_in addr;
4 bzero(&addr, sizeof(struct sockaddr_in));
5 addr.sin_family = AF_INET;
6 addr.sin_addr.s_addr = inet_addr(ip);
7 addr.sin_port = htons(atoi(port));
8 int reuse = 1;
9 int ret;
10 ret = setsockopt(*pSockFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
11 ERROR_CHECK(ret, -1, "setsockopt");
12 ret = bind(*pSockFd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
13 ERROR_CHECK(ret, -1, "bind");
14 listen(*pSockFd, 10);
15 return 0;
16}
xxxxxxxxxx
11int socketpair(int domain, int type, int protocol, int sv[2]);
在这里domain
必须填写AF_LOCAL,type
可以选择流式数据还是消息数据,protocol
一般填0表示不需要任何额外的协议,sv
这个参数和pipe
的参数一样,是一个长度为2的整型数据,用来存储管道两端的文件描述符(值得注意的是,sv[0]
和sv[1]
没有任何的区别)。一般socketpair
之后会配合fork
函数一起使用,从而实现父子进程之间的通信。从数据传递使用上面来看,本地套接字和网络套接字是完全一致的,但是本地套接字的效率更高,因为它在拷贝数据的时候不需要处理协议相关内容。
父进程会监听特定某个IP:PORT
,如果有某个客户端连接之后,子进程需要能够连上accept
得到的已连接套接字的文件描述符,这样子进程才能和客户端进行通信。这种文件描述符的传递不是简单地传输一个整型数字就行了,而是需要让父子进程共享一个套接字文件对象。
但是这里会遇到麻烦,因为accept
调用是在fork
之后的,所以父子进程之间并不是天然地共享文件对象。倘若想要在父子进程之间共享acccept
调用返回的已连接套接字,需要采用一些特别的手段:一方面,父子进程之间需要使用本地套接字来通信数据。另一方面需要使用sendmsg
和recvmsg
函数来传递数据。
xxxxxxxxxx
181ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
2ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
3struct iovec { /* Scatter/gather array items */
4 void *iov_base; /* Starting address */
5 size_t iov_len; /* Number of bytes to transfer */
6};
7
8struct msghdr {
9 void *msg_name; /* optional address */
10 socklen_t msg_namelen; /* size of address */
11 struct iovec *msg_iov; /* scatter/gather array */
12 size_t msg_iovlen; /* # elements in msg_iov */
13 void *msg_control; /* ancillary data, see below */
14 size_t msg_controllen; /* ancillary data buffer len */
15 int msg_flags; /* flags on received message */
16};
17ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
18ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
使用sendmsg
和recvmsg
的时候附加一个消息头部,即一个struct msghdr
类型的结构体。
首先,需要将要传递的内容存储入msg_iov
当中,在这里需要注意的是,元素类型为struct iovec
的数组可以存储一组离散的消息,只需要将每个消息的起始地址和本消息的长度存入数组元素中即可。(使用writev
和readv
可以直接读写一组离散的消息)
接下来,需要将文件描述符的信息存入控制字段msg_control
中,这个我们需要存储一个地址值,该地址指向了一个struct cmsghdr
类型的控制信息。如果存在多个控制信息,会构成一个控制信息序列,规范要求使用者绝不能直接操作控制信息序列,而是需要用一系列的cmsg
宏来间接操作。CMSG_FIRSTHDR
用来获取序列中的第一个控制信息(CMSG_NXTHDR
获取下一个),CMSG_DATA
宏用来设置控制信息的具体数据的地址;CMSG_LEN
宏用来设置具体数据占据内存空间的大小。
xxxxxxxxxx
151// man cmsg
2struct cmsghdr {
3 size_t cmsg_len; /* Data byte count, including header
4 (type is socklen_t in POSIX) */
5 int cmsg_level; /* Originating protocol */
6 int cmsg_type; /* Protocol-specific type */
7 /* followed by
8 unsigned char cmsg_data[]; */
9};
10struct cmsghdr *CMSG_FIRSTHDR(struct msghdr *msgh);
11struct cmsghdr *CMSG_NXTHDR(struct msghdr *msgh, struct cmsghdr *cmsg);
12size_t CMSG_ALIGN(size_t length);
13size_t CMSG_SPACE(size_t length);
14size_t CMSG_LEN(size_t length);
15unsigned char *CMSG_DATA(struct cmsghdr *cmsg);
为了传递文件描述符,需要将结构体中的cmsg_level
字段设置为SOL_SOCKET
,而 cmsg_type
字段需要设置为SCM_RIGHTS
,再将数据部分设置为文件描述符。这样,该文件描述符所指的文件对象就可以传递到另一个进程了。
xxxxxxxxxx
421int sendFd(int pipeFd, int fdToSend) {
2 struct msghdr hdr;
3 bzero(&hdr, sizeof(struct msghdr)); // 这一步绝对不能少
4 struct iovec iov[1];
5 char buf[] = "Hello";
6 iov[0].iov_base = buf;
7 iov[0].iov_len = 5;
8 hdr.msg_iov = iov;
9 hdr.msg_iovlen = 1;
10 struct cmsghdr *pcmsghdr = (struct cmsghdr *)calloc(1, CMSG_LEN(sizeof(int)));
11 pcmsghdr->cmsg_len = CMSG_LEN(sizeof(int));
12 // 控制信息的数据部分只有int类型的文件描述符
13 pcmsghdr->cmsg_level = SOL_SOCKET;
14 pcmsghdr->cmsg_type = SCM_RIGHTS; // SCM->socket-level control message
15 // 表示在socket层传递的是访问权力,这样接受进程就可以访问对应文件对象了
16 *(int *)CMSG_DATA(pcmsghdr) = fdToSend;
17 // 数据部分是文件描述符
18 hdr.msg_control = pcmsghdr;
19 hdr.msg_controllen = CMSG_LEN(sizeof(int));
20 int ret = sendmsg(pipeFd, &hdr, 0);
21 ERROR_CHECK(ret, -1, "sendmsg");
22}
23int recvFd(int pipeFd, int *pFd) {
24 struct msghdr hdr;
25 bzero(&hdr, sizeof(struct msghdr));
26 struct iovec iov[1];
27 char buf[6] = {0}; // 除了数据内容以外,其他和sendmsg是一致的
28 iov[0].iov_base = buf;
29 iov[0].iov_len = 5; // 这里一定不能填0
30 hdr.msg_iov = iov;
31 hdr.msg_iovlen = 1;
32 struct cmsghdr *pcmsghdr = (struct cmsghdr *)calloc(1, CMSG_LEN(sizeof(int)));
33 pcmsghdr->cmsg_len = CMSG_LEN(sizeof(int));
34 pcmsghdr->cmsg_level = SOL_SOCKET;
35 pcmsghdr->cmsg_type = SCM_RIGHTS; // SCM->socket-level control message
36 hdr.msg_control = pcmsghdr;
37 hdr.msg_controllen = CMSG_LEN(sizeof(int));
38 int ret = recvmsg(pipeFd, &hdr, 0);
39 ERROR_CHECK(ret, -1, "recvmsg");
40 *pFd = *(int *)CMSG_DATA(pcmsghdr);
41 return 0;
42}
要特别注意的是,传递的文件描述符在数值上完全可能是不相等的,但是它们对应的文件对象确实是同一个,自然文件读写偏移量也是共享的,和之前使用dup
或者是先打开文件再fork
的情况是一致的。
至此就可以实现一个进程池的服务端:
启动父进程
makeChild
:父进程在创建每个子进程时,先调用socketpair
handleEvent
:子进程被创建之后,执行进程工作函数
recvFd
:子进程等待一个文件描述符,在父进程未发送的时候,子进程处于阻塞状态
tcpInit
:父进程初始化一个网络socket
epollFunc
:父进程使用epoll
等IO多路复用机制监听网络socket和每个子进程的本地socket的一端。
如果有客户端通过网络连接父进程,那么父进程会accept
得到一个已连接socket。
sendFd
:选择一个空闲的子进程,将已连接socket发送给子进程,之后父进程就不再和客户端直接网络通信,而是由子进程和客户端通信。
当某个子进程完成了任务之后,子进程可以通过本地socket通知父进程,并且重新将自己设为空闲。
xxxxxxxxxx
1351// 客户端
2int main(int argc, char *argv[]) {
3 ARGS_CHECK(argc, 3);
4 int sockFd = socket(AF_INET, SOCK_STREAM, 0);
5 struct sockaddr_in addr;
6 bzero(&addr, sizeof(struct sockaddr_in));
7 addr.sin_family = AF_INET;
8 addr.sin_addr.s_addr = inet_addr(argv[1]);
9 addr.sin_port = htons(atoi(argv[2]));
10 int ret = connect(sockFd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
11 ERROR_CHECK(ret, -1, "connect");
12 char buf[1024] = {0};
13 read(STDIN_FILENO, buf, sizeof(buf));
14 send(sockFd, buf, strlen(buf) - 1, 0);
15 bzero(buf, sizeof(buf));
16 recv(sockFd, buf, sizeof(buf), 0);
17 puts(buf);
18 close(sockFd);
19 return 0;
20}
21// 服务端主进程
22int main(int argc, char *argv[]) {
23 //./main 192.168.135.132 5678 10
24 ARGS_CHECK(argc, 4);
25 int workerNum = atoi(argv[3]);
26 processData_t *workerList = (processData_t *)calloc(sizeof(processData_t), workerNum);
27 makeChild(workerList, workerNum);
28 int sockFd;
29 tcpInit(argv[1], argv[2], &sockFd);
30 int epfd = epollCtor();
31 epollAdd(sockFd, epfd);
32 for (int i = 0; i < workerNum; ++i) {
33 epollAdd(workerList[i].pipeFd, epfd);
34 }
35 int listenSize = workerNum + 1; // socket+每个进程pipe的读端
36 struct epoll_event *readylist = (struct epoll_event *)calloc(listenSize, sizeof(struct epoll_event));
37 while (1) {
38 int readynum = epoll_wait(epfd, readylist, listenSize, -1);
39 for (int i = 0; i < readynum; ++i) {
40 if (readylist[i].data.fd == sockFd) {
41 puts("accept ready");
42 int netFd = accept(sockFd, NULL, NULL);
43 for (int j = 0; j < workerNum; ++j) {
44 if (workerList[j].status == FREE) {
45 printf("No. %d worker gets his job, pid = %d\n", j, workerList[j].pid);
46 sendFd(workerList[j].pipeFd, netFd);
47 workerList[j].status = BUSY;
48 break;
49 }
50 }
51 close(netFd); // 父进程交给子进程一定要关闭
52 } else {
53 puts("One worker finish his task!");
54 int j;
55 for (j = 0; j < workerNum; ++j) {
56 if (workerList[j].pipeFd == readylist[i].data.fd) {
57 pid_t pid;
58 int ret = recv(workerList[j].pipeFd, &pid, sizeof(pid_t), 0);
59 printf("No. %d worker finish, pid = %d\n", j, pid);
60 workerList[j].status = FREE;
61 break;
62 }
63 }
64 }
65 }
66 }
67}
68// 服务端子进程
69int makeChild(processData_t *pProcssData, int processNum) {
70 pid_t pid;
71 for (int i = 0; i < processNum; ++i) {
72 int pipeFd[2];
73 socketpair(AF_LOCAL, SOCK_STREAM, 0, pipeFd);
74 pid = fork();
75 if (pid == 0) {
76 close(pipeFd[0]);
77 handleEvent(pipeFd[1]);
78 }
79 close(pipeFd[1]);
80 printf("pid = %d, pipefd[0] = %d\n", pid, pipeFd[0]);
81 pProcssData[i].pid = pid;
82 pProcssData[i].status = FREE;
83 pProcssData[i].pipeFd = pipeFd[0];
84 }
85 return 0;
86}
87void handleEvent(int pipeFd) {
88 int netFd;
89 while (1) {
90 recvFd(pipeFd, &netFd);
91 char buf[1024] = {0};
92 recv(netFd, buf, sizeof(buf), 0);
93 puts(buf);
94 send(netFd, "Echo", 4, 0);
95 close(netFd);
96 pid_t pid = getpid();
97 send(pipeFd, &pid, sizeof(pid_t), 0);
98 }
99}
100// TCP初始化相关代码
101int tcpInit(char *ip, char *port, int *pSockFd) {
102 *pSockFd = socket(AF_INET, SOCK_STREAM, 0);
103 struct sockaddr_in addr;
104 bzero(&addr, sizeof(struct sockaddr_in));
105 addr.sin_family = AF_INET;
106 addr.sin_addr.s_addr = inet_addr(ip);
107 addr.sin_port = htons(atoi(port));
108 int reuse = 1;
109 int ret;
110 ret = setsockopt(*pSockFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
111 ERROR_CHECK(ret, -1, "setsockopt");
112 ret = bind(*pSockFd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
113 ERROR_CHECK(ret, -1, "bind");
114 listen(*pSockFd, 10);
115 return 0;
116}
117// epoll相关代码
118int epollCtor() {
119 int epfd = epoll_create(1);
120 ERROR_CHECK(epfd, -1, "epoll_create");
121 return epfd;
122}
123int epollAdd(int fd, int epfd) {
124 struct epoll_event event;
125 event.events = EPOLLIN;
126 event.data.fd = fd;
127 int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
128 ERROR_CHECK(ret, -1, "epoll_ctl add");
129 return 0;
130}
131int epollDel(int fd, int epfd) {
132 int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
133 ERROR_CHECK(ret, -1, "epoll_ctl del");
134 return 0;
135}
xxxxxxxxxx
141// 客户端
2//...
3send(sockFd, filename, strlen(filename), 0);
4ret = read(fd, buf, sizeof(buf));
5send(sockFd, buf, ret, 0);
6//...
7
8// 服务端
9//...
10recv(netFd, filename, sizeof(filename), 0);
11int fd = open(filename, O_RDONLY | O_CREAT, 0666);
12ret = recv(netFd, buf, sizeof(buf), 0);
13write(fd, buf, ret);
14//...
这种写法会引入一个非常严重的问题,服务端在接收文件名,实际上并不知道有多长,所以它会试图把网络缓冲区的所有内容都读取出来,但是send
底层基于的协议是TCP协议——这是一种流式协议。这样的情况下,服务端没办法区分到底是哪些部分是文件名而哪些部分是文件内容。完全可能会出现服务端把文件名和文件内容混杂在一起的情况,这种就是所谓的"粘包"问题。
所以我们要做的事情是在应用层上构建一个私有协议,这个协议的目的是规定TCP发送和接收的实际长度从而确定单个消息的边界。
xxxxxxxxxx
521// client.c
2typedef struct train_s {
3 int dataLength;
4 char buf[1000];
5} train_t;
6int recvFile(int netFd);
7int recvn(int netFd, void *pstart, int len);
8int main(int argc, char *argv[]) {
9 ARGS_CHECK(argc, 3);
10 int sockFd = socket(AF_INET, SOCK_STREAM, 0);
11 struct sockaddr_in addr;
12 bzero(&addr, sizeof(struct sockaddr_in));
13 addr.sin_family = AF_INET;
14 addr.sin_addr.s_addr = inet_addr(argv[1]);
15 addr.sin_port = htons(atoi(argv[2]));
16 int ret = connect(sockFd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
17 ERROR_CHECK(ret, -1, "connect");
18 recvFile(sockFd);
19 close(sockFd);
20 return 0;
21}
22int recvFile(int netFd) {
23 train_t t;
24 bzero(&t, sizeof(t));
25 // 先接收文件名长度
26 recvn(netFd, &t.dataLength, sizeof(int));
27 // 再接收文件名
28 recvn(netFd, t.buf, t.dataLength);
29 // 接收方创建一个同名文件
30 int fd = open(t.buf, O_WRONLY | O_CREAT, 0666);
31 ERROR_CHECK(fd, -1, "open");
32 while (1) {
33 bzero(&t, sizeof(t));
34 recvn(netFd, &t.dataLength, sizeof(int));
35 if (0 == t.dataLength) {
36 break;
37 }
38 recvn(netFd, t.buf, t.dataLength);
39 write(fd, t.buf, t.dataLength);
40 }
41 close(fd);
42}
43int recvn(int netFd, void *pstart, int len) {
44 int total = 0;
45 int ret;
46 char *p = (char *)pstart;
47 while (total < len) {
48 ret = recv(netFd, p + total, len - total, 0);
49 total += ret; // 每次接收到的字节数加到total上
50 }
51 return 0;
52}
xxxxxxxxxx
1831// main.c
2int main(int argc, char *argv[]) {
3 //./server 192.168.135.132 5678 10
4 ARGS_CHECK(argc, 4);
5 int workerNum = atoi(argv[3]);
6 processData_t *workerList = (processData_t *)calloc(sizeof(processData_t), workerNum);
7 makeChild(workerList, workerNum);
8 int sockFd;
9 tcpInit(argv[1], argv[2], &sockFd);
10 int epfd = epollCtor();
11 epollAdd(sockFd, epfd);
12 for (int i = 0; i < workerNum; ++i) {
13 epollAdd(workerList[i].pipeFd, epfd);
14 }
15 int listenSize = workerNum + 1; // socket+每个进程pipe的读端
16 struct epoll_event *readylist = (struct epoll_event *)calloc(listenSize, sizeof(struct epoll_event));
17 while (1) {
18 int readynum = epoll_wait(epfd, readylist, listenSize, -1);
19 for (int i = 0; i < readynum; ++i) {
20 if (readylist[i].data.fd == sockFd) {
21 puts("accept ready");
22 int netFd = accept(sockFd, NULL, NULL);
23 for (int j = 0; j < workerNum; ++j) {
24 if (workerList[j].status == FREE) {
25 printf("No. %d worker gets his job, pid = %d\n", j, workerList[j].pid);
26 sendFd(workerList[j].pipeFd, netFd);
27 workerList[j].status = BUSY;
28 break;
29 }
30 }
31 close(netFd);
32 } else {
33 puts("One worker finish his task!");
34 int j;
35 for (j = 0; j < workerNum; ++j) {
36 if (workerList[j].pipeFd == readylist[i].data.fd) {
37 pid_t pid;
38 int ret = recv(workerList[j].pipeFd, &pid, sizeof(pid_t), 0);
39 printf("No. %d worker finish, pid = %d\n", j, pid);
40 workerList[j].status = FREE;
41 break;
42 }
43 }
44 }
45 }
46 }
47}
48// worker.c
49int makeChild(processData_t *pProcssData, int processNum) {
50 pid_t pid;
51 for (int i = 0; i < processNum; ++i) {
52 int pipeFd[2];
53 socketpair(AF_LOCAL, SOCK_STREAM, 0, pipeFd);
54 pid = fork();
55 if (pid == 0) {
56 close(pipeFd[0]);
57 handleEvent(pipeFd[1]);
58 }
59 close(pipeFd[1]);
60 printf("pid = %d, pipefd[0] = %d\n", pid, pipeFd[0]);
61 pProcssData[i].pid = pid;
62 pProcssData[i].status = FREE;
63 pProcssData[i].pipeFd = pipeFd[0];
64 }
65 return 0;
66}
67void handleEvent(int pipeFd) {
68 int netFd;
69 while (1) {
70 recvFd(pipeFd, &netFd);
71 transFile(netFd);
72 close(netFd);
73 pid_t pid = getpid();
74 send(pipeFd, &pid, sizeof(pid_t), 0);
75 }
76}
77// tcpInit.c
78int tcpInit(char *ip, char *port, int *pSockFd) {
79 *pSockFd = socket(AF_INET, SOCK_STREAM, 0);
80 struct sockaddr_in addr;
81 bzero(&addr, sizeof(struct sockaddr_in));
82 addr.sin_family = AF_INET;
83 addr.sin_addr.s_addr = inet_addr(ip);
84 addr.sin_port = htons(atoi(port));
85 int reuse = 1;
86 int ret;
87 ret = setsockopt(*pSockFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
88 ERROR_CHECK(ret, -1, "setsockopt");
89 ret = bind(*pSockFd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
90 ERROR_CHECK(ret, -1, "bind");
91 listen(*pSockFd, 10);
92 return 0;
93}
94// epollFunc.c
95int epollCtor() {
96 int epfd = epoll_create(1);
97 ERROR_CHECK(epfd, -1, "epoll_create");
98 return epfd;
99}
100int epollAdd(int fd, int epfd) {
101 struct epoll_event event;
102 event.events = EPOLLIN;
103 event.data.fd = fd;
104 int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
105 ERROR_CHECK(ret, -1, "epoll_ctl add");
106 return 0;
107}
108int epollDel(int fd, int epfd) {
109 int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
110 ERROR_CHECK(ret, -1, "epoll_ctl del");
111 return 0;
112}
113// sendFd.c
114int sendFd(int pipeFd, int fdToSend) {
115 struct msghdr hdr;
116 bzero(&hdr, sizeof(struct msghdr));
117 struct iovec iov[1];
118 char buf[] = "Hello";
119 iov[0].iov_base = buf;
120 iov[0].iov_len = 5;
121 hdr.msg_iov = iov;
122 hdr.msg_iovlen = 1;
123 struct cmsghdr *pcmsghdr = (struct cmsghdr *)calloc(1, sizeof(CMSG_LEN(sizeof(int))));
124 pcmsghdr->cmsg_len = CMSG_LEN(sizeof(int));
125 // 控制信息的数据部分只有int类型的文件描述符
126 pcmsghdr->cmsg_level = SOL_SOCKET;
127 pcmsghdr->cmsg_type = SCM_RIGHTS; // SCM->socket-level control message
128 // 表示在socket层传递的是访问权力,这样接受进程就可以访问对应文件对象了
129 *(int *)CMSG_DATA(pcmsghdr) = fdToSend;
130 // 数据部分是文件描述符
131 hdr.msg_control = pcmsghdr;
132 hdr.msg_controllen = CMSG_LEN(sizeof(int));
133 int ret = sendmsg(pipeFd, &hdr, 0);
134 ERROR_CHECK(ret, -1, "sendmsg");
135}
136int recvFd(int pipeFd, int *pFd) {
137 struct msghdr hdr;
138 bzero(&hdr, sizeof(struct msghdr));
139 struct iovec iov[1];
140 char buf[6] = {0};
141 iov[0].iov_base = buf;
142 iov[0].iov_len = 5;
143 hdr.msg_iov = iov;
144 hdr.msg_iovlen = 1;
145 struct cmsghdr *pcmsghdr = (struct cmsghdr *)calloc(1, sizeof(struct cmsghdr));
146 pcmsghdr->cmsg_len = CMSG_LEN(sizeof(int));
147 // 控制信息的数据部分只有int类型的文件描述符
148 pcmsghdr->cmsg_level = SOL_SOCKET;
149 pcmsghdr->cmsg_type = SCM_RIGHTS; // SCM->socket-level control message
150 hdr.msg_control = pcmsghdr;
151 hdr.msg_controllen = CMSG_LEN(sizeof(int));
152 int ret = recvmsg(pipeFd, &hdr, 0);
153 ERROR_CHECK(ret, -1, "recvmsg");
154 *pFd = *(int *)CMSG_DATA(pcmsghdr);
155 return 0;
156}
157// transFile.c
158int transFile(int netFd) {
159 train_t t = {5, "file2"};
160 send(netFd, &t, 4 + 5, MSG_NOSIGNAL);
161 int fd = open(t.buf, O_RDONLY);
162 ERROR_CHECK(fd, -1, "open");
163 bzero(&t, sizeof(t));
164 while (1) {
165 t.dataLength = read(fd, t.buf, sizeof(t.buf));
166 ERROR_CHECK(t.dataLength, -1, "read");
167 if (t.dataLength != sizeof(t.buf)) {
168 printf("t.dataLength = %d\n", t.dataLength);
169 }
170 if (t.dataLength == 0) {
171 bzero(&t, sizeof(t));
172 send(netFd, &t, 4, MSG_NOSIGNAL);
173 break;
174 }
175 int ret = send(netFd, &t, sizeof(int) + t.dataLength, MSG_NOSIGNAL);
176 if (ret == -1) {
177 perror("send");
178 break;
179 }
180 }
181 close(fd);
182 return 0;
183}
xxxxxxxxxx
291// 服务端略
2// 下面是客户端
3//...
4off_t fileSize;
5bzero(&t, sizeof(t));
6recvn(netFd, &t.dataLength, sizeof(int));
7recvn(netFd, &fileSize, t.dataLength);
8printf("fileSize = %ld\n", fileSize);
9off_t doneSize = 0;
10off_t lastSize = 0;
11off_t slice = fileSize / 100;
12int percentage = 0;
13while (1) {
14 bzero(&t, sizeof(t));
15 recvn(netFd, &t.dataLength, sizeof(int));
16 if (0 == t.dataLength) {
17 break;
18 }
19 doneSize += t.dataLength;
20 if (doneSize - lastSize >= slice) {
21 printf("%5.2lf%%\r", 100.0 * doneSize / fileSize);
22 fflush(stdout);
23 lastSize = doneSize;
24 }
25 recvn(netFd, t.buf, t.dataLength);
26 write(fd, t.buf, t.dataLength);
27}
28printf("100.00%%\n");
29//...
目前我们传输文件的时候是采用read
和send
来组合完成,这种当中的数据流向是怎么样的?首先打开一个普通文件,数据会从磁盘通过DMA设备传输到内存,即文件对象当中的内核缓冲区部分,然后调用read
数据会从内核缓冲区拷贝到一个用户态的buf上面(buf是read
函数的参数),接下来调用send
,就将数据拷贝到了网络发送缓存区,最终实现了文件传输。但是实际上这里涉及了大量的不必要的拷贝操作。
如何减少从内核文件缓冲区到用户态空间的拷贝?解决方案就是使用mmap
系统调用直接建立文件和用户态空间buf的映射。这样的话数据就减少了一次拷贝。在非常多的场景下都会使用mmap
来减少拷贝次数,典型的就是使用图形的应用去操作显卡设备的显存。除此以外,这种传输方式也可以减少由于系统调用导致的CPU用户态和内核态的切换次数。
xxxxxxxxxx
1061// 客户端
2int recvFile(int netFd) {
3 train_t t;
4 bzero(&t, sizeof(t));
5 // 先接收文件名长度
6 recvn(netFd, &t.dataLength, sizeof(int));
7 // 再接收文件名
8 recvn(netFd, t.buf, t.dataLength);
9 // 接收方创建一个同名文件
10 int fd = open(t.buf, O_RDWR | O_CREAT, 0666);
11 ERROR_CHECK(fd, -1, "open");
12 off_t fileSize;
13 bzero(&t, sizeof(t));
14 recvn(netFd, &t.dataLength, sizeof(int));
15 recvn(netFd, &fileSize, t.dataLength);
16 printf("fileSize = %ld\n", fileSize);
17 /* case 1 分批接收
18 off_t doneSize = 0;
19 off_t lastSize = 0;
20 off_t slice = fileSize/100;
21 int percentage = 0;
22 while (1)
23 {
24 bzero(&t,sizeof(t));
25 recvn(netFd,&t.dataLength,sizeof(int));
26 if(0 == t.dataLength){
27 break;
28 }
29 doneSize += t.dataLength;
30 if(doneSize-lastSize >= slice){
31 printf("%5.2lf%%\r", 100.0*doneSize/fileSize);
32 fflush(stdout);
33 lastSize = doneSize;
34 }
35 recvn(netFd,t.buf,t.dataLength);
36 write(fd,t.buf,t.dataLength);
37 }
38 */
39 // case 1一次性接收完 注意此时客户端需要修改
40 ftruncate(fd, fileSize);
41 // 前面open的权限需要改成O_RDWR
42 char *p = (char *)mmap(NULL, fileSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
43 ERROR_CHECK(p, MAP_FAILED, "mmap");
44 recvn(netFd, p, fileSize);
45 printf("100.00%%\n");
46 munmap(p, fileSize);
47 close(fd);
48}
49// 服务端 分批发送
50int transFile(int netFd) { // mmap_multi
51 train_t t = {5, "file2"};
52 send(netFd, &t, 4 + 5, MSG_NOSIGNAL);
53 int fd = open(t.buf, O_RDONLY);
54 ERROR_CHECK(fd, -1, "open");
55 struct stat statbuf;
56 int ret = fstat(fd, &statbuf);
57 bzero(&t, sizeof(t));
58 t.dataLength = sizeof(statbuf.st_size);
59 memcpy(t.buf, &statbuf.st_size, t.dataLength);
60 send(netFd, &t, sizeof(off_t) + 4, MSG_NOSIGNAL);
61 char *p = (char *)mmap(NULL, statbuf.st_size, PROT_READ, MAP_SHARED, fd, 0);
62 ERROR_CHECK(p, (void *)-1, "mmap");
63 off_t total = 0;
64 while (total < statbuf.st_size) {
65 if (statbuf.st_size - total > sizeof(t.buf)) {
66 t.dataLength = sizeof(t.buf);
67 } else {
68 t.dataLength = statbuf.st_size - total;
69 }
70 memcpy(t.buf, p + total, t.dataLength);
71 total += t.dataLength;
72 int ret = send(netFd, &t, sizeof(int) + t.dataLength, MSG_NOSIGNAL);
73 if (ret == -1) {
74 perror("send");
75 break;
76 }
77 }
78 // 发送结束标志
79 t.dataLength = 0;
80 send(netFd, &t, 4, MSG_NOSIGNAL);
81 munmap(p, statbuf.st_size);
82 close(fd);
83 return 0;
84}
85// 服务端 一次性发送
86int transFile(int netFd) { // mmap_once
87 train_t t = {5, "file2"};
88 send(netFd, &t, 4 + 5, MSG_NOSIGNAL);
89 int fd = open(t.buf, O_RDONLY);
90 ERROR_CHECK(fd, -1, "open");
91 struct stat statbuf;
92 int ret = fstat(fd, &statbuf);
93 bzero(&t, sizeof(t));
94 t.dataLength = sizeof(statbuf.st_size);
95 memcpy(t.buf, &statbuf.st_size, t.dataLength);
96 send(netFd, &t, sizeof(off_t) + 4, MSG_NOSIGNAL);
97 char *p = (char *)mmap(NULL, statbuf.st_size, PROT_READ, MAP_SHARED, fd, 0);
98 ERROR_CHECK(p, (void *)-1, "mmap");
99 send(netFd, p, statbuf.st_size, MSG_NOSIGNAL);
100 // 发送结束标志
101 t.dataLength = 0;
102 send(netFd, &t, 4, MSG_NOSIGNAL);
103 munmap(p, statbuf.st_size);
104 close(fd);
105 return 0;
106}
使用mmap
系统调用只能减少数据从磁盘文件的文件对象到用户态空间的拷贝,但是依然无法避免从用户态到内核已连接套接字的拷贝(因为网络设备文件对象不支持mmap
)。sendfile
系统调用可以解决这个问题,它可以使数据直接在内核中传递而不需要经过用户态空间,调用sendfile
系统调用可以直接将磁盘文件的文件对象的数据直接传递给已连接套接字文件对象,从而直接发送到网卡设备之上(在内核的底层实现中,实际上是让内核磁盘文件缓冲区和网络缓冲区对应同一片物理内存)。
xxxxxxxxxx
21
2ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
使用sendfile
的时候要特别注意,out_fd
一般只能填写网络套接字的描述符,表示写入的文件描述符,in_fd
一般是一个磁盘文件,表示读取的文件描述符。从上述的需求可以得知,sendfile
只能用于发送文件方的零拷贝实现,无法用于接收方,并且发送文件的大小上限通常是2GB。
xxxxxxxxxx
181int transFile(int netFd) {
2 train_t t = {5, "file2"};
3 send(netFd, &t, 4 + 5, MSG_NOSIGNAL);
4 int fd = open(t.buf, O_RDONLY);
5 ERROR_CHECK(fd, -1, "open");
6 struct stat statbuf;
7 int ret = fstat(fd, &statbuf);
8 bzero(&t, sizeof(t));
9 t.dataLength = sizeof(statbuf.st_size);
10 memcpy(t.buf, &statbuf.st_size, t.dataLength);
11 send(netFd, &t, sizeof(off_t) + 4, MSG_NOSIGNAL);
12 // 发送结束标志
13 sendfile(netFd, fd, NULL, statbuf.st_size);
14 t.dataLength = 0;
15 send(netFd, &t, 4, MSG_NOSIGNAL);
16 close(fd);
17 return 0;
18}
考虑到sendfile
只能将数据从磁盘文件发送到网络设备中,那么接收方如何在避免使用mmap
的情况下使用零拷贝技术呢?一种方式就是采用管道配合splice
的做法。splice
系统调用可以直接将数据从内核管道文件缓冲区发送到另一个内核文件缓冲区,也可以反之,将一个内核文件缓冲区的数据直接发送到内核管道缓冲区中。所以只需要在内核创建一个匿名管道,这个管道用于本进程中,在磁盘文件和网络文件之间无拷贝地传递数据。
xxxxxxxxxx
11ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
xxxxxxxxxx
131//...
2recvn(netFd, &t.dataLength, sizeof(int));
3recvn(netFd, &fileSize, t.dataLength);
4printf("fileSize = %ld\n", fileSize);
5int pipefds[2];
6pipe(pipefds);
7int total = 0;
8while (total < fileSize) {
9 int ret = splice(netFd, NULL, pipefds[1], NULL, 4096, SPLICE_F_MORE);
10 total += ret;
11 splice(pipefds[0], NULL, fd, NULL, ret, SPLICE_F_MORE);
12}
13//...
进程池的简单退出要实现功能很简单,就是让父进程收到信号之后,再给每个子进程发送信号使其终止,这种实现方案只需要让父进程在一个目标信号(通常是10信号SIGUSR1
)的过程给目标子进程发送信号即可。
在实现的过程需要注意的是signal
函数和fork
函数之间调用顺序,因为父进程会修改默认递送行为,而子进程会执行默认行为,所以fork
应该要在signal
的之后调用。
xxxxxxxxxx
191processData_t *workerList; // 需要改成全局变量
2int workerNum;
3void sigFunc(int signum) {
4 printf("signum = %d\n", signum);
5 for (int i = 0; i < workerNum; ++i) {
6 kill(workerList[i].pid, SIGUSR1);
7 }
8 for (int i = 0; i < workerNum; ++i) {
9 wait(NULL);
10 }
11 puts("process pool is over!");
12 exit(0);
13}
14int main() {
15 //..
16 makeChild(workerList, workerNum);
17 signal(SIGUSR1, sigFunc);
18 // 注意fork和signal的顺序
19}
采用信号就不可避免要使用全局变量,因为信号处理函数当中只能存储有限的信息,有没有办法避免全局的进程数量和进程数组呢?一种解决方案就是采取“异步拉起同步”的策略:虽然还是需要创建一个管道全局变量,但是该管道只用于处理进程池退出,不涉及其他的进程属性。这个管道的读端需要使用IO多路复用机制管理起来,而当信号产生之后,主进程递送信号的时候会往管道中写入数据,此时可以依靠epoll
的就绪事件,在事件处理中来完成退出的逻辑。
xxxxxxxxxx
241int pipeFd[2];
2void sigFunc(int signum) {
3 printf("signum = %d\n", signum);
4 write(pipeFd[1], "1", 1);
5}
6int main() {
7 //...
8 pipe(pipeFd);
9 epollAdd(pipeFd[0], epfd);
10 //...
11 //...epoll就绪事件处理
12 else if (readylist[i].data.fd == pipeFd[0]) {
13 for (int j = 0; j < workerNum; ++j) {
14 kill(workerList[j].pid, SIGINT);
15 puts("send signal to worker!");
16 }
17 for (int j = 0; j < workerNum; ++j) {
18 wait(NULL);
19 }
20 printf("Parent process exit!\n");
21 exit(0);
22 }
23 //...
24}
上述的退出机制存在一个问题,就是即使工作进程正在传输文件中,父进程也会通过信号将其终止。如何实现进程池在退出的时候,子进程要完成传输文件的工作之后才能退出呢?
一种典型的方案是使用sigprocmask
在文件传输的过程中设置信号屏蔽字,这样可以实现上述的机制。
另一种方案就是调整sendFd
的设计,每个工作进程在传输完文件之后总是循环地继续下一个事件,而在每个事件处理的开始,工作进程总是会调用recvFd
来使自己处于阻塞状态直到有事件到达。我们可以对进程池的终止作一些调整:用户发送信号给父进程表明将要退出进程池;随后父进程通过sendFd
给所有的工作进程发送终止的信息,工作进程在完成了一次工作任务了之后就会recvFd
收到进程池终止的信息,然后工作进程就可以主动退出;随着所有的工作进程终止,父进程亦随后终止,整个进程池就终止了。
xxxxxxxxxx
581int sendFd(int pipeFd, int fdToSend, int exitFlag) {
2 struct msghdr hdr;
3 bzero(&hdr, sizeof(struct msghdr));
4 struct iovec iov[1];
5 iov[0].iov_base = &exitFlag;
6 iov[0].iov_len = sizeof(int);
7 hdr.msg_iov = iov;
8 hdr.msg_iovlen = 1;
9 //...
10}
11int recvFd(int pipeFd, int *pFd, int *exitFlag) {
12 struct msghdr hdr;
13 bzero(&hdr, sizeof(struct msghdr));
14 struct iovec iov[1];
15 iov[0].iov_base = exitFlag;
16 iov[0].iov_len = sizeof(int);
17 hdr.msg_iov = iov;
18 hdr.msg_iovlen = 1;
19 //.....
20}
21void handleEvent(int pipeFd) {
22 int netFd;
23 while (1) {
24 int exitFlag;
25 recvFd(pipeFd, &netFd, &exitFlag);
26 if (exitFlag == 1) {
27 puts("I am closing!");
28 exit(0);
29 }
30 //...
31 }
32}
33//... epoll
34for (int i = 0; i < readynum; ++i) {
35 if (readylist[i].data.fd == sockFd) {
36 puts("accept ready");
37 int netFd = accept(sockFd, NULL, NULL);
38 for (int j = 0; j < workerNum; ++j) {
39 if (workerList[j].status == FREE) {
40 printf("No. %d worker gets his job, pid = %d\n", j, workerList[j].pid);
41 sendFd(workerList[j].pipeFd, netFd, 0);
42 workerList[j].status = BUSY;
43 break;
44 }
45 }
46 close(netFd);
47 } else if (readylist[i].data.fd == exitpipeFd[0]) {
48 for (int j = 0; j < workerNum; ++j) {
49 puts("set exitFlag to worker!");
50 sendFd(workerList[j].pipeFd, 0, 1);
51 }
52 for (int j = 0; j < workerNum; ++j) {
53 wait(NULL);
54 }
55 printf("Parent process exit!\n");
56 exit(0);
57 }
58//....
用进程池的思路来解决并发连接是一种经典的基于事件驱动模型的解决方案,但是由于进程天生具有隔离性,导致进程之间通信十分困难,一种优化的思路就是用线程来取代进程,即所谓的线程池。
由于多线程是共享地址空间的,所以主线程和工作线程天然地通过共享文件描述符数值的形式共享网络文件对象,但是这种共享也会带来麻烦:每当有客户端发起请求时,主线程会分配一个空闲的工作线程完成任务,而任务正是在多个线程之间共享的资源,所以需要采用一定的互斥和同步的机制来避免竞争。
我们可以将任务设计成一个队列,任务队列就成为多个线程同时访问的共享资源,此时问题就转化成了一个典型的生产者-消费者问题:任务队列中的任务就是商品,主线程是生产者,每当有连接到来的时候,就将一个任务放入任务队列,即生产商品,而各个工作线程就是消费者,每当队列中任务到来的时候,就负责取出任务并执行。
下面是线程池的基本设计方案:
xxxxxxxxxx
321// 通常把构建实际对象的函数称为工厂函数
2// factory.h
3
4
5
6// 这里用来描述整个进程池的信息,也是线程间共享的数据
7typedef struct factory_s {
8 pthread_t *tidArr;
9 int threadNum;
10 taskQueue_t taskQueue;
11} factory_t;
12int factoryInit(factory_t *pFactory, int threadNum);
13
14// 任务队列的设计
15// taskQueue.h
16
17
18
19typedef struct task_s {
20 int netFd;
21 struct task_s *pNext;
22} task_t;
23typedef struct taskQueue_s {
24 task_t *pFront;
25 task_t *pRear;
26 int queueSize; // 当前任务的个数
27 pthread_mutex_t mutex; // 任务队列的锁
28 pthread_cond_t cond;
29} taskQueue_t;
30int taskEnQueue(taskQueue_t *pTaskQueue, int netFd);
31int taskDeQueue(taskQueue_t *pTaskQueue);
32
xxxxxxxxxx
191int factoryInit(factory_t *pFactory, int threadNum) {
2 bzero(pFactory, sizeof(factory_t));
3 pFactory->threadNum = threadNum;
4 pFactory->tidArr = (pthread_t *)calloc(threadNum, sizeof(pthread_t));
5 pthread_cond_init(&pFactory->cond, NULL);
6 bzero(&pFactory->taskQueue, sizeof(taskQueue_t));
7 pthread_mutex_init(&pFactory->taskQueue.mutex, NULL);
8}
9int main(int argc, char *argv[]) {
10 //./main 192.168.135.132 5678 10
11 ARGS_CHECK(argc, 4);
12 int workerNum = atoi(argv[3]);
13 factory_t factory;
14 factoryInit(&factory, workerNum);
15 makeWorker(&factory);
16 int sockFd;
17 tcpInit(argv[1], argv[2], &sockFd);
18 //...
19}
接下来,主线程需要accept
客户端的连接并且需要将任务加入到任务队列。(目前会引发主线程阻塞的行为只有accept
,但是为了可维护性,即后续的需求可能需要主线程管理更多的文件描述符,所以我们使用epoll
将网络文件加入监听)。一旦有新的客户端连接,那么主线程就会将新的任务加入任务队列,并且使用条件变量通知子线程。(如果没有空闲的子线程处于等待状态,这个任务会被直接丢弃)
xxxxxxxxxx
181int epfd = epollCtor();
2int sockFd;
3tcpInit(argv[1], argv[2], &sockFd);
4epollAdd(sockFd, epfd);
5struct epoll_event evs[2];
6while (1) {
7 int readyNum = epoll_wait(epfd, evs, 2, -1);
8 for (int i = 0; i < readyNum; ++i) {
9 if (evs[i].data.fd == sockFd) {
10 int netFd = accept(sockFd, NULL, NULL);
11 pthread_mutex_lock(&factory.taskQueue.mutex);
12 taskEnQueue(&factory.taskQueue, netFd);
13 printf("New Task!\n");
14 pthread_cond_signal(&factory.taskQueue.cond);
15 pthread_mutex_unlock(&factory.taskQueue.mutex);
16 }
17 }
18}
子线程在启动的时候,会使用条件变量使自己处于阻塞状态,一旦条件满足之后,就立即从任务队列中取出任务并且处理该事件。
xxxxxxxxxx
251void makeWorker(factory_t *pFactory) {
2 for (int i = 0; i < pFactory->threadNum; ++i) {
3 pthread_create(pFactory->tidArr + i, NULL, threadFunc, (void *)pFactory);
4 }
5}
6void *threadFunc(void *pArgs) {
7 factory_t *pFactory = (factory_t *)pArgs;
8 while (1) {
9 pthread_mutex_lock(&pFactory->taskQueue.mutex);
10 while (pFactory->taskQueue.queueSize == 0) {
11 pthread_cond_wait(&pFactory->taskQueue.cond, &pFactory->taskQueue.mutex);
12 }
13 printf("Get Task!\n");
14 int netFd = pFactory->taskQueue.pFront->netFd;
15 taskDeQueue(&pFactory->taskQueue);
16 pthread_mutex_unlock(&pFactory->taskQueue.mutex);
17 handleEvent(netFd);
18 printf("pthread done! tid = %lu\n", pthread_self());
19 }
20}
21int handleEvent(int netFd) {
22 transFile(netFd);
23 close(netFd);
24 return 0;
25}
xxxxxxxxxx
201int taskEnQueue(taskQueue_t *pTaskQueue, int netFd) {
2 task_t *pTask = (task_t *)calloc(1, sizeof(task_t));
3 pTask->netFd = netFd;
4 if (pTaskQueue->queueSize == 0) {
5 pTaskQueue->pFront = pTask;
6 pTaskQueue->pRear = pTask;
7 } else {
8 pTaskQueue->pRear->pNext = pTask;
9 pTaskQueue->pRear = pTask;
10 }
11 ++pTaskQueue->queueSize;
12 return 0;
13}
14int taskDeQueue(taskQueue_t *pTaskQueue) {
15 task_t *pCur = pTaskQueue->pFront;
16 pTaskQueue->pFront = pTaskQueue->pFront->pNext;
17 free(pCur);
18 --pTaskQueue->queueSize;
19 return 0;
20}
xxxxxxxxxx
511int exitPipe[2];
2void sigFunc(int signum) {
3 printf("signum = %d\n", signum);
4 write(exitPipe[1], "1", 1);
5 puts("Parent exit!");
6}
7int main(int argc, char *argv[]) {
8 //./main 192.168.135.132 5678 10
9 ARGS_CHECK(argc, 4);
10 pipe(exitPipe);
11 if (fork() != 0) {
12 close(exitPipe[0]);
13 signal(SIGUSR1, sigFunc);
14 wait(NULL);
15 exit(0);
16 }
17 close(exitPipe[1]);
18 int workerNum = atoi(argv[3]);
19 factory_t factory;
20 factoryInit(&factory, workerNum);
21 makeWorker(&factory);
22 int epfd = epollCtor();
23 int sockFd;
24 tcpInit(argv[1], argv[2], &sockFd);
25 epollAdd(sockFd, epfd);
26 epollAdd(exitPipe[0], epfd);
27 struct epoll_event evs[2];
28 while (1) {
29 int readyNum = epoll_wait(epfd, evs, 2, -1);
30 for (int i = 0; i < readyNum; ++i) {
31 if (evs[i].data.fd == sockFd) {
32 int netFd = accept(sockFd, NULL, NULL);
33 pthread_mutex_lock(&factory.taskQueue.mutex);
34 taskEnQueue(&factory.taskQueue, netFd);
35 printf("New Task!\n");
36 pthread_cond_signal(&factory.taskQueue.cond);
37 pthread_mutex_unlock(&factory.taskQueue.mutex);
38 } else if (evs[i].data.fd == exitPipe[0]) {
39 puts("exit threadPool!");
40 for (int j = 0; j < workerNum; ++j) {
41 pthread_cancel(factory.tidArr[j]);
42 }
43 for (int j = 0; j < workerNum; ++j) {
44 pthread_join(factory.tidArr[j], NULL);
45 }
46 puts("done");
47 exit(0);
48 }
49 }
50 }
51}
直接使用上述代码会存在一个问题,那就是只能关闭掉一个子线程,这里的原因其实比较简单pthread_cond_wait
是一个取消点,所以收到了取消之后,线程会唤醒并终止,然而由于条件变量的设计,所以线程终止的时候它是持有锁的,这就导致死锁。这种死锁的解决方案就是引入资源清理机制,在加锁行为执行的时候立刻将清理行为压入资源清理栈当中。
xxxxxxxxxx
211void cleanFunc(void *pArgs) {
2 factory_t *pFactory = (factory_t *)pArgs;
3 pthread_mutex_unlock(&pFactory->taskQueue.mutex);
4}
5void *threadFunc(void *pArgs) {
6 int netFd;
7 while (1) {
8 factory_t *pFactory = (factory_t *)pArgs;
9 pthread_mutex_lock(&pFactory->taskQueue.mutex);
10 pthread_cleanup_push(cleanFunc, (void *)pFactory);
11 while (pFactory->taskQueue.queueSize == 0) {
12 pthread_cond_wait(&pFactory->taskQueue.cond, &pFactory->taskQueue.mutex);
13 }
14 printf("Get Task!\n");
15 netFd = pFactory->taskQueue.pFront->netFd;
16 taskDeQueue(&pFactory->taskQueue);
17 pthread_cleanup_pop(1);
18 handleEvent(netFd);
19 printf("pthread done! tid = %lu\n", pthread_self());
20 }
21}
如果使用pthread_cancel
,由于读写文件的函数是取消点,那么正在工作线程也会被终止,从而导致正在执行的下载任务无法完成。如何实现线程池的优雅退出呢?一种解决方案就是不使用pthread_cancel
,而是让每个工作线程在事件循环开始的时候,检查一下线程池是否处于终止的状态,这样子线程就会等待当前任务执行完成了之后才会终止。
xxxxxxxxxx
421//...//
2else if (evs[i].data.fd == exitPipe[0]) {
3 puts("exit threadPool!");
4 factory.runningFlag = 0;
5 pthread_cond_broadcast(&factory.taskQueue.cond);
6 for (int j = 0; j < workerNum; ++j) {
7 pthread_join(factory.tidArr[j], NULL);
8 }
9 puts("done");
10 exit(0);
11}
12//..//
13void *threadFunc(void *pArgs) {
14 int netFd;
15 while (1) {
16 factory_t *pFactory = (factory_t *)pArgs;
17 pthread_mutex_lock(&pFactory->taskQueue.mutex);
18 pthread_cleanup_push(cleanFunc, (void *)pFactory);
19 while (pFactory->taskQueue.queueSize == 0) {
20 pthread_cond_wait(&pFactory->taskQueue.cond, &pFactory->taskQueue.mutex);
21 if (pFactory->runningFlag == 0) {
22 puts("child exit");
23 pthread_exit(NULL);
24 }
25 }
26 printf("Get Task!\n");
27 netFd = pFactory->taskQueue.pFront->netFd;
28 taskDeQueue(&pFactory->taskQueue);
29 pthread_cleanup_pop(1);
30 handleEvent(netFd, pFactory);
31 printf("pthread done! tid = %lu\n", pthread_self());
32 }
33}
34int handleEvent(int netFd, factory_t *pFactory) {
35 transFile(netFd);
36 close(netFd);
37 if (pFactory->runningFlag == 0) {
38 puts("child exit");
39 pthread_exit(NULL);
40 }
41 return 0;
42}