From 631ac6591d0786172f67b9843a16dc4481c253c8 Mon Sep 17 00:00:00 2001 From: gaorui Date: Mon, 15 Dec 2025 15:46:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=97=A0=E4=BA=B2=E7=BC=98?= =?UTF-8?q?=E5=85=B3=E7=B3=BB=E8=BF=9B=E7=A8=8B=E9=97=B4=E9=80=9A=E8=BF=87?= =?UTF-8?q?=E6=9C=89=E5=90=8D=E7=AE=A1=E9=81=93=E5=90=8C=E6=AD=A5=EF=BC=8C?= =?UTF-8?q?=E9=80=9A=E8=BF=87=E5=85=B1=E4=BA=AB=E5=86=85=E5=AD=98=E9=80=9A?= =?UTF-8?q?=E4=BF=A1=E7=9A=84=E6=A1=88=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 1_shared_memory/shm_comm/shm_comm_README.md | 134 ++++++++++++++++ 1_shared_memory/shm_comm/shm_comm_a.c | 164 +++++++++---------- 1_shared_memory/shm_comm/shm_comm_b.c | 167 +++++++++----------- 3 files changed, 281 insertions(+), 184 deletions(-) create mode 100755 1_shared_memory/shm_comm/shm_comm_README.md diff --git a/1_shared_memory/shm_comm/shm_comm_README.md b/1_shared_memory/shm_comm/shm_comm_README.md new file mode 100755 index 0000000..b7cbdd4 --- /dev/null +++ b/1_shared_memory/shm_comm/shm_comm_README.md @@ -0,0 +1,134 @@ +# 共享内存 + 有名管道 实现无亲缘关系进程通信 + +## 问题概述 +原有的`shm_pc.c`是父子进程通信模型,使用匿名管道进行同步。需要将其修改为两个无亲缘关系进程间的通信模型,分别为生产者进程和消费者进程。 + +## 解决方案 + +### 1. 进程间通信方式选择 +- **共享内存**:用于高效传输大量数据(任务队列) +- **有名管道(FIFO)**:用于无亲缘关系进程间的同步信号传递 +- **信号量**:用于共享内存的互斥访问和任务队列的同步 + +### 2. 核心实现 + +#### 文件结构 +``` +1_shared_memory/ +├── shm_comm/ +│ ├── shm_comm_a.c # 生产者进程 +│ └── shm_comm_b.c # 消费者进程 +``` + +#### 主要功能 + +##### shm_comm_a.c (生产者) +- 创建共享内存(包含环形任务队列) +- 创建有名管道用于同步 +- 向共享内存提交5个测试任务 +- 发送同步信号通知消费者进程 +- 等待消费者处理完成后销毁共享内存 + +##### shm_comm_b.c (消费者) +- 等待并打开有名管道 +- 接收生产者的同步信号 +- 挂载共享内存 +- 创建线程池处理任务 +- 完成所有任务后退出 + +### 3. 关键技术点 + +#### 环形任务队列 +```c +// 共享内存核心结构体 +typedef struct { + Task task_queue[MAX_TASK]; // 共享任务队列 + int front; // 队列头 + int rear; // 队列尾 + int sem_id; // 信号量集ID +} SharedMem; + +// 入队操作(生产者) +shm->rear = (shm->rear + 1) % MAX_TASK; +shm->task_queue[shm->rear] = task; + +// 出队操作(消费者) +Task task = shm->task_queue[shm->front]; +shm->front = (shm->front + 1) % MAX_TASK; +``` + +#### 信号量机制 +- **互斥信号量**(索引0):保护共享内存的互斥访问 +- **空信号量**(索引1):表示队列中的任务数量,初始值为0 +- **满信号量**(索引2):表示队列中的空闲位置数量,初始值为MAX_TASK + +#### 有名管道同步 +```c +// 生产者创建有名管道 +mkfifo(FIFO_PATH, 0666); + +// 生产者发送同步信号 +int fifo_fd = open(FIFO_PATH, O_WRONLY); +write(fifo_fd, "ok", 2); + +// 消费者接收同步信号 +int fifo_fd = open(FIFO_PATH, O_RDONLY); +read(fifo_fd, buf, 2); +``` + +### 4. 编译和运行 + +#### 编译命令 +```bash +gcc -o shm_comm_a shm_comm_a.c -lpthread +gcc -o shm_comm_b shm_comm_b.c -lpthread +``` + +#### 运行步骤 +1. 启动生产者进程: + ```bash + ./shm_comm_a + ``` + +2. 启动消费者进程: + ```bash + ./shm_comm_b + ``` + +### 5. 测试结果 + +生产者进程输出: +``` +[生产者进程] 启动,创建共享内存... +[生产者进程] 已发送同步信号:ok +[提交进程] 提交任务1:测试任务_1 +[提交进程] 提交任务2:测试任务_2 +[提交进程] 提交任务3:测试任务_3 +[提交进程] 提交任务4:测试任务_4 +[提交进程] 提交任务5:测试任务_5 +[生产者进程] 退出 +``` + +消费者进程输出: +``` +[消费者进程] 收到生产者同步信号:ok +[消费者进程] 启动,创建线程池... +[线程140408006018816] 取出任务1:测试任务_1 +[线程140407997626112] 取出任务2:测试任务_2 +[线程140407989233408] 取出任务3:测试任务_3 +[线程140407980840704] 取出任务4:测试任务_4 +[线程140408006018816] 取出任务5:测试任务_5 +[消费者进程] 停止线程池... +[消费者进程] 退出 +``` + +## 总结 + +本实现成功将父子进程通信模型修改为无亲缘关系进程通信: +1. 使用**有名管道**替代匿名管道实现进程同步 +2. 保持了**共享内存**的高效数据传输特性 +3. 维护了**环形任务队列**的高效空间利用 +4. 通过**信号量**确保了共享资源的安全访问 +5. 实现了**线程池**的多任务并行处理 + +测试结果表明,两个无亲缘关系的进程能够稳定通信并正确处理任务。 \ No newline at end of file diff --git a/1_shared_memory/shm_comm/shm_comm_a.c b/1_shared_memory/shm_comm/shm_comm_a.c index d45c438..37abf6c 100644 --- a/1_shared_memory/shm_comm/shm_comm_a.c +++ b/1_shared_memory/shm_comm/shm_comm_a.c @@ -1,7 +1,5 @@ - - // 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互); -// 进程间通信是父进程和子进程之间通信,子进程通过fork创建 +// 进程间通信是进程a与进程b通信,当前是进程a的代码,单线程生产者模型 #include #include @@ -15,12 +13,15 @@ #include #include #include +#include // 有名管道需要的头文件 +#include // 有名管道需要的头文件 // ====================== 全局配置 ====================== #define MAX_TASK 10 // 共享任务队列最大长度 #define THREAD_POOL_SIZE 4 // 线程池大小 #define SHM_KEY 0x1234 // 共享内存键值 #define SEM_KEY 0x5678 // 信号量键值 +#define FIFO_PATH "/tmp/shm_comm_fifo" // 有名管道路径 #define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒) @@ -44,14 +45,6 @@ typedef struct { // 共享内存核心结构体(包含任务队列 + 信号量) typedef struct { -// 1. front、rear 通过 % 数组容量 循环移动,数值在 0~ 容量 - 1 之间往复 -// 2. 使用固定大小的数组来作为环形队列的存储空间 - - -// 顺序队列和环形队列 -// 顺序队列是固定大小的数组存储,环形队列是循环利用数组空间的队列 -// 环形队列的优势是可以充分利用数组空间,避免了顺序队列的假溢出问题 -// 环形队列的实现需要注意队头队尾的循环移动问题 Task task_queue[MAX_TASK]; // 共享任务队列 int front; // 队列头 int rear; // 队列尾 @@ -66,6 +59,8 @@ typedef struct { } ThreadPool; + + // - 创建包含3个信号量的信号量集 // - 互斥信号量(索引0):初始值1表示互斥,用于保护共享内存的互斥访问 // - 空信号量(索引1):初始值0,用于表示队列中的任务数量 @@ -412,98 +407,81 @@ int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { } // ====================== 主函数(进程分支) ====================== -int main() { - // 创建管道(父子进程同步用),这里是无名管道,只能用于父子进程之间通信 - // 因为它们共享文件描述符 - // pipe_fd[0]读,子进程只读这个fd,pipe_fd[1]写,父进程只写这个fd - // 使用管道确保在共享内存创建完成后子进程再挂载共享内存 - int pipe_fd[2]; - if (pipe(pipe_fd) == -1) { - perror("pipe create failed"); - return -1; - } - - // 创建子进程 - pid_t pid = fork(); - if (pid == -1) { - perror("fork failed"); - return -1; - } - - // 子进程:工作进程(线程池 + 消费任务),子进程多线程处理父进程提交过来的任务 - if (pid == 0) { - // 关闭管道写端(子进程只读) - close(pipe_fd[1]); +// 带超时的有名管道读函数 +int fifo_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { + fd_set read_fds; + struct timeval timeout; - char buf[2]; - // 带超时读取父进程的同步信号 - int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); - if (read_ret <= 0) { - close(pipe_fd[0]); - return -1; - } - printf("[子进程] 收到父进程同步信号:%s\n", buf); + // 初始化fd集合 + FD_ZERO(&read_fds); + FD_SET(fd, &read_fds); - // 挂载共享内存 - printf("[工作进程] 启动,创建线程池...\n"); - SharedMem *shm = shm_attach(); - if (shm == NULL) { - close(pipe_fd[0]); - return -1; - } + // 设置超时时间 + timeout.tv_sec = timeout_sec; + timeout.tv_usec = 0; - // 初始化线程池 - ThreadPool pool = {0}; - if (thread_pool_init(&pool, shm) == -1) { - shm_detach_only(shm); - close(pipe_fd[0]); - return -1; + // 监听fd是否可读 + int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout); + if (ret < 0) { + perror("select failed"); + return -1; + } else if (ret == 0) { + fprintf(stderr, "[生产者进程] 有名管道读超时\n"); + return 0; + } else { + if (FD_ISSET(fd, &read_fds)) { + return read(fd, buf, len); } + } + return -1; +} - // 运行10秒后停止线程池 - sleep(10); - printf("[工作进程] 停止线程池...\n"); - thread_pool_destroy(&pool); - - // 子进程仅分离共享内存,不销毁内核资源 - shm_detach_only(shm); - close(pipe_fd[0]); - printf("[工作进程] 退出\n"); - return 0; +int main() { + // 创建有名管道 + if (mkfifo(FIFO_PATH, 0666) == -1 && errno != EEXIST) { + perror("mkfifo failed"); + return -1; } - // 父进程:任务提交进程,创建任务。父进程单线程提交任务 - else { - // 关闭管道读端(父进程只写) - close(pipe_fd[0]); + // 创建共享内存 + printf("[生产者进程] 启动,创建共享内存...\n"); + SharedMem *shm = shm_create(); + if (shm == NULL) { + return -1; + } - // 创建共享内存 - printf("[提交进程] 启动,创建共享内存...\n"); - SharedMem *shm = shm_create(); - if (shm == NULL) { - close(pipe_fd[1]); - return -1; - } + // 打开有名管道写端 + int fifo_fd = open(FIFO_PATH, O_WRONLY); + if (fifo_fd == -1) { + perror("open fifo failed"); + shm_destroy(shm); + return -1; + } - // 发送同步信号(告知子进程共享内存已创建) - write(pipe_fd[1], "ok", 2); - printf("[提交进程] 已发送同步信号:ok\n"); + // 发送同步信号(告知消费者进程共享内存已创建) + write(fifo_fd, "ok", 2); + printf("[生产者进程] 已发送同步信号:ok\n"); - // 提交5个测试任务 - for (int i = 1; i <= 5; i++) { - char content[64]; - snprintf(content, sizeof(content), "测试任务_%d", i); - task_submit(shm, i, content); - sleep(1); - } + // 关闭有名管道 + close(fifo_fd); - // 等待子进程退出 - wait(NULL); - - // 父进程统一销毁共享内存和信号量 - shm_destroy(shm); - close(pipe_fd[1]); - printf("[提交进程] 退出\n"); - return 0; + // 提交5个测试任务 + for (int i = 1; i <= 5; i++) { + char content[64]; + snprintf(content, sizeof(content), "测试任务_%d", i); + task_submit(shm, i, content); + sleep(1); } + + // 等待一段时间让消费者进程完成任务 + sleep(15); + + // 生产者进程统一销毁共享内存和信号量 + shm_destroy(shm); + + // 删除有名管道 + unlink(FIFO_PATH); + + printf("[生产者进程] 退出\n"); + return 0; } \ No newline at end of file diff --git a/1_shared_memory/shm_comm/shm_comm_b.c b/1_shared_memory/shm_comm/shm_comm_b.c index d45c438..8110c87 100644 --- a/1_shared_memory/shm_comm/shm_comm_b.c +++ b/1_shared_memory/shm_comm/shm_comm_b.c @@ -1,7 +1,5 @@ - - // 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互); -// 进程间通信是父进程和子进程之间通信,子进程通过fork创建 +// 进程间通信是进程a与进程b通信,当前是进程b的代码,多线程消费者模型 #include #include @@ -15,13 +13,17 @@ #include #include #include +#include // 有名管道需要的头文件 +#include // 有名管道需要的头文件 // ====================== 全局配置 ====================== #define MAX_TASK 10 // 共享任务队列最大长度 #define THREAD_POOL_SIZE 4 // 线程池大小 #define SHM_KEY 0x1234 // 共享内存键值 #define SEM_KEY 0x5678 // 信号量键值 +#define FIFO_PATH "/tmp/shm_comm_fifo" // 有名管道路径 #define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒) +#define FIFO_WAIT_TIMEOUT_SEC 10 // 等待有名管道创建的超时时间(秒) // 共享内存中是一个消息队列,这个队列是环形的 @@ -44,14 +46,6 @@ typedef struct { // 共享内存核心结构体(包含任务队列 + 信号量) typedef struct { -// 1. front、rear 通过 % 数组容量 循环移动,数值在 0~ 容量 - 1 之间往复 -// 2. 使用固定大小的数组来作为环形队列的存储空间 - - -// 顺序队列和环形队列 -// 顺序队列是固定大小的数组存储,环形队列是循环利用数组空间的队列 -// 环形队列的优势是可以充分利用数组空间,避免了顺序队列的假溢出问题 -// 环形队列的实现需要注意队头队尾的循环移动问题 Task task_queue[MAX_TASK]; // 共享任务队列 int front; // 队列头 int rear; // 队列尾 @@ -412,98 +406,89 @@ int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { } // ====================== 主函数(进程分支) ====================== -int main() { - // 创建管道(父子进程同步用),这里是无名管道,只能用于父子进程之间通信 - // 因为它们共享文件描述符 - // pipe_fd[0]读,子进程只读这个fd,pipe_fd[1]写,父进程只写这个fd - // 使用管道确保在共享内存创建完成后子进程再挂载共享内存 - int pipe_fd[2]; - if (pipe(pipe_fd) == -1) { - perror("pipe create failed"); - return -1; - } - - // 创建子进程 - pid_t pid = fork(); - if (pid == -1) { - perror("fork failed"); - return -1; - } +// 带超时的有名管道读函数 +int fifo_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { + fd_set read_fds; + struct timeval timeout; - // 子进程:工作进程(线程池 + 消费任务),子进程多线程处理父进程提交过来的任务 - if (pid == 0) { - // 关闭管道写端(子进程只读) - close(pipe_fd[1]); + // 初始化fd集合 + FD_ZERO(&read_fds); + FD_SET(fd, &read_fds); - char buf[2]; - // 带超时读取父进程的同步信号 - int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); - if (read_ret <= 0) { - close(pipe_fd[0]); - return -1; - } - printf("[子进程] 收到父进程同步信号:%s\n", buf); + // 设置超时时间 + timeout.tv_sec = timeout_sec; + timeout.tv_usec = 0; - // 挂载共享内存 - printf("[工作进程] 启动,创建线程池...\n"); - SharedMem *shm = shm_attach(); - if (shm == NULL) { - close(pipe_fd[0]); - return -1; + // 监听fd是否可读 + int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout); + if (ret < 0) { + perror("select failed"); + return -1; + } else if (ret == 0) { + fprintf(stderr, "[消费者进程] 有名管道读超时\n"); + return 0; + } else { + if (FD_ISSET(fd, &read_fds)) { + return read(fd, buf, len); } + } + return -1; +} - // 初始化线程池 - ThreadPool pool = {0}; - if (thread_pool_init(&pool, shm) == -1) { - shm_detach_only(shm); - close(pipe_fd[0]); +int main() { + // 等待有名管道创建 + int fifo_wait_count = 0; + while (access(FIFO_PATH, F_OK) == -1) { + if (fifo_wait_count >= FIFO_WAIT_TIMEOUT_SEC) { + fprintf(stderr, "[消费者进程] 等待有名管道超时\n"); return -1; } - - // 运行10秒后停止线程池 - sleep(10); - printf("[工作进程] 停止线程池...\n"); - thread_pool_destroy(&pool); - - // 子进程仅分离共享内存,不销毁内核资源 - shm_detach_only(shm); - close(pipe_fd[0]); - printf("[工作进程] 退出\n"); - return 0; + printf("[消费者进程] 等待有名管道创建... (%d/%d)\n", fifo_wait_count + 1, FIFO_WAIT_TIMEOUT_SEC); + sleep(1); + fifo_wait_count++; } - // 父进程:任务提交进程,创建任务。父进程单线程提交任务 - else { - // 关闭管道读端(父进程只写) - close(pipe_fd[0]); + // 打开有名管道读端 + int fifo_fd = open(FIFO_PATH, O_RDONLY); + if (fifo_fd == -1) { + perror("open fifo failed"); + return -1; + } - // 创建共享内存 - printf("[提交进程] 启动,创建共享内存...\n"); - SharedMem *shm = shm_create(); - if (shm == NULL) { - close(pipe_fd[1]); - return -1; - } + char buf[2]; + // 带超时读取生产者的同步信号 + int read_ret = fifo_read_with_timeout(fifo_fd, buf, 2, PIPE_TIMEOUT_SEC); + if (read_ret <= 0) { + close(fifo_fd); + return -1; + } + printf("[消费者进程] 收到生产者同步信号:%s\n", buf); - // 发送同步信号(告知子进程共享内存已创建) - write(pipe_fd[1], "ok", 2); - printf("[提交进程] 已发送同步信号:ok\n"); + // 关闭有名管道 + close(fifo_fd); - // 提交5个测试任务 - for (int i = 1; i <= 5; i++) { - char content[64]; - snprintf(content, sizeof(content), "测试任务_%d", i); - task_submit(shm, i, content); - sleep(1); - } + // 挂载共享内存 + printf("[消费者进程] 启动,创建线程池...\n"); + SharedMem *shm = shm_attach(); + if (shm == NULL) { + return -1; + } - // 等待子进程退出 - wait(NULL); - - // 父进程统一销毁共享内存和信号量 - shm_destroy(shm); - close(pipe_fd[1]); - printf("[提交进程] 退出\n"); - return 0; + // 初始化线程池 + ThreadPool pool = {0}; + if (thread_pool_init(&pool, shm) == -1) { + shm_detach_only(shm); + return -1; } + + // 运行10秒后停止线程池 + sleep(10); + printf("[消费者进程] 停止线程池...\n"); + thread_pool_destroy(&pool); + + // 仅分离共享内存,不销毁内核资源(由生产者销毁) + shm_detach_only(shm); + + printf("[消费者进程] 退出\n"); + return 0; } \ No newline at end of file