半双工通信,只能在有亲缘关系的进程间使用
适用场景: 父子进程间通信
xxxxxxxxxx
21
2int pipe(int pipefd[2]);
参数说明:
pipefd[2]
: 文件描述符数组
pipefd[0]
: 读端
pipefd[1]
: 写端
返回值:
成功返回0
失败返回-1,并设置errno
相关函数
xxxxxxxxxx
41
2ssize_t read(int fd, void *buf, size_t count);
3ssize_t write(int fd, const void *buf, size_t count);
4int close(int fd);
Note
读写操作默认是阻塞的
内核缓冲区大小通常为64KB
必须正确关闭不使用的端
写入已关闭读端的管道会产生SIGPIPE
x1
2
3
4
5
6using namespace std;
7
8int main() {
9 int pipefd[2];
10 pid_t pid;
11 char buffer[1024];
12
13 // 创建管道
14 if (pipe(pipefd) == -1) {
15 perror("pipe创建失败");
16 return -1;
17 }
18
19 cout << "管道创建成功" << endl;
20 cout << "读端文件描述符: " << pipefd[0] << endl;
21 cout << "写端文件描述符: " << pipefd[1] << endl;
22
23 // 创建子进程
24 pid = fork();
25
26 if (pid == -1) {
27 perror("fork失败");
28 return -1;
29 }
30
31 if (pid == 0) {
32 // 子进程:读取数据
33 cout << "子进程启动,PID: " << getpid() << endl;
34
35 // 关闭写端
36 close(pipefd[1]);
37
38 // 从管道读取数据
39 while (true) {
40 memset(buffer, 0, sizeof(buffer));
41 ssize_t bytes_read = read(pipefd[0], buffer, sizeof(buffer) - 1);
42
43 if (bytes_read > 0) {
44 buffer[bytes_read] = '\0';
45 cout << "子进程收到: " << buffer << endl;
46
47 // 如果收到"quit"则退出
48 if (strcmp(buffer, "quit") == 0) {
49 break;
50 }
51 } else if (bytes_read == 0) {
52 cout << "管道写端已关闭" << endl;
53 break;
54 } else {
55 perror("读取失败");
56 break;
57 }
58 }
59
60 close(pipefd[0]);
61 cout << "子进程结束" << endl;
62
63 } else {
64 // 父进程:发送数据
65 cout << "父进程启动,PID: " << getpid() << endl;
66 cout << "子进程PID: " << pid << endl;
67
68 // 关闭读端
69 close(pipefd[0]);
70
71 // 向管道写入数据
72 string messages[] = {
73 "Hello from parent!",
74 "This is message 1",
75 "This is message 2",
76 "quit"
77 };
78
79 for (const string& msg : messages) {
80 cout << "父进程发送: " << msg << endl;
81 write(pipefd[1], msg.c_str(), msg.length());
82 sleep(1); // 延时1秒
83 }
84
85 close(pipefd[1]);
86
87 // 等待子进程结束
88 int status;
89 wait(&status);
90 cout << "父进程结束" << endl;
91 }
92
93 return 0;
94}
可以在无亲缘关系的进程间使用
适用场景: 不相关进程间的单向通信
xxxxxxxxxx
21
2int mkfifo(const char *pathname, mode_t mode);
参数说明:
pathname
: FIFO文件路径
mode
: 文件权限(如0666)
返回值:
成功返回0
失败返回-1,并设置errno
相关函数
xxxxxxxxxx
31
2int open(const char *pathname, int flags);
3int open(const char *pathname, int flags, mode_t mode);
常用flags
:
O_RDONLY
: 只读模式
O_WRONLY
: 只写模式
O_RDWR
: 读写模式
O_NONBLOCK
: 非阻塞模式
Note
创建时需要设置合适的权限
打开FIFO时可能阻塞,直到另一端也打开
使用完毕后应删除FIFO文件
多个进程可以同时访问同一个FIFO
xxxxxxxxxx
601
2
3
4
5
6
7using namespace std;
8
9const char* FIFO_PATH = "/tmp/my_fifo";
10
11int main() {
12 int fd;
13
14 // 创建命名管道
15 if (mkfifo(FIFO_PATH, 0666) == -1) {
16 perror("mkfifo失败");
17 // 如果文件已存在,继续执行
18 if (errno != EEXIST) {
19 return -1;
20 }
21 }
22
23 cout << "命名管道创建成功: " << FIFO_PATH << endl;
24
25 // 以写模式打开FIFO
26 cout << "等待读进程连接..." << endl;
27 fd = open(FIFO_PATH, O_WRONLY);
28 if (fd == -1) {
29 perror("打开FIFO失败");
30 return -1;
31 }
32
33 cout << "读进程已连接,开始发送数据" << endl;
34
35 // 发送消息
36 string messages[] = {
37 "Hello from writer process!",
38 "Message 1: FIFO communication",
39 "Message 2: Inter-process communication",
40 "Message 3: Named pipe example",
41 "quit"
42 };
43
44 for (const string& msg : messages) {
45 cout << "发送: " << msg << endl;
46
47 ssize_t bytes_written = write(fd, msg.c_str(), msg.length());
48 if (bytes_written == -1) {
49 perror("写入失败");
50 break;
51 }
52
53 sleep(2); // 延时2秒
54 }
55
56 close(fd);
57 cout << "写进程结束" << endl;
58
59 return 0;
60}
示例代码:读进程
xxxxxxxxxx
681
2
3
4
5
6
7using namespace std;
8
9const char* FIFO_PATH = "/tmp/my_fifo";
10
11int main() {
12 int fd;
13 char buffer[1024];
14
15 cout << "读进程启动,PID: " << getpid() << endl;
16
17 // 检查FIFO是否存在
18 struct stat st;
19 if (stat(FIFO_PATH, &st) != 0) {
20 cout << "FIFO不存在,请先运行写进程" << endl;
21 return -1;
22 }
23
24 // 以读模式打开FIFO
25 cout << "打开FIFO: " << FIFO_PATH << endl;
26 fd = open(FIFO_PATH, O_RDONLY);
27 if (fd == -1) {
28 perror("打开FIFO失败");
29 return -1;
30 }
31
32 cout << "FIFO打开成功,等待数据..." << endl;
33
34 // 读取数据
35 while (true) {
36 memset(buffer, 0, sizeof(buffer));
37 ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
38
39 if (bytes_read > 0) {
40 buffer[bytes_read] = '\0';
41 cout << "收到: " << buffer << endl;
42
43 // 如果收到"quit"则退出
44 if (strcmp(buffer, "quit") == 0) {
45 break;
46 }
47 } else if (bytes_read == 0) {
48 cout << "写进程已关闭连接" << endl;
49 break;
50 } else {
51 perror("读取失败");
52 break;
53 }
54 }
55
56 close(fd);
57
58 // 删除FIFO文件
59 if (unlink(FIFO_PATH) == -1) {
60 perror("删除FIFO失败");
61 } else {
62 cout << "FIFO文件已删除" << endl;
63 }
64
65 cout << "读进程结束" << endl;
66
67 return 0;
68}
异步通信机制,用于通知进程发生了某个事件,信号是软件中断,可以在任何时候发送给进程,进程必须告诉内核如何处理每一种信号
适用场景: 简单的事件通知,如进程终止、用户中断等
限制: 只能传递信号类型,无法传递复杂数据
Note
信号可能在任何时候到达
同类型信号在处理期间再次到达会被丢弃
信号可能中断系统调用
需要小心处理信号处理函数中的共享数据
某些信号在不同系统上行为可能不同
信号名 | 信号值 | 默认动作 | 说明 |
---|---|---|---|
SIGINT | 2 | 终止 | 中断信号(Ctrl+C) |
SIGQUIT | 3 | 终止+核心转储 | 退出信号(Ctrl+) |
SIGKILL | 9 | 终止 | 强制杀死(不可捕获) |
SIGTERM | 15 | 终止 | 终止信号(可捕获) |
SIGCHLD | 17 | 忽略 | 子进程状态改变 |
SIGUSR1 | 10 | 终止 | 用户自定义信号1 |
SIGUSR2 | 12 | 终止 | 用户自定义信号2 |
SIGALRM | 14 | 终止 | 定时器信号 |
1.signal() 函数
xxxxxxxxxx
31
2typedef void (*sighandler_t)(int);
3sighandler_t signal(int signum, sighandler_t handler);
参数说明:
signum
: 信号编号
handler
: 信号处理函数指针,可以是:
自定义函数指针
SIG_DFL
: 默认处理
SIG_IGN
: 忽略信号
返回值:
成功:返回之前的信号处理函数
失败:返回SIG_ERR
2. sigaction() 函数
xxxxxxxxxx
101
2int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
3
4struct sigaction {
5 void (*sa_handler)(int); // 信号处理函数
6 void (*sa_sigaction)(int, siginfo_t *, void *); // 扩展处理函数
7 sigset_t sa_mask; // 信号屏蔽集
8 int sa_flags; // 标志位
9 void (*sa_restorer)(void); // 已废弃
10};
参数说明:
signum
: 信号编号
act
: 新的信号处理方式
oldact
: 保存原来的信号处理方式
3. kill() 函数
xxxxxxxxxx
21
2int kill(pid_t pid, int sig);
参数说明:
pid
: 目标进程ID
> 0
: 发送给指定进程
= 0
: 发送给同组所有进程
= -1
: 发送给所有有权限的进程
< -1
: 发送给进程组|-pid|
sig
: 信号编号
4. raise() 函数
xxxxxxxxxx
21
2int raise(int sig);
功能: 向当前进程发送信号,等价于kill(getpid(), sig)
xxxxxxxxxx
1181
2
3
4
5
6
7using namespace std;
8
9// 全局变量用于进程间通信
10volatile sig_atomic_t message_count = 0;
11volatile sig_atomic_t should_exit = 0;
12
13// 父进程信号处理函数
14void parent_handler(int sig) {
15 switch(sig) {
16 case SIGUSR1:
17 message_count++;
18 cout << "父进程收到子进程消息 #" << message_count << endl;
19 break;
20 case SIGUSR2:
21 cout << "父进程收到子进程完成信号" << endl;
22 should_exit = 1;
23 break;
24 case SIGCHLD:
25 cout << "子进程状态改变" << endl;
26 break;
27 }
28}
29
30// 子进程信号处理函数
31void child_handler(int sig) {
32 switch(sig) {
33 case SIGUSR1:
34 cout << "子进程收到父进程指令" << endl;
35 break;
36 case SIGTERM:
37 cout << "子进程收到终止信号" << endl;
38 exit(0);
39 break;
40 }
41}
42
43int main() {
44 pid_t pid;
45
46 cout << "进程间信号通信示例" << endl;
47
48 // 创建子进程
49 pid = fork();
50
51 if (pid == -1) {
52 perror("fork失败");
53 return -1;
54 }
55
56 if (pid == 0) {
57 // 子进程
58 cout << "子进程启动,PID: " << getpid() << endl;
59 cout << "父进程PID: " << getppid() << endl;
60
61 // 注册信号处理函数
62 signal(SIGUSR1, child_handler);
63 signal(SIGTERM, child_handler);
64
65 // 子进程工作循环
66 for(int i = 1; i <= 5; i++) {
67 sleep(2);
68 cout << "子进程工作步骤 " << i << "/5" << endl;
69
70 // 向父进程发送进度信号
71 kill(getppid(), SIGUSR1);
72 }
73
74 // 工作完成,通知父进程
75 cout << "子进程工作完成" << endl;
76 kill(getppid(), SIGUSR2);
77
78 // 等待父进程指令
79 while(true) {
80 sleep(1);
81 }
82
83 } else {
84 // 父进程
85 cout << "父进程启动,PID: " << getpid() << endl;
86 cout << "子进程PID: " << pid << endl;
87
88 // 注册信号处理函数
89 signal(SIGUSR1, parent_handler);
90 signal(SIGUSR2, parent_handler);
91 signal(SIGCHLD, parent_handler);
92
93 // 等待子进程完成工作
94 cout << "等待子进程完成工作..." << endl;
95
96 while(!should_exit) {
97 sleep(1);
98 }
99
100 cout << "总共收到 " << message_count << " 个进度消息" << endl;
101
102 // 向子进程发送指令
103 cout << "向子进程发送指令" << endl;
104 kill(pid, SIGUSR1);
105 sleep(1);
106
107 // 终止子进程
108 cout << "终止子进程" << endl;
109 kill(pid, SIGTERM);
110
111 // 等待子进程结束
112 int status;
113 wait(&status);
114 cout << "子进程已结束,状态: " << status << endl;
115 }
116
117 return 0;
118}
消息队列是一种重要的进程间通信机制,允许进程通过消息进行异步通信。Linux提供了两种消息队列实现:System V消息队列和POSIX消息队列。
消息队列提供了可靠的异步通信机制,特别适合生产者-消费者模式的应用场景。
特点: 消息有类型,支持优先级
msgget()
- 创建或获取消息队列
xxxxxxxxxx
11int msgget(key_t key, int msgflg);
key
: 消息队列的键值(通常使用ftok()
生成)
msgflg
: 权限标志和创建标志
返回值: 消息队列标识符
msgsnd()
- 发送消息
xxxxxxxxxx
11int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
msqid
: 消息队列标识符
msgp
: 指向消息结构的指针
msgsz
: 消息数据的大小
msgflg
: 控制标志
msgrcv()
- 接收消息
xxxxxxxxxx
11ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
msgtyp
: 消息类型(0表示接收任意类型)
msgctl()
- 控制消息队列
xxxxxxxxxx
11int msgctl(int msqid, int cmd, struct msqid_ds *buf);
cmd
: 控制命令(如IPC_RMID删除队列)
发送端:
xxxxxxxxxx
481
2
3
4
5
6
7// 消息结构体
8struct Message {
9 long msg_type; // 消息类型,必须大于0
10 char msg_text[256]; // 消息内容
11};
12
13int main() {
14 // 生成唯一键值
15 key_t key = ftok("/tmp", 'A');
16 if (key == -1) {
17 perror("ftok failed");
18 return 1;
19 }
20
21 // 创建或获取消息队列
22 int msgid = msgget(key, IPC_CREAT | 0666);
23 if (msgid == -1) {
24 perror("msgget failed");
25 return 1;
26 }
27
28 std::cout << "System V消息队列创建成功,ID: " << msgid << std::endl;
29
30 Message msg;
31 for (int i = 1; i <= 5; ++i) {
32 msg.msg_type = i % 3 + 1; // 消息类型1-3
33 snprintf(msg.msg_text, sizeof(msg.msg_text),
34 "消息 %d,类型 %ld,PID: %d", i, msg.msg_type, getpid());
35
36 // 发送消息
37 if (msgsnd(msgid, &msg, strlen(msg.msg_text) + 1, 0) == -1) {
38 perror("msgsnd failed");
39 continue;
40 }
41
42 std::cout << "发送: " << msg.msg_text << std::endl;
43 sleep(1);
44 }
45
46 std::cout << "所有消息发送完成" << std::endl;
47 return 0;
48}
接收端:
xxxxxxxxxx
661
2
3
4
5
6
7struct Message {
8 long msg_type;
9 char msg_text[256];
10};
11
12int msgid;
13bool running = true;
14
15void cleanup(int sig) {
16 std::cout << "\n正在清理资源..." << std::endl;
17
18 // 删除消息队列
19 if (msgctl(msgid, IPC_RMID, nullptr) == -1) {
20 perror("msgctl IPC_RMID failed");
21 } else {
22 std::cout << "消息队列已删除" << std::endl;
23 }
24
25 running = false;
26}
27
28int main() {
29 // 设置信号处理
30 signal(SIGINT, cleanup);
31 signal(SIGTERM, cleanup);
32
33 // 获取消息队列
34 key_t key = ftok("/tmp", 'A');
35 if (key == -1) {
36 perror("ftok failed");
37 return 1;
38 }
39
40 msgid = msgget(key, 0666);
41 if (msgid == -1) {
42 perror("msgget failed");
43 return 1;
44 }
45
46 std::cout << "连接到消息队列,ID: " << msgid << std::endl;
47 std::cout << "等待消息... (Ctrl+C退出)" << std::endl;
48
49 Message msg;
50 while (running) {
51 // 接收任意类型的消息
52 ssize_t bytes = msgrcv(msgid, &msg, sizeof(msg.msg_text), 0, IPC_NOWAIT);
53
54 if (bytes > 0) {
55 std::cout << "接收到消息 [类型 " << msg.msg_type << "]: "
56 << msg.msg_text << std::endl;
57 } else if (bytes == -1 && errno != ENOMSG) {
58 perror("msgrcv failed");
59 break;
60 }
61
62 usleep(100000); // 100ms
63 }
64
65 return 0;
66}
特点: 更现代的接口,支持异步通知
mq_open()
- 打开或创建消息队列
xxxxxxxxxx
11mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
name
: 消息队列名称(以"/"开头)
oflag
: 打开标志(O_RDONLY
, O_WRONLY
, O_RDWR
等)
mq_send()
- 发送消息
xxxxxxxxxx
11int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
msg_prio
: 消息优先级(0-31,数值越大优先级越高)
mq_receive()
- 接收消息
xxxxxxxxxx
11ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
mq_close()
- 关闭消息队列
xxxxxxxxxx
11int mq_close(mqd_t mqdes);
mq_unlink()
- 删除消息队列
xxxxxxxxxx
11int mq_unlink(const char *name);
发送端:
xxxxxxxxxx
491
2
3
4
5
6
7
8int main() {
9 const char* queue_name = "/test_queue";
10
11 // 设置消息队列属性
12 struct mq_attr attr;
13 attr.mq_flags = 0;
14 attr.mq_maxmsg = 10; // 最大消息数
15 attr.mq_msgsize = 256; // 最大消息大小
16 attr.mq_curmsgs = 0;
17
18 // 创建或打开消息队列
19 mqd_t mq = mq_open(queue_name, O_CREAT | O_WRONLY, 0644, &attr);
20 if (mq == (mqd_t)-1) {
21 perror("mq_open failed");
22 return 1;
23 }
24
25 std::cout << "POSIX消息队列创建成功" << std::endl;
26
27 // 发送不同优先级的消息
28 for (int i = 1; i <= 5; ++i) {
29 char message[256];
30 unsigned int priority = i % 3; // 优先级0-2
31
32 snprintf(message, sizeof(message),
33 "POSIX消息 %d,优先级 %u,PID: %d", i, priority, getpid());
34
35 if (mq_send(mq, message, strlen(message) + 1, priority) == -1) {
36 perror("mq_send failed");
37 continue;
38 }
39
40 std::cout << "发送: " << message << " [优先级: " << priority << "]" << std::endl;
41 sleep(1);
42 }
43
44 // 关闭消息队列
45 mq_close(mq);
46 std::cout << "所有消息发送完成" << std::endl;
47
48 return 0;
49}
接收端:
xxxxxxxxxx
711
2
3
4
5
6
7const char* queue_name = "/test_queue";
8mqd_t mq;
9bool running = true;
10
11void cleanup(int sig) {
12 std::cout << "\n正在清理资源..." << std::endl;
13
14 // 关闭并删除消息队列
15 mq_close(mq);
16 if (mq_unlink(queue_name) == -1) {
17 perror("mq_unlink failed");
18 } else {
19 std::cout << "消息队列已删除" << std::endl;
20 }
21
22 running = false;
23}
24
25int main() {
26 // 设置信号处理
27 signal(SIGINT, cleanup);
28 signal(SIGTERM, cleanup);
29
30 // 打开消息队列
31 mq = mq_open(queue_name, O_RDONLY);
32 if (mq == (mqd_t)-1) {
33 perror("mq_open failed");
34 return 1;
35 }
36
37 // 获取队列属性
38 struct mq_attr attr;
39 if (mq_getattr(mq, &attr) == -1) {
40 perror("mq_getattr failed");
41 return 1;
42 }
43
44 std::cout << "连接到POSIX消息队列" << std::endl;
45 std::cout << "最大消息数: " << attr.mq_maxmsg << std::endl;
46 std::cout << "最大消息大小: " << attr.mq_msgsize << std::endl;
47 std::cout << "等待消息... (Ctrl+C退出)" << std::endl;
48
49 char buffer[attr.mq_msgsize];
50 unsigned int priority;
51
52 while (running) {
53 // 设置超时时间
54 struct timespec timeout;
55 clock_gettime(CLOCK_REALTIME, &timeout);
56 timeout.tv_sec += 1; // 1秒超时
57
58 ssize_t bytes = mq_timedreceive(mq, buffer, attr.mq_msgsize, &priority, &timeout);
59
60 if (bytes > 0) {
61 buffer[bytes] = '\0';
62 std::cout << "接收到消息 [优先级 " << priority << "]: "
63 << buffer << std::endl;
64 } else if (bytes == -1 && errno != ETIMEDOUT) {
65 perror("mq_timedreceive failed");
66 break;
67 }
68 }
69
70 return 0;
71}
共享内存是最快的进程间通信方式,允许多个进程直接访问同一块物理内存区域。
Note
共享内存本身不提供同步机制,需要配合其他IPC方式:
信号量: 控制对共享内存的访问
互斥锁: 保护临界区
条件变量: 等待特定条件
文件锁: 基于文件的锁机制
shmget()
- 创建或获取共享内存段
xxxxxxxxxx
11int shmget(key_t key, size_t size, int shmflg);
key
: 共享内存的键值(通常使用ftok()
生成)
size
: 共享内存段的大小(字节)
shmflg
: 权限标志和创建标志
返回值: 共享内存标识符
shmat()
- 连接共享内存段到进程地址空间
xxxxxxxxxx
11void *shmat(int shmid, const void *shmaddr, int shmflg);
shmid
: 共享内存标识符
shmaddr
: 指定连接地址(通常为NULL
,让系统选择)
shmflg
: 连接标志(如SHM_RDONLY
只读)
返回值: 共享内存段的起始地址
shmdt()
- 断开共享内存段连接
xxxxxxxxxx
11int shmdt(const void *shmaddr);
shmaddr
: 要断开的共享内存地址
shmctl()
- 控制共享内存段
xxxxxxxxxx
11int shmctl(int shmid, int cmd, struct shmid_ds *buf);
cmd
: 控制命令(如IPC_RMID删除共享内存)
写进程:
xxxxxxxxxx
1041
2
3
4
5
6
7
8// 共享数据结构
9struct SharedData {
10 int counter;
11 char message[256];
12 bool ready;
13 bool finished;
14};
15
16int shmid;
17SharedData* shared_data;
18
19void cleanup(int sig) {
20 std::cout << "\n正在清理资源..." << std::endl;
21
22 if (shared_data) {
23 shared_data->finished = true;
24 shmdt(shared_data);
25 }
26
27 // 删除共享内存
28 if (shmctl(shmid, IPC_RMID, nullptr) == -1) {
29 perror("shmctl IPC_RMID failed");
30 } else {
31 std::cout << "共享内存已删除" << std::endl;
32 }
33
34 exit(0);
35}
36
37int main() {
38 // 设置信号处理
39 signal(SIGINT, cleanup);
40 signal(SIGTERM, cleanup);
41
42 // 生成唯一键值
43 key_t key = ftok("/tmp", 'S');
44 if (key == -1) {
45 perror("ftok failed");
46 return 1;
47 }
48
49 // 创建共享内存段
50 shmid = shmget(key, sizeof(SharedData), IPC_CREAT | 0666);
51 if (shmid == -1) {
52 perror("shmget failed");
53 return 1;
54 }
55
56 // 连接共享内存
57 shared_data = (SharedData*)shmat(shmid, nullptr, 0);
58 if (shared_data == (SharedData*)-1) {
59 perror("shmat failed");
60 return 1;
61 }
62
63 std::cout << "System V共享内存创建成功,ID: " << shmid << std::endl;
64 std::cout << "共享内存地址: " << shared_data << std::endl;
65
66 // 初始化共享数据
67 shared_data->counter = 0;
68 shared_data->ready = false;
69 shared_data->finished = false;
70 strcpy(shared_data->message, "初始化消息");
71
72 // 写入数据
73 for (int i = 1; i <= 10; ++i) {
74 shared_data->counter = i;
75 snprintf(shared_data->message, sizeof(shared_data->message),
76 "写进程消息 %d,PID: %d", i, getpid());
77 shared_data->ready = true;
78
79 std::cout << "写入: counter=" << shared_data->counter
80 << ", message=" << shared_data->message << std::endl;
81
82 sleep(2);
83
84 // 等待读进程处理
85 while (shared_data->ready && !shared_data->finished) {
86 usleep(100000); // 100ms
87 }
88
89 if (shared_data->finished) {
90 break;
91 }
92 }
93
94 shared_data->finished = true;
95 std::cout << "写进程完成" << std::endl;
96
97 // 保持程序运行,等待手动清理
98 std::cout << "按Ctrl+C退出并清理资源" << std::endl;
99 while (true) {
100 sleep(1);
101 }
102
103 return 0;
104}
读进程:
xxxxxxxxxx
701
2
3
4
5
6
7struct SharedData {
8 int counter;
9 char message[256];
10 bool ready;
11 bool finished;
12};
13
14SharedData* shared_data;
15bool running = true;
16
17void cleanup(int sig) {
18 std::cout << "\n读进程退出" << std::endl;
19 if (shared_data) {
20 shmdt(shared_data);
21 }
22 running = false;
23}
24
25int main() {
26 signal(SIGINT, cleanup);
27 signal(SIGTERM, cleanup);
28
29 // 获取共享内存
30 key_t key = ftok("/tmp", 'S');
31 if (key == -1) {
32 perror("ftok failed");
33 return 1;
34 }
35
36 int shmid = shmget(key, sizeof(SharedData), 0666);
37 if (shmid == -1) {
38 perror("shmget failed - 请先运行写进程");
39 return 1;
40 }
41
42 // 连接共享内存
43 shared_data = (SharedData*)shmat(shmid, nullptr, 0);
44 if (shared_data == (SharedData*)-1) {
45 perror("shmat failed");
46 return 1;
47 }
48
49 std::cout << "连接到共享内存,ID: " << shmid << std::endl;
50 std::cout << "共享内存地址: " << shared_data << std::endl;
51 std::cout << "等待数据... (Ctrl+C退出)" << std::endl;
52
53 int last_counter = -1;
54
55 while (running && !shared_data->finished) {
56 if (shared_data->ready && shared_data->counter != last_counter) {
57 std::cout << "读取: counter=" << shared_data->counter
58 << ", message=" << shared_data->message << std::endl;
59
60 last_counter = shared_data->counter;
61 shared_data->ready = false; // 标记已处理
62 }
63
64 usleep(100000); // 100ms
65 }
66
67 std::cout << "读进程完成" << std::endl;
68 shmdt(shared_data);
69 return 0;
70}
特点: 基于文件系统,更易使用
shm_open()
- 创建或打开共享内存对象
xxxxxxxxxx
11int shm_open(const char *name, int oflag, mode_t mode);
name
: 共享内存对象名称(以"/"开头)
oflag
: 打开标志(O_CREAT
, O_RDWR
等)
mode
: 权限模式
返回值: 文件描述符
ftruncate()
- 设置共享内存对象大小
xxxxxxxxxx
11int ftruncate(int fd, off_t length);
mmap()
- 将共享内存映射到进程地址空间
xxxxxxxxxx
11void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
prot
: 内存保护标志(PROT_READ
, PROT_WRITE
等)
flags
: 映射标志(MAP_SHARED
等)
munmap()
- 取消内存映射
xxxxxxxxxx
11int munmap(void *addr, size_t length);
shm_unlink()
- 删除共享内存对象
xxxxxxxxxx
11int shm_unlink(const char *name);
写进程:
xxxxxxxxxx
1141
2
3
4
5
6
7
8
9
10struct SharedData {
11 int counter;
12 char message[256];
13 bool finished;
14};
15
16const char* shm_name = "/posix_shared_memory";
17const char* sem_name = "/posix_semaphore";
18int shm_fd;
19SharedData* shared_data;
20sem_t* semaphore;
21
22void cleanup(int sig) {
23 std::cout << "\n正在清理资源..." << std::endl;
24
25 if (shared_data) {
26 shared_data->finished = true;
27 munmap(shared_data, sizeof(SharedData));
28 }
29
30 if (shm_fd != -1) {
31 close(shm_fd);
32 }
33
34 // 删除共享内存和信号量
35 shm_unlink(shm_name);
36 sem_close(semaphore);
37 sem_unlink(sem_name);
38
39 std::cout << "POSIX共享内存和信号量已删除" << std::endl;
40 exit(0);
41}
42
43int main() {
44 signal(SIGINT, cleanup);
45 signal(SIGTERM, cleanup);
46
47 // 创建信号量用于同步
48 semaphore = sem_open(sem_name, O_CREAT, 0644, 1);
49 if (semaphore == SEM_FAILED) {
50 perror("sem_open failed");
51 return 1;
52 }
53
54 // 创建共享内存对象
55 shm_fd = shm_open(shm_name, O_CREAT | O_RDWR, 0644);
56 if (shm_fd == -1) {
57 perror("shm_open failed");
58 return 1;
59 }
60
61 // 设置共享内存大小
62 if (ftruncate(shm_fd, sizeof(SharedData)) == -1) {
63 perror("ftruncate failed");
64 return 1;
65 }
66
67 // 映射共享内存
68 shared_data = (SharedData*)mmap(nullptr, sizeof(SharedData),
69 PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
70 if (shared_data == MAP_FAILED) {
71 perror("mmap failed");
72 return 1;
73 }
74
75 std::cout << "POSIX共享内存创建成功" << std::endl;
76 std::cout << "共享内存地址: " << shared_data << std::endl;
77
78 // 初始化共享数据
79 shared_data->counter = 0;
80 shared_data->finished = false;
81 strcpy(shared_data->message, "POSIX初始化消息");
82
83 // 写入数据
84 for (int i = 1; i <= 10; ++i) {
85 // 获取信号量
86 sem_wait(semaphore);
87
88 shared_data->counter = i;
89 snprintf(shared_data->message, sizeof(shared_data->message),
90 "POSIX写进程消息 %d,PID: %d", i, getpid());
91
92 std::cout << "写入: counter=" << shared_data->counter
93 << ", message=" << shared_data->message << std::endl;
94
95 // 释放信号量
96 sem_post(semaphore);
97
98 sleep(2);
99
100 if (shared_data->finished) {
101 break;
102 }
103 }
104
105 shared_data->finished = true;
106 std::cout << "写进程完成,按Ctrl+C退出并清理资源" << std::endl;
107
108 // 保持程序运行
109 while (true) {
110 sleep(1);
111 }
112
113 return 0;
114}
读进程:
xxxxxxxxxx
851
2
3
4
5
6
7
8
9struct SharedData {
10 int counter;
11 char message[256];
12 bool finished;
13};
14
15const char* shm_name = "/posix_shared_memory";
16const char* sem_name = "/posix_semaphore";
17SharedData* shared_data;
18sem_t* semaphore;
19bool running = true;
20
21void cleanup(int sig) {
22 std::cout << "\n读进程退出" << std::endl;
23 if (shared_data) {
24 munmap(shared_data, sizeof(SharedData));
25 }
26 if (semaphore) {
27 sem_close(semaphore);
28 }
29 running = false;
30}
31
32int main() {
33 signal(SIGINT, cleanup);
34 signal(SIGTERM, cleanup);
35
36 // 打开信号量
37 semaphore = sem_open(sem_name, 0);
38 if (semaphore == SEM_FAILED) {
39 perror("sem_open failed - 请先运行写进程");
40 return 1;
41 }
42
43 // 打开共享内存对象
44 int shm_fd = shm_open(shm_name, O_RDONLY, 0644);
45 if (shm_fd == -1) {
46 perror("shm_open failed - 请先运行写进程");
47 return 1;
48 }
49
50 // 映射共享内存
51 shared_data = (SharedData*)mmap(nullptr, sizeof(SharedData),
52 PROT_READ, MAP_SHARED, shm_fd, 0);
53 if (shared_data == MAP_FAILED) {
54 perror("mmap failed");
55 return 1;
56 }
57
58 close(shm_fd); // 映射后可以关闭文件描述符
59
60 std::cout << "连接到POSIX共享内存" << std::endl;
61 std::cout << "共享内存地址: " << shared_data << std::endl;
62 std::cout << "等待数据... (Ctrl+C退出)" << std::endl;
63
64 int last_counter = -1;
65
66 while (running && !shared_data->finished) {
67 // 获取信号量
68 if (sem_trywait(semaphore) == 0) {
69 if (shared_data->counter != last_counter) {
70 std::cout << "读取: counter=" << shared_data->counter
71 << ", message=" << shared_data->message << std::endl;
72 last_counter = shared_data->counter;
73 }
74
75 // 释放信号量
76 sem_post(semaphore);
77 }
78
79 usleep(500000); // 500ms
80 }
81
82 std::cout << "读进程完成" << std::endl;
83 cleanup(0);
84 return 0;
85}
信号量是一种用于进程同步的重要机制,主要用于控制对共享资源的访问。
特点: 支持信号量集合
semget()
- 创建或获取信号量集
xxxxxxxxxx
11int semget(key_t key, int nsems, int semflg);
key
: 信号量集的键值(通常使用ftok()
生成)
nsems
: 信号量集中信号量的数量
semflg
: 权限标志和创建标志
返回值: 信号量集标识符
semop()
- 对信号量进行操作
xxxxxxxxxx
11int semop(int semid, struct sembuf *sops, size_t nsops);
semid
: 信号量集标识符
sops
: 指向操作数组的指针
nsops
: 操作数量
semctl()
- 控制信号量集
xxxxxxxxxx
11int semctl(int semid, int semnum, int cmd, ...);
semnum
: 信号量编号
cmd
: 控制命令(如SETVAL设置值、IPC_RMID删除)
sembuf结构体
xxxxxxxxxx
51struct sembuf {
2 unsigned short sem_num; // 信号量编号
3 short sem_op; // 操作值
4 short sem_flg; // 操作标志
5};
生产者-消费者模型
xxxxxxxxxx
1341
2
3
4
5
6
7
8
9// 信号量索引
10// 互斥信号量
11// 空缓冲区数量
12// 满缓冲区数量
13
14// 共享缓冲区
15struct SharedBuffer {
16 int buffer[10];
17 int in; // 生产者索引
18 int out; // 消费者索引
19 int count; // 当前元素数量
20};
21
22int semid, shmid;
23SharedBuffer* shared_buf;
24
25// 信号量操作辅助函数
26void sem_wait(int semid, int sem_num) {
27 struct sembuf sb;
28 sb.sem_num = sem_num;
29 sb.sem_op = -1; // P操作(等待)
30 sb.sem_flg = 0;
31 semop(semid, &sb, 1);
32}
33
34void sem_signal(int semid, int sem_num) {
35 struct sembuf sb;
36 sb.sem_num = sem_num;
37 sb.sem_op = 1; // V操作(信号)
38 sb.sem_flg = 0;
39 semop(semid, &sb, 1);
40}
41
42void cleanup(int sig) {
43 std::cout << "\n正在清理资源..." << std::endl;
44
45 if (shared_buf) {
46 shmdt(shared_buf);
47 }
48
49 // 删除信号量集和共享内存
50 semctl(semid, 0, IPC_RMID);
51 shmctl(shmid, IPC_RMID, nullptr);
52
53 std::cout << "资源清理完成" << std::endl;
54 exit(0);
55}
56
57int main() {
58 signal(SIGINT, cleanup);
59 signal(SIGTERM, cleanup);
60
61 // 生成键值
62 key_t key = ftok("/tmp", 'P');
63 if (key == -1) {
64 perror("ftok failed");
65 return 1;
66 }
67
68 // 创建信号量集(3个信号量)
69 semid = semget(key, 3, IPC_CREAT | 0666);
70 if (semid == -1) {
71 perror("semget failed");
72 return 1;
73 }
74
75 // 初始化信号量
76 semctl(semid, SEM_MUTEX, SETVAL, 1); // 互斥信号量初始值为1
77 semctl(semid, SEM_EMPTY, SETVAL, 10); // 空缓冲区初始值为10
78 semctl(semid, SEM_FULL, SETVAL, 0); // 满缓冲区初始值为0
79
80 // 创建共享内存
81 shmid = shmget(key + 1, sizeof(SharedBuffer), IPC_CREAT | 0666);
82 if (shmid == -1) {
83 perror("shmget failed");
84 return 1;
85 }
86
87 shared_buf = (SharedBuffer*)shmat(shmid, nullptr, 0);
88 if (shared_buf == (SharedBuffer*)-1) {
89 perror("shmat failed");
90 return 1;
91 }
92
93 // 初始化共享缓冲区
94 shared_buf->in = 0;
95 shared_buf->out = 0;
96 shared_buf->count = 0;
97 memset(shared_buf->buffer, 0, sizeof(shared_buf->buffer));
98
99 std::cout << "System V信号量生产者启动" << std::endl;
100 std::cout << "信号量集ID: " << semid << ", 共享内存ID: " << shmid << std::endl;
101
102 // 生产数据
103 for (int i = 1; i <= 20; ++i) {
104 // 等待空缓冲区
105 sem_wait(semid, SEM_EMPTY);
106
107 // 获取互斥锁
108 sem_wait(semid, SEM_MUTEX);
109
110 // 临界区:生产数据
111 shared_buf->buffer[shared_buf->in] = i;
112 std::cout << "生产: " << i << " -> buffer[" << shared_buf->in << "]"
113 << ", 当前数量: " << (shared_buf->count + 1) << std::endl;
114 shared_buf->in = (shared_buf->in + 1) % 10;
115 shared_buf->count++;
116
117 // 释放互斥锁
118 sem_signal(semid, SEM_MUTEX);
119
120 // 信号满缓冲区
121 sem_signal(semid, SEM_FULL);
122
123 sleep(1);
124 }
125
126 std::cout << "生产完成,按Ctrl+C退出" << std::endl;
127
128 // 保持程序运行
129 while (true) {
130 sleep(1);
131 }
132
133 return 0;
134}
消费者:
xxxxxxxxxx
1191
2
3
4
5
6
7
8
9
10
11
12struct SharedBuffer {
13 int buffer[10];
14 int in;
15 int out;
16 int count;
17};
18
19SharedBuffer* shared_buf;
20bool running = true;
21
22void sem_wait(int semid, int sem_num) {
23 struct sembuf sb;
24 sb.sem_num = sem_num;
25 sb.sem_op = -1;
26 sb.sem_flg = 0;
27 semop(semid, &sb, 1);
28}
29
30void sem_signal(int semid, int sem_num) {
31 struct sembuf sb;
32 sb.sem_num = sem_num;
33 sb.sem_op = 1;
34 sb.sem_flg = 0;
35 semop(semid, &sb, 1);
36}
37
38void cleanup(int sig) {
39 std::cout << "\n消费者退出" << std::endl;
40 if (shared_buf) {
41 shmdt(shared_buf);
42 }
43 running = false;
44}
45
46int main() {
47 signal(SIGINT, cleanup);
48 signal(SIGTERM, cleanup);
49
50 // 获取信号量集和共享内存
51 key_t key = ftok("/tmp", 'P');
52 if (key == -1) {
53 perror("ftok failed");
54 return 1;
55 }
56
57 int semid = semget(key, 3, 0666);
58 if (semid == -1) {
59 perror("semget failed - 请先运行生产者");
60 return 1;
61 }
62
63 int shmid = shmget(key + 1, sizeof(SharedBuffer), 0666);
64 if (shmid == -1) {
65 perror("shmget failed");
66 return 1;
67 }
68
69 shared_buf = (SharedBuffer*)shmat(shmid, nullptr, 0);
70 if (shared_buf == (SharedBuffer*)-1) {
71 perror("shmat failed");
72 return 1;
73 }
74
75 std::cout << "System V信号量消费者启动" << std::endl;
76 std::cout << "等待数据... (Ctrl+C退出)" << std::endl;
77
78 int consumed_count = 0;
79
80 while (running) {
81 // 等待满缓冲区
82 struct sembuf sb;
83 sb.sem_num = SEM_FULL;
84 sb.sem_op = -1;
85 sb.sem_flg = IPC_NOWAIT; // 非阻塞
86
87 if (semop(semid, &sb, 1) == -1) {
88 if (errno == EAGAIN) {
89 usleep(100000); // 100ms
90 continue;
91 } else {
92 perror("semop failed");
93 break;
94 }
95 }
96
97 // 获取互斥锁
98 sem_wait(semid, SEM_MUTEX);
99
100 // 临界区:消费数据
101 int data = shared_buf->buffer[shared_buf->out];
102 std::cout << "消费: " << data << " <- buffer[" << shared_buf->out << "]"
103 << ", 剩余数量: " << (shared_buf->count - 1) << std::endl;
104 shared_buf->out = (shared_buf->out + 1) % 10;
105 shared_buf->count--;
106 consumed_count++;
107
108 // 释放互斥锁
109 sem_signal(semid, SEM_MUTEX);
110
111 // 信号空缓冲区
112 sem_signal(semid, SEM_EMPTY);
113
114 sleep(2); // 消费速度比生产慢
115 }
116
117 std::cout << "总共消费了 " << consumed_count << " 个数据" << std::endl;
118 return 0;
119}
特点: 更简单的接口
sem_open()
- 创建或打开命名信号量
xxxxxxxxxx
11sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
name
: 信号量名称(以"/"开头)
oflag
: 打开标志(O_CREAT等)
value
: 初始值
sem_wait()
- 等待信号量(P操作)
xxxxxxxxxx
11int sem_wait(sem_t *sem);
sem_trywait()
- 非阻塞等待
xxxxxxxxxx
11int sem_trywait(sem_t *sem);
sem_post()
- 释放信号量(V操作)
xxxxxxxxxx
11int sem_post(sem_t *sem);
sem_close()
- 关闭信号量
xxxxxxxxxx
11int sem_close(sem_t *sem);
sem_unlink()
- 删除命名信号量
xxxxxxxxxx
11int sem_unlink(const char *name);
生产者:
xxxxxxxxxx
1241
2
3
4
5
6
7
8
9
10struct SharedBuffer {
11 int buffer[10];
12 int in;
13 int out;
14 int count;
15};
16
17const char* shm_name = "/posix_buffer";
18const char* sem_mutex_name = "/posix_mutex";
19const char* sem_empty_name = "/posix_empty";
20const char* sem_full_name = "/posix_full";
21
22int shm_fd;
23SharedBuffer* shared_buf;
24sem_t *sem_mutex, *sem_empty, *sem_full;
25
26void cleanup(int sig) {
27 std::cout << "\n正在清理资源..." << std::endl;
28
29 if (shared_buf) {
30 munmap(shared_buf, sizeof(SharedBuffer));
31 }
32
33 if (shm_fd != -1) {
34 close(shm_fd);
35 }
36
37 // 删除信号量和共享内存
38 sem_close(sem_mutex);
39 sem_close(sem_empty);
40 sem_close(sem_full);
41
42 sem_unlink(sem_mutex_name);
43 sem_unlink(sem_empty_name);
44 sem_unlink(sem_full_name);
45 shm_unlink(shm_name);
46
47 std::cout << "POSIX资源清理完成" << std::endl;
48 exit(0);
49}
50
51int main() {
52 signal(SIGINT, cleanup);
53 signal(SIGTERM, cleanup);
54
55 // 创建信号量
56 sem_mutex = sem_open(sem_mutex_name, O_CREAT, 0644, 1); // 互斥
57 sem_empty = sem_open(sem_empty_name, O_CREAT, 0644, 10); // 空缓冲区
58 sem_full = sem_open(sem_full_name, O_CREAT, 0644, 0); // 满缓冲区
59
60 if (sem_mutex == SEM_FAILED || sem_empty == SEM_FAILED || sem_full == SEM_FAILED) {
61 perror("sem_open failed");
62 return 1;
63 }
64
65 // 创建共享内存
66 shm_fd = shm_open(shm_name, O_CREAT | O_RDWR, 0644);
67 if (shm_fd == -1) {
68 perror("shm_open failed");
69 return 1;
70 }
71
72 if (ftruncate(shm_fd, sizeof(SharedBuffer)) == -1) {
73 perror("ftruncate failed");
74 return 1;
75 }
76
77 shared_buf = (SharedBuffer*)mmap(nullptr, sizeof(SharedBuffer),
78 PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
79 if (shared_buf == MAP_FAILED) {
80 perror("mmap failed");
81 return 1;
82 }
83
84 // 初始化共享缓冲区
85 shared_buf->in = 0;
86 shared_buf->out = 0;
87 shared_buf->count = 0;
88 memset(shared_buf->buffer, 0, sizeof(shared_buf->buffer));
89
90 std::cout << "POSIX信号量生产者启动" << std::endl;
91
92 // 生产数据
93 for (int i = 1; i <= 20; ++i) {
94 // 等待空缓冲区
95 sem_wait(sem_empty);
96
97 // 获取互斥锁
98 sem_wait(sem_mutex);
99
100 // 临界区:生产数据
101 shared_buf->buffer[shared_buf->in] = i;
102 std::cout << "生产: " << i << " -> buffer[" << shared_buf->in << "]"
103 << ", 当前数量: " << (shared_buf->count + 1) << std::endl;
104 shared_buf->in = (shared_buf->in + 1) % 10;
105 shared_buf->count++;
106
107 // 释放互斥锁
108 sem_post(sem_mutex);
109
110 // 信号满缓冲区
111 sem_post(sem_full);
112
113 sleep(1);
114 }
115
116 std::cout << "生产完成,按Ctrl+C退出" << std::endl;
117
118 // 保持程序运行
119 while (true) {
120 sleep(1);
121 }
122
123 return 0;
124}
消费者:
xxxxxxxxxx
1091
2
3
4
5
6
7
8
9struct SharedBuffer {
10 int buffer[10];
11 int in;
12 int out;
13 int count;
14};
15
16const char* shm_name = "/posix_buffer";
17const char* sem_mutex_name = "/posix_mutex";
18const char* sem_empty_name = "/posix_empty";
19const char* sem_full_name = "/posix_full";
20
21SharedBuffer* shared_buf;
22sem_t *sem_mutex, *sem_empty, *sem_full;
23bool running = true;
24
25void cleanup(int sig) {
26 std::cout << "\n消费者退出" << std::endl;
27
28 if (shared_buf) {
29 munmap(shared_buf, sizeof(SharedBuffer));
30 }
31
32 if (sem_mutex) sem_close(sem_mutex);
33 if (sem_empty) sem_close(sem_empty);
34 if (sem_full) sem_close(sem_full);
35
36 running = false;
37}
38
39int main() {
40 signal(SIGINT, cleanup);
41 signal(SIGTERM, cleanup);
42
43 // 打开信号量
44 sem_mutex = sem_open(sem_mutex_name, 0);
45 sem_empty = sem_open(sem_empty_name, 0);
46 sem_full = sem_open(sem_full_name, 0);
47
48 if (sem_mutex == SEM_FAILED || sem_empty == SEM_FAILED || sem_full == SEM_FAILED) {
49 perror("sem_open failed - 请先运行生产者");
50 return 1;
51 }
52
53 // 打开共享内存
54 int shm_fd = shm_open(shm_name, O_RDWR, 0644);
55 if (shm_fd == -1) {
56 perror("shm_open failed");
57 return 1;
58 }
59
60 shared_buf = (SharedBuffer*)mmap(nullptr, sizeof(SharedBuffer),
61 PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
62 if (shared_buf == MAP_FAILED) {
63 perror("mmap failed");
64 return 1;
65 }
66
67 close(shm_fd);
68
69 std::cout << "POSIX信号量消费者启动" << std::endl;
70 std::cout << "等待数据... (Ctrl+C退出)" << std::endl;
71
72 int consumed_count = 0;
73
74 while (running) {
75 // 非阻塞等待满缓冲区
76 if (sem_trywait(sem_full) == -1) {
77 if (errno == EAGAIN) {
78 usleep(100000); // 100ms
79 continue;
80 } else {
81 perror("sem_trywait failed");
82 break;
83 }
84 }
85
86 // 获取互斥锁
87 sem_wait(sem_mutex);
88
89 // 临界区:消费数据
90 int data = shared_buf->buffer[shared_buf->out];
91 std::cout << "消费: " << data << " <- buffer[" << shared_buf->out << "]"
92 << ", 剩余数量: " << (shared_buf->count - 1) << std::endl;
93 shared_buf->out = (shared_buf->out + 1) % 10;
94 shared_buf->count--;
95 consumed_count++;
96
97 // 释放互斥锁
98 sem_post(sem_mutex);
99
100 // 信号空缓冲区
101 sem_post(sem_empty);
102
103 sleep(2); // 消费速度比生产慢
104 }
105
106 std::cout << "总共消费了 " << consumed_count << " 个数据" << std::endl;
107 cleanup(0);
108 return 0;
109}
P操作(等待/减少)
System V: sem_op = -1
POSIX: sem_wait()
如果信号量值 > 0,则减1并继续
如果信号量值 = 0,则阻塞等待
V操作(信号/增加)
System V: sem_op = 1
POSIX: sem_post()
信号量值加1
如果有等待的进程,唤醒一个
Unix域套接字(Unix Domain Socket)是一种高效的本地进程间通信机制,它使用文件系统路径作为地址,仅在本地机器上工作,性能优于网络套接字。
特点: 本地进程间通信,性能优于网络套接字
套接字类型
SOCK_STREAM
- 流式套接字
提供可靠的、有序的、双向的字节流
类似于TCP,但用于本地通信
面向连接
SOCK_DGRAM
- 数据报套接字
提供无连接的数据报服务
类似于UDP,但用于本地通信
保证数据完整性,但不保证顺序
地址结构
xxxxxxxxxx
41struct sockaddr_un {
2 sa_family_t sun_family; // AF_UNIX
3 char sun_path[108]; // 套接字文件路径
4};
常用函数
socket()
- 创建套接字
xxxxxxxxxx
11int socket(int domain, int type, int protocol);
domain
: AF_UNIX(Unix域)
type
: SOCK_STREAM
或 SOCK_DGRAM
protocol
: 通常为0
bind()
- 绑定地址
xxxxxxxxxx
11int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
listen()
- 监听连接(仅SOCK_STREAM
)
xxxxxxxxxx
11int listen(int sockfd, int backlog);
accept()
- 接受连接(仅SOCK_STREAM
)
xxxxxxxxxx
11int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
connect()
- 连接到服务器
xxxxxxxxxx
11int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
send()
/recv()
- 发送/接收数据
xxxxxxxxxx
21ssize_t send(int sockfd, const void *buf, size_t len, int flags);
2ssize_t recv(int sockfd, void *buf, size_t len, int flags);
sendto()
/recvfrom()
- 数据报发送/接收
xxxxxxxxxx
41ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
2 const struct sockaddr *dest_addr, socklen_t addrlen);
3ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
4 struct sockaddr *src_addr, socklen_t *addrlen);
服务器端
xxxxxxxxxx
1121
2
3
4
5
6
7
8
9
10const char* SOCKET_PATH = "/tmp/unix_stream_socket";
11int server_fd;
12std::vector<std::thread> client_threads;
13
14void cleanup(int sig) {
15 std::cout << "\n正在清理资源..." << std::endl;
16
17 // 关闭服务器套接字
18 if (server_fd != -1) {
19 close(server_fd);
20 }
21
22 // 删除套接字文件
23 unlink(SOCKET_PATH);
24
25 std::cout << "服务器已关闭" << std::endl;
26 exit(0);
27}
28
29void handle_client(int client_fd, int client_id) {
30 char buffer[1024];
31
32 std::cout << "客户端 " << client_id << " 已连接" << std::endl;
33
34 while (true) {
35 ssize_t bytes = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
36
37 if (bytes <= 0) {
38 if (bytes == 0) {
39 std::cout << "客户端 " << client_id << " 断开连接" << std::endl;
40 } else {
41 perror("recv failed");
42 }
43 break;
44 }
45
46 buffer[bytes] = '\0';
47 std::cout << "客户端 " << client_id << ": " << buffer << std::endl;
48
49 // 回显消息
50 std::string response = "服务器回复[客户端" + std::to_string(client_id) + "]: " + std::string(buffer);
51 send(client_fd, response.c_str(), response.length(), 0);
52 }
53
54 close(client_fd);
55}
56
57int main() {
58 signal(SIGINT, cleanup);
59 signal(SIGTERM, cleanup);
60
61 // 删除可能存在的旧套接字文件
62 unlink(SOCKET_PATH);
63
64 // 创建套接字
65 server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
66 if (server_fd == -1) {
67 perror("socket failed");
68 return 1;
69 }
70
71 // 设置地址
72 struct sockaddr_un addr;
73 memset(&addr, 0, sizeof(addr));
74 addr.sun_family = AF_UNIX;
75 strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
76
77 // 绑定地址
78 if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
79 perror("bind failed");
80 close(server_fd);
81 return 1;
82 }
83
84 // 监听连接
85 if (listen(server_fd, 5) == -1) {
86 perror("listen failed");
87 close(server_fd);
88 unlink(SOCKET_PATH);
89 return 1;
90 }
91
92 std::cout << "Unix域流式套接字服务器启动" << std::endl;
93 std::cout << "套接字路径: " << SOCKET_PATH << std::endl;
94 std::cout << "等待客户端连接... (Ctrl+C退出)" << std::endl;
95
96 int client_id = 1;
97
98 while (true) {
99 // 接受客户端连接
100 int client_fd = accept(server_fd, nullptr, nullptr);
101 if (client_fd == -1) {
102 perror("accept failed");
103 continue;
104 }
105
106 // 为每个客户端创建线程
107 client_threads.emplace_back(handle_client, client_fd, client_id++);
108 client_threads.back().detach();
109 }
110
111 return 0;
112}
客户端
xxxxxxxxxx
761
2
3
4
5
6
7
8const char* SOCKET_PATH = "/tmp/unix_stream_socket";
9
10void receive_messages(int sockfd) {
11 char buffer[1024];
12
13 while (true) {
14 ssize_t bytes = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
15
16 if (bytes <= 0) {
17 if (bytes == 0) {
18 std::cout << "服务器关闭连接" << std::endl;
19 } else {
20 perror("recv failed");
21 }
22 break;
23 }
24
25 buffer[bytes] = '\0';
26 std::cout << buffer << std::endl;
27 }
28}
29
30int main() {
31 // 创建套接字
32 int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
33 if (sockfd == -1) {
34 perror("socket failed");
35 return 1;
36 }
37
38 // 设置服务器地址
39 struct sockaddr_un addr;
40 memset(&addr, 0, sizeof(addr));
41 addr.sun_family = AF_UNIX;
42 strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
43
44 // 连接到服务器
45 if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
46 perror("connect failed - 请先启动服务器");
47 close(sockfd);
48 return 1;
49 }
50
51 std::cout << "已连接到Unix域流式套接字服务器" << std::endl;
52 std::cout << "输入消息 (输入'quit'退出): " << std::endl;
53
54 // 启动接收线程
55 std::thread recv_thread(receive_messages, sockfd);
56 recv_thread.detach();
57
58 // 发送消息
59 std::string message;
60 while (std::getline(std::cin, message)) {
61 if (message == "quit") {
62 break;
63 }
64
65 if (!message.empty()) {
66 if (send(sockfd, message.c_str(), message.length(), 0) == -1) {
67 perror("send failed");
68 break;
69 }
70 }
71 }
72
73 close(sockfd);
74 std::cout << "客户端已断开连接" << std::endl;
75 return 0;
76}
特点: 可以跨网络通信,也可用于本地通信
协议: TCP、UDP
内存映射(Memory Mapping)是一种高效的进程间通信机制,它将文件或设备映射到进程的虚拟地址空间,使得多个进程可以通过共享内存的方式访问同一个文件。
特点: 将文件映射到内存,多个进程可以共享
适用场景: 大文件共享,数据库系统
mmap()
- 创建内存映射
xxxxxxxxxx
11void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
参数说明:
addr
: 指定映射的起始地址(通常为NULL,让系统选择)
length
: 映射的长度(字节数)
prot
: 内存保护标志
PROT_READ
: 可读
PROT_WRITE
: 可写
PROT_EXEC
: 可执行
PROT_NONE
: 不可访问
flags
: 映射标志
MAP_SHARED
: 共享映射(多进程可见)
MAP_PRIVATE
: 私有映射(写时复制)
MAP_ANONYMOUS
: 匿名映射(不基于文件)
MAP_FIXED
: 固定地址映射
fd
: 文件描述符
offset
: 文件偏移量
返回值:
成功:映射区域的起始地址
失败:MAP_FAILED
munmap()
- 取消内存映射
xxxxxxxxxx
11int munmap(void *addr, size_t length);
参数说明:
addr
: 映射区域的起始地址
length
: 映射的长度
返回值:
成功:0
失败:-1
msync()
- 同步映射区域到文件
xxxxxxxxxx
11int msync(void *addr, size_t length, int flags);
MS_ASYNC
: 异步同步
MS_SYNC
: 同步同步
MS_INVALIDATE
: 使其他映射无效
mprotect()
- 修改内存保护属性
xxxxxxxxxx
11int mprotect(void *addr, size_t len, int prot);
madvise()
- 提供内存使用建议
xxxxxxxxxx
11int madvise(void *addr, size_t length, int advice);
页表映射
mmap在进程的虚拟地址空间中分配一段连续的虚拟地址
建立虚拟地址到物理页面的映射关系
通过页表(Page Table)管理这种映射
延迟加载(Lazy Loading)
mmap调用时并不立即加载文件内容到内存
只有当进程访问某个页面时才触发页面错误(Page Fault)
内核此时才从文件中读取对应页面到物理内存
写时复制(Copy-on-Write)
对于MAP_PRIVATE
映射,多个进程可以共享同一物理页面
当某个进程尝试写入时,内核创建页面的私有副本
实现了内存的高效利用
VMA(Virtual Memory Area)
内核为每个映射创建VMA结构
记录映射的起始地址、长度、权限、文件信息等
链接到进程的内存描述符中
页面缓存(Page Cache)
文件页面在内存中的缓存
多个进程映射同一文件时共享相同的物理页面
提高了内存利用率和访问效率
脏页管理
跟踪被修改的页面(脏页)
定期或在特定条件下将脏页写回文件
确保数据一致性
内存映射(mmap)是一种通用的内存管理机制, 可以映射文件、设备或匿名内存, 是底层的系统调用。
POSIX共享内存是一种专门的IPC机制,是基于mmap实现的高层抽象,专门用于进程间通信。
普通文件映射存储在普通文件系统,共享内存存储在专用的共享内存文件系统。
可以这样理解:POSIX共享内存是基于mmap实现的专门IPC机制,而mmap是更通用的内存管理工具。就像汽车和交通工具的关系——汽车是基于交通工具概念实现的专门载具,但交通工具包含更广泛的概念。
特性 | 内存映射(mmap) | POSIX共享内存 |
---|---|---|
底层实现 | 直接系统调用 | 基于mmap + shm_open |
命名方式 | 文件路径 | POSIX名称(/name) |
存储位置 | 任意文件系统 | 专用共享内存文件系统 |
生命周期 | 跟随文件 | 独立于进程 |
权限管理 | 文件系统权限 | POSIX权限 |
可移植性 | 平台相关 | POSIX标准 |
用途 | 通用内存管理 | 专门IPC |
写进程
xxxxxxxxxx
1091
2
3
4
5
6
7
8
9struct SharedData {
10 int counter;
11 char message[256];
12 bool finished;
13 pid_t writer_pid;
14};
15
16const char* SHARED_FILE = "/tmp/mmap_shared_file";
17int fd;
18SharedData* shared_data;
19
20void cleanup(int sig) {
21 std::cout << "\n正在清理资源..." << std::endl;
22
23 if (shared_data) {
24 shared_data->finished = true;
25 // 同步数据到文件
26 msync(shared_data, sizeof(SharedData), MS_SYNC);
27 munmap(shared_data, sizeof(SharedData));
28 }
29
30 if (fd != -1) {
31 close(fd);
32 }
33
34 std::cout << "写进程已退出" << std::endl;
35 exit(0);
36}
37
38int main() {
39 signal(SIGINT, cleanup);
40 signal(SIGTERM, cleanup);
41
42 // 创建或打开共享文件
43 fd = open(SHARED_FILE, O_CREAT | O_RDWR, 0644);
44 if (fd == -1) {
45 perror("open failed");
46 return 1;
47 }
48
49 // 设置文件大小
50 if (ftruncate(fd, sizeof(SharedData)) == -1) {
51 perror("ftruncate failed");
52 close(fd);
53 return 1;
54 }
55
56 // 创建内存映射
57 shared_data = (SharedData*)mmap(nullptr, sizeof(SharedData),
58 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
59 if (shared_data == MAP_FAILED) {
60 perror("mmap failed");
61 close(fd);
62 return 1;
63 }
64
65 std::cout << "文件映射写进程启动" << std::endl;
66 std::cout << "共享文件: " << SHARED_FILE << std::endl;
67 std::cout << "映射地址: " << shared_data << std::endl;
68 std::cout << "映射大小: " << sizeof(SharedData) << " 字节" << std::endl;
69
70 // 初始化共享数据
71 shared_data->counter = 0;
72 shared_data->finished = false;
73 shared_data->writer_pid = getpid();
74 strcpy(shared_data->message, "初始化消息");
75
76 // 强制同步到文件
77 msync(shared_data, sizeof(SharedData), MS_SYNC);
78
79 // 写入数据
80 for (int i = 1; i <= 20; ++i) {
81 shared_data->counter = i;
82 snprintf(shared_data->message, sizeof(shared_data->message),
83 "mmap消息 %d,写进程PID: %d", i, getpid());
84
85 std::cout << "写入: counter=" << shared_data->counter
86 << ", message=" << shared_data->message << std::endl;
87
88 // 异步同步到文件
89 msync(shared_data, sizeof(SharedData), MS_ASYNC);
90
91 sleep(2);
92
93 if (shared_data->finished) {
94 break;
95 }
96 }
97
98 shared_data->finished = true;
99 msync(shared_data, sizeof(SharedData), MS_SYNC);
100
101 std::cout << "写入完成,按Ctrl+C退出" << std::endl;
102
103 // 保持程序运行
104 while (true) {
105 sleep(1);
106 }
107
108 return 0;
109}
读进程
xxxxxxxxxx
901
2
3
4
5
6
7
8struct SharedData {
9 int counter;
10 char message[256];
11 bool finished;
12 pid_t writer_pid;
13};
14
15const char* SHARED_FILE = "/tmp/mmap_shared_file";
16SharedData* shared_data;
17bool running = true;
18
19void cleanup(int sig) {
20 std::cout << "\n读进程退出" << std::endl;
21 if (shared_data) {
22 munmap(shared_data, sizeof(SharedData));
23 }
24 running = false;
25}
26
27int main() {
28 signal(SIGINT, cleanup);
29 signal(SIGTERM, cleanup);
30
31 // 等待共享文件创建
32 int fd;
33 while ((fd = open(SHARED_FILE, O_RDONLY)) == -1) {
34 std::cout << "等待共享文件创建..." << std::endl;
35 sleep(1);
36 }
37
38 // 获取文件大小
39 struct stat file_stat;
40 if (fstat(fd, &file_stat) == -1) {
41 perror("fstat failed");
42 close(fd);
43 return 1;
44 }
45
46 if (file_stat.st_size < sizeof(SharedData)) {
47 std::cout << "文件大小不足,等待写进程初始化..." << std::endl;
48 close(fd);
49 sleep(2);
50 return main(); // 重新尝试
51 }
52
53 // 创建只读内存映射
54 shared_data = (SharedData*)mmap(nullptr, sizeof(SharedData),
55 PROT_READ, MAP_SHARED, fd, 0);
56 if (shared_data == MAP_FAILED) {
57 perror("mmap failed");
58 close(fd);
59 return 1;
60 }
61
62 close(fd); // 映射后可以关闭文件描述符
63
64 std::cout << "文件映射读进程启动" << std::endl;
65 std::cout << "共享文件: " << SHARED_FILE << std::endl;
66 std::cout << "映射地址: " << shared_data << std::endl;
67 std::cout << "写进程PID: " << shared_data->writer_pid << std::endl;
68 std::cout << "等待数据... (Ctrl+C退出)" << std::endl;
69
70 int last_counter = -1;
71
72 while (running && !shared_data->finished) {
73 if (shared_data->counter != last_counter) {
74 std::cout << "读取: counter=" << shared_data->counter
75 << ", message=" << shared_data->message << std::endl;
76 last_counter = shared_data->counter;
77 }
78
79 usleep(500000); // 500ms
80 }
81
82 if (shared_data->finished) {
83 std::cout << "写进程已完成,最终数据:" << std::endl;
84 std::cout << " counter=" << shared_data->counter << std::endl;
85 std::cout << " message=" << shared_data->message << std::endl;
86 }
87
88 cleanup(0);
89 return 0;
90}
文件锁是一种通过对文件加锁来实现进程同步和互斥访问的IPC机制。它可以防止多个进程同时修改同一个文件,确保数据的一致性。
特点: 通过文件锁实现进程同步
锁的类型
建议锁(Advisory Lock):不强制执行,依赖程序自觉遵守
强制锁(Mandatory Lock):由内核强制执行(Linux默认不支持)
锁的模式
共享锁(读锁,F_RDLCK):多个进程可以同时持有
排他锁(写锁,F_WRLCK):只有一个进程可以持有
解锁(F_UNLCK):释放锁
1. fcntl() 函数
xxxxxxxxxx
21
2int fcntl(int fd, int cmd, struct flock *lock);
参数说明:
fd
:文件描述符
cmd
:操作命令
F_SETLK
:设置锁(非阻塞)
F_SETLKW
:设置锁(阻塞等待)
F_GETLK
:获取锁信息
lock
:flock结构体指针
flock结构体:
xxxxxxxxxx
71struct flock {
2 short l_type; // 锁类型:F_RDLCK, F_WRLCK, F_UNLCK
3 short l_whence; // 偏移量基准:SEEK_SET, SEEK_CUR, SEEK_END
4 off_t l_start; // 相对偏移量
5 off_t l_len; // 锁定长度(0表示到文件末尾)
6 pid_t l_pid; // 持有锁的进程ID(仅F_GETLK时有效)
7};
2. flock() 函数
xxxxxxxxxx
21
2int flock(int fd, int operation);
参数说明:
fd
:文件描述符
operation
:操作类型
LOCK_SH
:共享锁
LOCK_EX
:排他锁
LOCK_UN
:解锁
LOCK_NB
:非阻塞(与上述选项组合使用)
xxxxxxxxxx
1341
2
3
4
5
6
7class FileLock {
8private:
9 int fd;
10 struct flock lock_info;
11
12public:
13 FileLock(const char* filename) {
14 fd = open(filename, O_RDWR | O_CREAT, 0666);
15 if (fd == -1) {
16 perror("open");
17 exit(1);
18 }
19 }
20
21 ~FileLock() {
22 if (fd != -1) {
23 close(fd);
24 }
25 }
26
27 bool setLock(short type, bool blocking = true) {
28 lock_info.l_type = type;
29 lock_info.l_whence = SEEK_SET;
30 lock_info.l_start = 0;
31 lock_info.l_len = 0; // 锁定整个文件
32 lock_info.l_pid = getpid();
33
34 int cmd = blocking ? F_SETLKW : F_SETLK;
35
36 if (fcntl(fd, cmd, &lock_info) == -1) {
37 if (!blocking && (errno == EACCES || errno == EAGAIN)) {
38 std::cout << "文件已被锁定,无法获取锁" << std::endl;
39 return false;
40 }
41 perror("fcntl");
42 return false;
43 }
44 return true;
45 }
46
47 bool unlock() {
48 lock_info.l_type = F_UNLCK;
49 if (fcntl(fd, F_SETLK, &lock_info) == -1) {
50 perror("unlock");
51 return false;
52 }
53 return true;
54 }
55
56 void writeData(const std::string& data) {
57 lseek(fd, 0, SEEK_END);
58 write(fd, data.c_str(), data.length());
59 fsync(fd); // 强制写入磁盘
60 }
61
62 void checkLock() {
63 struct flock check_lock;
64 check_lock.l_type = F_WRLCK;
65 check_lock.l_whence = SEEK_SET;
66 check_lock.l_start = 0;
67 check_lock.l_len = 0;
68
69 if (fcntl(fd, F_GETLK, &check_lock) == -1) {
70 perror("F_GETLK");
71 return;
72 }
73
74 if (check_lock.l_type == F_UNLCK) {
75 std::cout << "文件未被锁定" << std::endl;
76 } else {
77 std::cout << "文件被进程 " << check_lock.l_pid
78 << " 锁定,锁类型: "
79 << (check_lock.l_type == F_RDLCK ? "读锁" : "写锁")
80 << std::endl;
81 }
82 }
83};
84
85int main() {
86 const char* filename = "shared_file.txt";
87
88 // 创建子进程演示锁竞争
89 pid_t pid = fork();
90
91 if (pid == 0) {
92 // 子进程
93 sleep(1); // 让父进程先获取锁
94
95 FileLock file_lock(filename);
96 std::cout << "[子进程] 尝试获取写锁..." << std::endl;
97
98 file_lock.checkLock();
99
100 if (file_lock.setLock(F_WRLCK, false)) { // 非阻塞
101 std::cout << "[子进程] 获取写锁成功" << std::endl;
102 file_lock.writeData("子进程写入的数据\n");
103 sleep(2);
104 file_lock.unlock();
105 std::cout << "[子进程] 释放锁" << std::endl;
106 } else {
107 std::cout << "[子进程] 无法获取锁,等待中..." << std::endl;
108 file_lock.setLock(F_WRLCK, true); // 阻塞等待
109 std::cout << "[子进程] 等待后获取写锁成功" << std::endl;
110 file_lock.writeData("子进程等待后写入的数据\n");
111 file_lock.unlock();
112 }
113 } else if (pid > 0) {
114 // 父进程
115 FileLock file_lock(filename);
116 std::cout << "[父进程] 获取写锁" << std::endl;
117
118 file_lock.setLock(F_WRLCK);
119 file_lock.writeData("父进程写入的数据\n");
120
121 std::cout << "[父进程] 持有锁3秒..." << std::endl;
122 sleep(3);
123
124 file_lock.unlock();
125 std::cout << "[父进程] 释放锁" << std::endl;
126
127 wait(nullptr); // 等待子进程结束
128 } else {
129 perror("fork");
130 return 1;
131 }
132
133 return 0;
134}
eventfd
是Linux内核提供的一种轻量级的事件通知机制,用于进程间或线程间的同步和通信。它创建一个特殊的文件描述符,可以用于事件计数和通知。
eventfd 是一个内核对象,通过文件描述符进行访问,其内部维护一个64位无符号整数计数器,支持读写操作来进行事件通知和同步。可以与epoll、select、poll等IO多路复用机制配合使用
eventfd系统调用返回的是文件描述符,该文件描述符可以读、写、监听。
使用eventfd时,内核中会维护一个计数器
对eventfd执行以下函数
read函数:如果计数器的值不为0时,读取成功,获得到该值;如果计数器的值为0,非阻塞模式时,会直接返回失败,并把error置为EINVAL;如果为阻塞模式,一直会阻塞到A为非0为止。 write函数:将缓冲区写入的8字节整型值加到内核计数器上(累加效果),即会增加8字节的整数在计数器上,如果其值达到0xfffffffffffffffe时,就会阻塞(在阻塞模式下),直到计数器的值被read。
通过对eventfd函数返回的文件描述符进行通信。一个进程或者线程A执行read操作,如果内核计数器的值为0,并且是阻塞模式,那么A就会阻塞;另外一个进程或者线程B执行write操作,就会向内核计数器写,那么阻塞的A发现内核计数器的值不为0,就会被触发,那么两个进程或者线程A与B就达到通信的目的了(通知)
1. eventfd() 函数
xxxxxxxxxx
21
2int eventfd(unsigned int initval, int flags);
参数说明:
initval
:计数器初始值
flags
:标志位
EFD_CLOEXEC
:执行exec时关闭文件描述符
EFD_NONBLOCK
:非阻塞模式
EFD_SEMAPHORE
:信号量模式
返回值:
成功:返回文件描述符
失败:返回-1
2. 读写操作
xxxxxxxxxx
31
2ssize_t read(int fd, void *buf, size_t count);
3ssize_t write(int fd, const void *buf, size_t count);
1. 计数器模式(默认)
写入:将值累加到计数器
读取:返回当前计数器值并重置为0
如果计数器为0,读操作会阻塞
2. 信号量模式(EFD_SEMAPHORE)
写入:将值累加到计数器
读取:计数器减1,返回值总是1
如果计数器为0,读操作会阻塞
xxxxxxxxxx
1491
2
3
4
5
6
7
8
9class EventLoop {
10private:
11 int epoll_fd;
12 bool running;
13
14public:
15 EventLoop() : running(false) {
16 epoll_fd = epoll_create1(EPOLL_CLOEXEC);
17 if (epoll_fd == -1) {
18 perror("epoll_create1");
19 exit(1);
20 }
21 }
22
23 ~EventLoop() {
24 if (epoll_fd != -1) {
25 close(epoll_fd);
26 }
27 }
28
29 bool addEventFd(int event_fd, void* data = nullptr) {
30 struct epoll_event ev;
31 ev.events = EPOLLIN | EPOLLET; // 边缘触发
32 ev.data.ptr = data;
33 ev.data.fd = event_fd;
34
35 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_fd, &ev) == -1) {
36 perror("epoll_ctl");
37 return false;
38 }
39 return true;
40 }
41
42 void run() {
43 running = true;
44 const int MAX_EVENTS = 10;
45 struct epoll_event events[MAX_EVENTS];
46
47 std::cout << "[事件循环] 开始运行" << std::endl;
48
49 while (running) {
50 int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 1000);
51
52 if (nfds == -1) {
53 if (errno == EINTR) continue;
54 perror("epoll_wait");
55 break;
56 }
57
58 for (int i = 0; i < nfds; ++i) {
59 int fd = events[i].data.fd;
60
61 if (events[i].events & EPOLLIN) {
62 uint64_t value;
63 ssize_t result = read(fd, &value, sizeof(value));
64 if (result == sizeof(value)) {
65 std::cout << "[事件循环] 接收到事件,fd=" << fd
66 << ", 值=" << value << std::endl;
67
68 // 特殊值用于停止事件循环
69 if (value == 999) {
70 std::cout << "[事件循环] 接收到停止信号" << std::endl;
71 running = false;
72 }
73 }
74 }
75 }
76 }
77
78 std::cout << "[事件循环] 结束运行" << std::endl;
79 }
80
81 void stop() {
82 running = false;
83 }
84};
85
86class EventProducer {
87private:
88 int event_fd;
89
90public:
91 EventProducer() {
92 event_fd = eventfd(0, EFD_NONBLOCK);
93 if (event_fd == -1) {
94 perror("eventfd");
95 exit(1);
96 }
97 }
98
99 ~EventProducer() {
100 if (event_fd != -1) {
101 close(event_fd);
102 }
103 }
104
105 void sendEvent(uint64_t value) {
106 ssize_t result = write(event_fd, &value, sizeof(value));
107 if (result != sizeof(value)) {
108 perror("write eventfd");
109 } else {
110 std::cout << "[生产者] 发送事件: " << value << std::endl;
111 }
112 }
113
114 int getFd() const { return event_fd; }
115};
116
117void producerThread(EventProducer& producer) {
118 // 发送一系列事件
119 for (int i = 1; i <= 5; ++i) {
120 std::this_thread::sleep_for(std::chrono::seconds(1));
121 producer.sendEvent(i * 10);
122 }
123
124 // 发送停止信号
125 std::this_thread::sleep_for(std::chrono::seconds(1));
126 producer.sendEvent(999);
127}
128
129int main() {
130 std::cout << "=== EventFD 与 Epoll 集成示例 ===" << std::endl;
131
132 EventLoop event_loop;
133 EventProducer producer;
134
135 // 将eventfd添加到epoll
136 event_loop.addEventFd(producer.getFd());
137
138 // 启动生产者线程
139 std::thread producer_thread(producerThread, std::ref(producer));
140
141 // 运行事件循环
142 event_loop.run();
143
144 // 等待生产者线程完成
145 producer_thread.join();
146
147 std::cout << "=== 示例完成 ===" << std::endl;
148 return 0;
149}
timerfd 是Linux内核提供的一种基于文件描述符的定时器机制。它将定时器抽象为文件描述符,可以与epoll、select、poll等IO多路复用机制集成,实现高效的异步定时器处理。
Note
timerfd是一种时间同步和事件通知机制 ,而不是传统意义上的数据传输IPC。
1. timerfd_create() 函数
xxxxxxxxxx
21
2int timerfd_create(int clockid, int flags);
参数说明:
clockid
:时钟类型
CLOCK_REALTIME
:系统实时时钟
CLOCK_MONOTONIC
:单调递增时钟(推荐)
CLOCK_BOOTTIME
:包含休眠时间的启动时钟
flags
:标志位
TFD_CLOEXEC
:执行exec时关闭
TFD_NONBLOCK
:非阻塞模式
2. timerfd_settime() 函数
xxxxxxxxxx
31int timerfd_settime(int fd, int flags,
2 const struct itimerspec *new_value,
3 struct itimerspec *old_value);
参数说明:
fd
:timerfd文件描述符
flags
:
0
:相对时间
TFD_TIMER_ABSTIME
:绝对时间
new_value
:新的定时器设置
old_value
:返回之前的设置(可为NULL)
3. timerfd_gettime() 函数
xxxxxxxxxx
11int timerfd_gettime(int fd, struct itimerspec *curr_value);
4. itimerspec 结构体
xxxxxxxxxx
91struct itimerspec {
2 struct timespec it_interval; // 周期间隔
3 struct timespec it_value; // 初始到期时间
4};
5
6struct timespec {
7 time_t tv_sec; // 秒
8 long tv_nsec; // 纳秒
9};
xxxxxxxxxx
1531
2
3
4
5
6
7
8
9class TimerManager {
10private:
11 int epoll_fd;
12 struct TimerInfo {
13 int fd;
14 int id;
15 std::string name;
16 bool periodic;
17 };
18 std::vector<TimerInfo> timers;
19
20public:
21 TimerManager() {
22 epoll_fd = epoll_create1(EPOLL_CLOEXEC);
23 if (epoll_fd == -1) {
24 perror("epoll_create1");
25 exit(1);
26 }
27 }
28
29 ~TimerManager() {
30 for (const auto& timer : timers) {
31 close(timer.fd);
32 }
33 if (epoll_fd != -1) {
34 close(epoll_fd);
35 }
36 }
37
38 int addTimer(const std::string& name, int initial_sec, long initial_nsec,
39 int interval_sec = 0, long interval_nsec = 0) {
40 int timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
41 if (timer_fd == -1) {
42 perror("timerfd_create");
43 return -1;
44 }
45
46 struct itimerspec timer_spec;
47 timer_spec.it_value.tv_sec = initial_sec;
48 timer_spec.it_value.tv_nsec = initial_nsec;
49 timer_spec.it_interval.tv_sec = interval_sec;
50 timer_spec.it_interval.tv_nsec = interval_nsec;
51
52 if (timerfd_settime(timer_fd, 0, &timer_spec, nullptr) == -1) {
53 perror("timerfd_settime");
54 close(timer_fd);
55 return -1;
56 }
57
58 struct epoll_event ev;
59 ev.events = EPOLLIN;
60 ev.data.fd = timer_fd;
61
62 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev) == -1) {
63 perror("epoll_ctl");
64 close(timer_fd);
65 return -1;
66 }
67
68 int timer_id = timers.size();
69 timers.push_back({timer_fd, timer_id, name,
70 (interval_sec > 0 || interval_nsec > 0)});
71
72 return timer_id;
73 }
74
75 void run(int max_events = 10, int timeout_ms = -1) {
76 struct epoll_event events[max_events];
77
78 std::cout << "[定时器管理器] 开始运行" << std::endl;
79
80 while (true) {
81 int nfds = epoll_wait(epoll_fd, events, max_events, timeout_ms);
82
83 if (nfds == -1) {
84 if (errno == EINTR) continue;
85 perror("epoll_wait");
86 break;
87 }
88
89 if (nfds == 0) {
90 std::cout << "[定时器管理器] 超时" << std::endl;
91 continue;
92 }
93
94 for (int i = 0; i < nfds; ++i) {
95 int fd = events[i].data.fd;
96
97 if (events[i].events & EPOLLIN) {
98 handleTimerEvent(fd);
99 }
100 }
101 }
102 }
103
104private:
105 void handleTimerEvent(int fd) {
106 uint64_t expirations;
107 ssize_t result = read(fd, &expirations, sizeof(expirations));
108 if (result != sizeof(expirations)) {
109 perror("read timerfd");
110 return;
111 }
112
113 // 查找对应的定时器
114 for (const auto& timer : timers) {
115 if (timer.fd == fd) {
116 printCurrentTime();
117 std::cout << "定时器 [" << timer.name << "] 触发,过期次数: "
118 << expirations << std::endl;
119
120 // 如果是一次性定时器,可以选择移除
121 if (!timer.periodic) {
122 std::cout << "一次性定时器 [" << timer.name << "] 完成" << std::endl;
123 }
124 break;
125 }
126 }
127 }
128
129 void printCurrentTime() {
130 auto now = std::chrono::system_clock::now();
131 auto time_t = std::chrono::system_clock::to_time_t(now);
132 std::cout << "[" << std::put_time(std::localtime(&time_t), "%H:%M:%S") << "] ";
133 }
134};
135
136int main() {
137 std::cout << "=== TimerFD 与 Epoll 集成示例 ===" << std::endl;
138
139 TimerManager manager;
140
141 // 添加多个定时器
142 manager.addTimer("快速定时器", 1, 0, 2, 0); // 1秒后开始,每2秒触发
143 manager.addTimer("慢速定时器", 3, 0, 5, 0); // 3秒后开始,每5秒触发
144 manager.addTimer("一次性定时器1", 4, 0); // 4秒后触发一次
145 manager.addTimer("一次性定时器2", 7, 0); // 7秒后触发一次
146 manager.addTimer("高频定时器", 2, 0, 0, 500000000); // 2秒后开始,每500毫秒触发
147
148 // 运行事件循环(15秒后超时)
149 manager.run(10, 15000);
150
151 std::cout << "\n=== 示例完成 ===" << std::endl;
152 return 0;
153}