Browse Source

完善无亲缘关系进程间通过有名管道同步,通过共享内存通信的案例

test
gaorui 5 days ago
parent
commit
631ac6591d
  1. 134
      1_shared_memory/shm_comm/shm_comm_README.md
  2. 158
      1_shared_memory/shm_comm/shm_comm_a.c
  3. 163
      1_shared_memory/shm_comm/shm_comm_b.c

134
1_shared_memory/shm_comm/shm_comm_README.md

@ -0,0 +1,134 @@
# 共享内存 + 有名管道 实现无亲缘关系进程通信
## 问题概述
原有的`shm_pc.c`是父子进程通信模型,使用匿名管道进行同步。需要将其修改为两个无亲缘关系进程间的通信模型,分别为生产者进程和消费者进程。
## 解决方案
### 1. 进程间通信方式选择
- **共享内存**:用于高效传输大量数据(任务队列)
- **有名管道(FIFO)**:用于无亲缘关系进程间的同步信号传递
- **信号量**:用于共享内存的互斥访问和任务队列的同步
### 2. 核心实现
#### 文件结构
```
1_shared_memory/
├── shm_comm/
│ ├── shm_comm_a.c # 生产者进程
│ └── shm_comm_b.c # 消费者进程
```
#### 主要功能
##### shm_comm_a.c (生产者)
- 创建共享内存(包含环形任务队列)
- 创建有名管道用于同步
- 向共享内存提交5个测试任务
- 发送同步信号通知消费者进程
- 等待消费者处理完成后销毁共享内存
##### shm_comm_b.c (消费者)
- 等待并打开有名管道
- 接收生产者的同步信号
- 挂载共享内存
- 创建线程池处理任务
- 完成所有任务后退出
### 3. 关键技术点
#### 环形任务队列
```c
// 共享内存核心结构体
typedef struct {
Task task_queue[MAX_TASK]; // 共享任务队列
int front; // 队列头
int rear; // 队列尾
int sem_id; // 信号量集ID
} SharedMem;
// 入队操作(生产者)
shm->rear = (shm->rear + 1) % MAX_TASK;
shm->task_queue[shm->rear] = task;
// 出队操作(消费者)
Task task = shm->task_queue[shm->front];
shm->front = (shm->front + 1) % MAX_TASK;
```
#### 信号量机制
- **互斥信号量**(索引0):保护共享内存的互斥访问
- **空信号量**(索引1):表示队列中的任务数量,初始值为0
- **满信号量**(索引2):表示队列中的空闲位置数量,初始值为MAX_TASK
#### 有名管道同步
```c
// 生产者创建有名管道
mkfifo(FIFO_PATH, 0666);
// 生产者发送同步信号
int fifo_fd = open(FIFO_PATH, O_WRONLY);
write(fifo_fd, "ok", 2);
// 消费者接收同步信号
int fifo_fd = open(FIFO_PATH, O_RDONLY);
read(fifo_fd, buf, 2);
```
### 4. 编译和运行
#### 编译命令
```bash
gcc -o shm_comm_a shm_comm_a.c -lpthread
gcc -o shm_comm_b shm_comm_b.c -lpthread
```
#### 运行步骤
1. 启动生产者进程:
```bash
./shm_comm_a
```
2. 启动消费者进程:
```bash
./shm_comm_b
```
### 5. 测试结果
生产者进程输出:
```
[生产者进程] 启动,创建共享内存...
[生产者进程] 已发送同步信号:ok
[提交进程] 提交任务1:测试任务_1
[提交进程] 提交任务2:测试任务_2
[提交进程] 提交任务3:测试任务_3
[提交进程] 提交任务4:测试任务_4
[提交进程] 提交任务5:测试任务_5
[生产者进程] 退出
```
消费者进程输出:
```
[消费者进程] 收到生产者同步信号:ok
[消费者进程] 启动,创建线程池...
[线程140408006018816] 取出任务1:测试任务_1
[线程140407997626112] 取出任务2:测试任务_2
[线程140407989233408] 取出任务3:测试任务_3
[线程140407980840704] 取出任务4:测试任务_4
[线程140408006018816] 取出任务5:测试任务_5
[消费者进程] 停止线程池...
[消费者进程] 退出
```
## 总结
本实现成功将父子进程通信模型修改为无亲缘关系进程通信:
1. 使用**有名管道**替代匿名管道实现进程同步
2. 保持了**共享内存**的高效数据传输特性
3. 维护了**环形任务队列**的高效空间利用
4. 通过**信号量**确保了共享资源的安全访问
5. 实现了**线程池**的多任务并行处理
测试结果表明,两个无亲缘关系的进程能够稳定通信并正确处理任务。

158
1_shared_memory/shm_comm/shm_comm_a.c

@ -1,7 +1,5 @@
// 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互); // 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互);
// 进程间通信是父进程和子进程之间通信,子进程通过fork创建 // 进程间通信是进程a与进程b通信,当前是进程a的代码,单线程生产者模型
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -15,12 +13,15 @@
#include <sys/select.h> #include <sys/select.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <signal.h> #include <signal.h>
#include <sys/stat.h> // 有名管道需要的头文件
#include <fcntl.h> // 有名管道需要的头文件
// ====================== 全局配置 ====================== // ====================== 全局配置 ======================
#define MAX_TASK 10 // 共享任务队列最大长度 #define MAX_TASK 10 // 共享任务队列最大长度
#define THREAD_POOL_SIZE 4 // 线程池大小 #define THREAD_POOL_SIZE 4 // 线程池大小
#define SHM_KEY 0x1234 // 共享内存键值 #define SHM_KEY 0x1234 // 共享内存键值
#define SEM_KEY 0x5678 // 信号量键值 #define SEM_KEY 0x5678 // 信号量键值
#define FIFO_PATH "/tmp/shm_comm_fifo" // 有名管道路径
#define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒) #define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒)
@ -44,14 +45,6 @@ typedef struct {
// 共享内存核心结构体(包含任务队列 + 信号量) // 共享内存核心结构体(包含任务队列 + 信号量)
typedef struct { typedef struct {
// 1. front、rear 通过 % 数组容量 循环移动,数值在 0~ 容量 - 1 之间往复
// 2. 使用固定大小的数组来作为环形队列的存储空间
// 顺序队列和环形队列
// 顺序队列是固定大小的数组存储,环形队列是循环利用数组空间的队列
// 环形队列的优势是可以充分利用数组空间,避免了顺序队列的假溢出问题
// 环形队列的实现需要注意队头队尾的循环移动问题
Task task_queue[MAX_TASK]; // 共享任务队列 Task task_queue[MAX_TASK]; // 共享任务队列
int front; // 队列头 int front; // 队列头
int rear; // 队列尾 int rear; // 队列尾
@ -66,6 +59,8 @@ typedef struct {
} ThreadPool; } ThreadPool;
// - 创建包含3个信号量的信号量集 // - 创建包含3个信号量的信号量集
// - 互斥信号量(索引0):初始值1表示互斥,用于保护共享内存的互斥访问 // - 互斥信号量(索引0):初始值1表示互斥,用于保护共享内存的互斥访问
// - 空信号量(索引1):初始值0,用于表示队列中的任务数量 // - 空信号量(索引1):初始值0,用于表示队列中的任务数量
@ -412,98 +407,81 @@ int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) {
} }
// ====================== 主函数(进程分支) ====================== // ====================== 主函数(进程分支) ======================
int main() { // 带超时的有名管道读函数
// 创建管道(父子进程同步用),这里是无名管道,只能用于父子进程之间通信 int fifo_read_with_timeout(int fd, char *buf, int len, int timeout_sec) {
// 因为它们共享文件描述符 fd_set read_fds;
// pipe_fd[0]读,子进程只读这个fd,pipe_fd[1]写,父进程只写这个fd struct timeval timeout;
// 使用管道确保在共享内存创建完成后子进程再挂载共享内存
int pipe_fd[2]; // 初始化fd集合
if (pipe(pipe_fd) == -1) { FD_ZERO(&read_fds);
perror("pipe create failed"); FD_SET(fd, &read_fds);
// 设置超时时间
timeout.tv_sec = timeout_sec;
timeout.tv_usec = 0;
// 监听fd是否可读
int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout);
if (ret < 0) {
perror("select failed");
return -1; return -1;
} else if (ret == 0) {
fprintf(stderr, "[生产者进程] 有名管道读超时\n");
return 0;
} else {
if (FD_ISSET(fd, &read_fds)) {
return read(fd, buf, len);
}
} }
return -1;
}
// 创建子进程 int main() {
pid_t pid = fork(); // 创建有名管道
if (pid == -1) { if (mkfifo(FIFO_PATH, 0666) == -1 && errno != EEXIST) {
perror("fork failed"); perror("mkfifo failed");
return -1; return -1;
} }
// 子进程:工作进程(线程池 + 消费任务),子进程多线程处理父进程提交过来的任务 // 创建共享内存
if (pid == 0) { printf("[生产者进程] 启动,创建共享内存...\n");
// 关闭管道写端(子进程只读) SharedMem *shm = shm_create();
close(pipe_fd[1]); if (shm == NULL) {
return -1;
}
char buf[2]; // 打开有名管道写端
// 带超时读取父进程的同步信号 int fifo_fd = open(FIFO_PATH, O_WRONLY);
int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); if (fifo_fd == -1) {
if (read_ret <= 0) { perror("open fifo failed");
close(pipe_fd[0]); shm_destroy(shm);
return -1; return -1;
} }
printf("[子进程] 收到父进程同步信号:%s\n", buf);
// 挂载共享内存 // 发送同步信号(告知消费者进程共享内存已创建)
printf("[工作进程] 启动,创建线程池...\n"); write(fifo_fd, "ok", 2);
SharedMem *shm = shm_attach(); printf("[生产者进程] 已发送同步信号:ok\n");
if (shm == NULL) {
close(pipe_fd[0]);
return -1;
}
// 初始化线程池 // 关闭有名管道
ThreadPool pool = {0}; close(fifo_fd);
if (thread_pool_init(&pool, shm) == -1) {
shm_detach_only(shm);
close(pipe_fd[0]);
return -1;
}
// 运行10秒后停止线程池
sleep(10);
printf("[工作进程] 停止线程池...\n");
thread_pool_destroy(&pool);
// 子进程仅分离共享内存,不销毁内核资源 // 提交5个测试任务
shm_detach_only(shm); for (int i = 1; i <= 5; i++) {
close(pipe_fd[0]); char content[64];
printf("[工作进程] 退出\n"); snprintf(content, sizeof(content), "测试任务_%d", i);
return 0; task_submit(shm, i, content);
sleep(1);
} }
// 父进程:任务提交进程,创建任务。父进程单线程提交任务 // 等待一段时间让消费者进程完成任务
else { sleep(15);
// 关闭管道读端(父进程只写)
close(pipe_fd[0]);
// 创建共享内存 // 生产者进程统一销毁共享内存和信号量
printf("[提交进程] 启动,创建共享内存...\n"); shm_destroy(shm);
SharedMem *shm = shm_create();
if (shm == NULL) {
close(pipe_fd[1]);
return -1;
}
// 发送同步信号(告知子进程共享内存已创建) // 删除有名管道
write(pipe_fd[1], "ok", 2); unlink(FIFO_PATH);
printf("[提交进程] 已发送同步信号:ok\n");
// 提交5个测试任务 printf("[生产者进程] 退出\n");
for (int i = 1; i <= 5; i++) { return 0;
char content[64];
snprintf(content, sizeof(content), "测试任务_%d", i);
task_submit(shm, i, content);
sleep(1);
}
// 等待子进程退出
wait(NULL);
// 父进程统一销毁共享内存和信号量
shm_destroy(shm);
close(pipe_fd[1]);
printf("[提交进程] 退出\n");
return 0;
}
} }

163
1_shared_memory/shm_comm/shm_comm_b.c

@ -1,7 +1,5 @@
// 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互); // 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互);
// 进程间通信是父进程和子进程之间通信,子进程通过fork创建 // 进程间通信是进程a与进程b通信,当前是进程b的代码,多线程消费者模型
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
@ -15,13 +13,17 @@
#include <sys/select.h> #include <sys/select.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <signal.h> #include <signal.h>
#include <sys/stat.h> // 有名管道需要的头文件
#include <fcntl.h> // 有名管道需要的头文件
// ====================== 全局配置 ====================== // ====================== 全局配置 ======================
#define MAX_TASK 10 // 共享任务队列最大长度 #define MAX_TASK 10 // 共享任务队列最大长度
#define THREAD_POOL_SIZE 4 // 线程池大小 #define THREAD_POOL_SIZE 4 // 线程池大小
#define SHM_KEY 0x1234 // 共享内存键值 #define SHM_KEY 0x1234 // 共享内存键值
#define SEM_KEY 0x5678 // 信号量键值 #define SEM_KEY 0x5678 // 信号量键值
#define FIFO_PATH "/tmp/shm_comm_fifo" // 有名管道路径
#define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒) #define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒)
#define FIFO_WAIT_TIMEOUT_SEC 10 // 等待有名管道创建的超时时间(秒)
// 共享内存中是一个消息队列,这个队列是环形的 // 共享内存中是一个消息队列,这个队列是环形的
@ -44,14 +46,6 @@ typedef struct {
// 共享内存核心结构体(包含任务队列 + 信号量) // 共享内存核心结构体(包含任务队列 + 信号量)
typedef struct { typedef struct {
// 1. front、rear 通过 % 数组容量 循环移动,数值在 0~ 容量 - 1 之间往复
// 2. 使用固定大小的数组来作为环形队列的存储空间
// 顺序队列和环形队列
// 顺序队列是固定大小的数组存储,环形队列是循环利用数组空间的队列
// 环形队列的优势是可以充分利用数组空间,避免了顺序队列的假溢出问题
// 环形队列的实现需要注意队头队尾的循环移动问题
Task task_queue[MAX_TASK]; // 共享任务队列 Task task_queue[MAX_TASK]; // 共享任务队列
int front; // 队列头 int front; // 队列头
int rear; // 队列尾 int rear; // 队列尾
@ -412,98 +406,89 @@ int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) {
} }
// ====================== 主函数(进程分支) ====================== // ====================== 主函数(进程分支) ======================
int main() { // 带超时的有名管道读函数
// 创建管道(父子进程同步用),这里是无名管道,只能用于父子进程之间通信 int fifo_read_with_timeout(int fd, char *buf, int len, int timeout_sec) {
// 因为它们共享文件描述符 fd_set read_fds;
// pipe_fd[0]读,子进程只读这个fd,pipe_fd[1]写,父进程只写这个fd struct timeval timeout;
// 使用管道确保在共享内存创建完成后子进程再挂载共享内存
int pipe_fd[2];
if (pipe(pipe_fd) == -1) {
perror("pipe create failed");
return -1;
}
// 创建子进程
pid_t pid = fork();
if (pid == -1) {
perror("fork failed");
return -1;
}
// 子进程:工作进程(线程池 + 消费任务),子进程多线程处理父进程提交过来的任务 // 初始化fd集合
if (pid == 0) { FD_ZERO(&read_fds);
// 关闭管道写端(子进程只读) FD_SET(fd, &read_fds);
close(pipe_fd[1]);
char buf[2]; // 设置超时时间
// 带超时读取父进程的同步信号 timeout.tv_sec = timeout_sec;
int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); timeout.tv_usec = 0;
if (read_ret <= 0) {
close(pipe_fd[0]);
return -1;
}
printf("[子进程] 收到父进程同步信号:%s\n", buf);
// 挂载共享内存 // 监听fd是否可读
printf("[工作进程] 启动,创建线程池...\n"); int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout);
SharedMem *shm = shm_attach(); if (ret < 0) {
if (shm == NULL) { perror("select failed");
close(pipe_fd[0]); return -1;
return -1; } else if (ret == 0) {
fprintf(stderr, "[消费者进程] 有名管道读超时\n");
return 0;
} else {
if (FD_ISSET(fd, &read_fds)) {
return read(fd, buf, len);
} }
}
return -1;
}
// 初始化线程池 int main() {
ThreadPool pool = {0}; // 等待有名管道创建
if (thread_pool_init(&pool, shm) == -1) { int fifo_wait_count = 0;
shm_detach_only(shm); while (access(FIFO_PATH, F_OK) == -1) {
close(pipe_fd[0]); if (fifo_wait_count >= FIFO_WAIT_TIMEOUT_SEC) {
fprintf(stderr, "[消费者进程] 等待有名管道超时\n");
return -1; return -1;
} }
printf("[消费者进程] 等待有名管道创建... (%d/%d)\n", fifo_wait_count + 1, FIFO_WAIT_TIMEOUT_SEC);
sleep(1);
fifo_wait_count++;
}
// 运行10秒后停止线程池 // 打开有名管道读端
sleep(10); int fifo_fd = open(FIFO_PATH, O_RDONLY);
printf("[工作进程] 停止线程池...\n"); if (fifo_fd == -1) {
thread_pool_destroy(&pool); perror("open fifo failed");
return -1;
}
// 子进程仅分离共享内存,不销毁内核资源 char buf[2];
shm_detach_only(shm); // 带超时读取生产者的同步信号
close(pipe_fd[0]); int read_ret = fifo_read_with_timeout(fifo_fd, buf, 2, PIPE_TIMEOUT_SEC);
printf("[工作进程] 退出\n"); if (read_ret <= 0) {
return 0; close(fifo_fd);
return -1;
} }
printf("[消费者进程] 收到生产者同步信号:%s\n", buf);
// 父进程:任务提交进程,创建任务。父进程单线程提交任务 // 关闭有名管道
else { close(fifo_fd);
// 关闭管道读端(父进程只写)
close(pipe_fd[0]);
// 创建共享内存 // 挂载共享内存
printf("[提交进程] 启动,创建共享内存...\n"); printf("[消费者进程] 启动,创建线程池...\n");
SharedMem *shm = shm_create(); SharedMem *shm = shm_attach();
if (shm == NULL) { if (shm == NULL) {
close(pipe_fd[1]); return -1;
return -1; }
}
// 发送同步信号(告知子进程共享内存已创建) // 初始化线程池
write(pipe_fd[1], "ok", 2); ThreadPool pool = {0};
printf("[提交进程] 已发送同步信号:ok\n"); if (thread_pool_init(&pool, shm) == -1) {
shm_detach_only(shm);
return -1;
}
// 提交5个测试任务 // 运行10秒后停止线程池
for (int i = 1; i <= 5; i++) { sleep(10);
char content[64]; printf("[消费者进程] 停止线程池...\n");
snprintf(content, sizeof(content), "测试任务_%d", i); thread_pool_destroy(&pool);
task_submit(shm, i, content);
sleep(1);
}
// 等待子进程退出 // 仅分离共享内存,不销毁内核资源(由生产者销毁)
wait(NULL); shm_detach_only(shm);
// 父进程统一销毁共享内存和信号量 printf("[消费者进程] 退出\n");
shm_destroy(shm); return 0;
close(pipe_fd[1]);
printf("[提交进程] 退出\n");
return 0;
}
} }
Loading…
Cancel
Save