// 用C语言实现“线程池+共享内存”的进程间通信demo(模拟驱动与上层应用交互); // 进程间通信是父进程和子进程之间通信,子进程通过fork创建 #include #include #include #include #include #include #include #include #include #include #include #include // ====================== 全局配置 ====================== #define MAX_TASK 10 // 共享任务队列最大长度 #define THREAD_POOL_SIZE 4 // 线程池大小 #define SHM_KEY 0x1234 // 共享内存键值 #define SEM_KEY 0x5678 // 信号量键值 #define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒) // 共享内存中是一个消息队列,这个队列是环形的 // 任务状态枚举 typedef enum { TASK_INIT = 0, // 初始态 TASK_RUNNING, // 执行中 TASK_DONE // 完成态 } TaskStatus; // 共享内存中的任务结构体 typedef struct { int task_id; char task_content[64]; // 任务内容 TaskStatus status; // 任务状态 } Task; // 共享内存核心结构体(包含任务队列 + 信号量) typedef struct { Task task_queue[MAX_TASK]; // 共享任务队列 int front; // 队列头 int rear; // 队列尾 int sem_id; // 信号量集ID } SharedMem; // 线程池参数 typedef struct { SharedMem *shm; // 指向共享内存的指针 pthread_t tid[THREAD_POOL_SIZE]; // 线程ID数组 int pool_running; // 线程池运行标记(1:运行,0:停止) } ThreadPool; // - 创建包含3个信号量的信号量集 // - 互斥信号量(索引0):初始值1表示互斥,用于保护共享内存的互斥访问 // - 空信号量(索引1):初始值0,用于表示队列中的任务数量 // - 满信号量(索引2):初始值MAX_TASK,用于表示队列中的空闲位置数量 // 将信号量的值设置为不同的数值代表不同的用法 // - 0,同步信号量,出食物可用资源,所有申请P操作的进程都会阻塞,直到有进程执行C操作释放资源 // - 1,互斥信号量(二值信号量),初始一个资源,仅允许1个进程进入临界访问共享资源 // - N(N>1,如2,3,5)技术信号量,初始N个资源,允许最多N个进程同时进入临界区(多进程共享有限资源) // ====================== 信号量操作封装 ====================== // 信号量初始化(创建/获取信号量集) int sem_init(int sem_key) { int sem_id = semget(sem_key, 3, IPC_CREAT | 0666); // 创建三个信号量,0表示互斥信号量,用来互斥共享内存;1表示空信号量,2表示满信号量,用来表示队列中的任务数量 // 信号量集是内核资源,生命周期独立于创建它的进程 // 所以在整个程序中或者其他程序中都可以通过键值访问这个信号量集 if (sem_id == -1) { perror("semget failed"); return -1; } union semun { int val; struct semid_ds *buf; unsigned short *array; } sem_union; // 互斥信号量(索引0):初始值1 sem_union.val = 1; if (semctl(sem_id, 0, SETVAL, sem_union) == -1) { perror("semctl mutex failed"); return -1; } // 空信号量(索引1):初始值0(队列初始空) sem_union.val = 0; if (semctl(sem_id, 1, SETVAL, sem_union) == -1) { perror("semctl empty failed"); return -1; } // 满信号量(索引2):初始值MAX_TASK sem_union.val = MAX_TASK; if (semctl(sem_id, 2, SETVAL, sem_union) == -1) { perror("semctl full failed"); return -1; } return sem_id; } // sembuf结构体变量的sem_op的值表示是P操作还是V操作 // - P操作(减1):sem_op = -1 // - V操作(加1):sem_op = 1 // struct sembuf { // unsigned short sem_num; // 信号量集中的信号量编号(从0开始) // short sem_op; // 操作类型:-1=P操作(减1),1=V操作(加1),0=等待信号量值为0 // short sem_flg; // 操作标志:SEM_UNDO/IPC_NOWAIT等 // }; // int semop(int semid, struct sembuf *sops, size_t nsops); // 参数 类型 含义与关键说明 // semid int 信号量集的 ID(由 semget 函数返回,唯一标识一个信号量集)。 // sops struct sembuf* 指向 信号量操作结构体数组 的指针,每个元素定义一个对信号量的操作(如 P/V)。 // nsops size_t 操作数组 sops 的长度(即要执行的信号量操作数量);最常用值为 1(单操作,如仅 P 或仅 V)。 // 信号量P操作(减1,阻塞) void sem_p(int sem_id, int sem_idx) { struct sembuf sem_buf = {sem_idx, -1, SEM_UNDO}; if (semop(sem_id, &sem_buf, 1) == -1) { perror("sem_p failed"); } } // 信号量V操作(加1,唤醒) void sem_v(int sem_id, int sem_idx) { struct sembuf sem_buf = {sem_idx, 1, SEM_UNDO}; if (semop(sem_id, &sem_buf, 1) == -1) { perror("sem_v failed"); } } // ====================== 共享内存操作封装 ====================== // 创建/挂载共享内存(父进程用) SharedMem* shm_create() { // 1. 创建共享内存 // int shmget(key_t key, size_t size, int shmflg); // 参数 类型 核心作用 // key key_t 共享内存段的「唯一键值」,用于标识系统中唯一的共享内存资源: // ① 手动指定(如 IPC_PRIVATE,仅当前进程亲缘关系可用); // ② 通过 ftok() 生成(跨无亲缘进程通信)。 // size size_t 共享内存段的大小(字节): // ① 创建新段时:需指定非 0 值(会按系统「页大小」对齐,如 4096 字节); // ② 获取已存在段时:可设为 0(无需匹配大小)。 // shmflg int 标志位(权限 + 行为),组合使用(按位或 ` ):
▷ 权限位:如 0666(同文件权限,r/w/x 对应6=rw);
▷ 行为位:
- IPC_CREAT:无则创建,有则获取;
- IPC_EXCL:仅与 IPC_CREAT搭配,若段已存在则报错(避免重复创建);
-IPC_EXCL IPC_CREAT`:保证创建「全新」共享内存段。 int shm_id = shmget(SHM_KEY, sizeof(SharedMem), IPC_CREAT | 0666); if (shm_id == -1) { perror("shmget failed"); return NULL; } // shmat函数作用是将shmget创建/获取的共享内存段挂载(关联)到当前进程的地址空间,是共享内存从“系统资源”变成进程可读写内存的关键步骤 // void *shmat(int shmid, const void *shmaddr, int shmflg); // 执行结果 返回值含义 // 成功 返回 void* 类型的指针,指向该共享内存段挂载到当前进程地址空间的起始虚拟地址; // 失败 返回 (void *) -1(注意:不是普通的 int 型 -1,是强制转换为 void* 类型的 -1),同时设置全局变量 errno 标识错误原因。 // 参数 类型 核心作用 // shmid int 由 shmget 返回的共享内存标识符,唯一标识要挂载的共享内存段。 // shmaddr const void* 指定共享内存挂载到进程地址空间的起始地址: // ✅ 推荐设为 NULL:由系统自动分配合适的地址(最常用,无需关心具体地址); // ❌ 非 NULL:需结合 shmflg 的 SHM_RND 标志,否则地址必须严格对齐到系统页大小(不推荐手动指定,易出错)。 // shmflg int 挂载标志(核心是权限控制): // ▷ 0:默认值,以读写权限挂载(需 shmget 时权限匹配); // ▷ SHM_RDONLY:以只读权限挂载(仅能读,不能写); // ▷ SHM_RND:仅配合非 NULL 的 shmaddr 使用,将地址向下对齐到 SHMLBA(共享内存低边界地址)的整数倍。 // 2. 挂载共享内存到进程地址空间 SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); if (shm == (void*)-1) { perror("shmat failed"); return NULL; } // 3. 初始化共享内存 memset(shm, 0, sizeof(SharedMem)); shm->front = 0; shm->rear = 0; // 为什么此处要创建三个信号量 // 信号量类型 索引 初始值 作用 // 互斥信号量 0 1 保护共享内存的互斥访问 // 空信号量 1 0 表示队列中的任务数量 // 满信号量 2 MAX_TASK 表示队列中的空闲位置数量 // 互斥信号量只能保证同时只有一个进程/线程访问共享内存,而空信号量和满信号量可以确保共享内存不溢出和队列为空时不获取队列的任务 // ### 1. 生产者在队列满时继续生产 // 如果只有互斥信号量,当队列已满时,生产者仍然可以获取互斥锁并尝试向队列添加任务,这会导致: // - 队列溢出(环形队列可能覆盖旧任务) // - 数据丢失 // - 程序逻辑错误 // ### 2. 消费者在队列空时继续消费 // 如果只有互斥信号量,当队列为空时,消费者仍然可以获取互斥锁并尝试从队列获取任务,这会导致: // - 读取无效任务数据 // - 程序崩溃或异常行为 shm->sem_id = sem_init(SEM_KEY); // 初始化信号量 if (shm->sem_id == -1) { shmdt(shm); return NULL; } return shm; } // 挂载已存在的共享内存(子进程用) SharedMem* shm_attach() { int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); if (shm_id == -1) { perror("shmget attach failed"); return NULL; } SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); if (shm == (void*)-1) { perror("shmat attach failed"); return NULL; } return shm; } // 仅分离共享内存(子进程用,不删除内核资源) void shm_detach_only(SharedMem *shm) { if (shm == NULL) return; shmdt(shm); // 仅分离,不删除 } // 销毁共享内存(父进程用,删除内核资源) void shm_destroy(SharedMem *shm) { if (shm == NULL) return; // 1. 分离共享内存 shmdt(shm); // 2. 删除共享内存(检查资源是否存在) int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); if (shm_id != -1) { shmctl(shm_id, IPC_RMID, NULL); } // 3. 删除信号量(检查资源是否存在) if (shm->sem_id != -1) { semctl(shm->sem_id, 0, IPC_RMID); } } // ====================== 线程池核心逻辑 ====================== // 线程处理函数:循环取任务执行,增加防空任务/退出检查 void* thread_worker(void *arg) { ThreadPool *pool = (ThreadPool*)arg; SharedMem *shm = pool->shm; while (pool->pool_running) { // 前置检查:线程池已停止则直接退出 if (!pool->pool_running) break; // 1. P操作空信号量(队列空则阻塞) sem_p(shm->sem_id, 1); // 唤醒后再次检查:线程池已停止则归还信号量并退出 if (!pool->pool_running) { sem_v(shm->sem_id, 1); // 归还空信号量,避免死锁 break; } // 2. P操作互斥信号量(保护队列读写) sem_p(shm->sem_id, 0); // 最后检查:线程池已停止则归还所有信号量并退出 if (!pool->pool_running) { sem_v(shm->sem_id, 0); // 归还互斥信号量 sem_v(shm->sem_id, 1); // 归还空信号量 break; } // 3. 检查任务是否为空(避免读取无效任务) Task task = shm->task_queue[shm->front]; if (task.task_id == 0) { sem_v(shm->sem_id, 0); // 归还互斥信号量 continue; } // 4. 取出任务,更新队列头 shm->front = (shm->front + 1) % MAX_TASK; printf("[线程%ld] 取出任务%d:%s\n", pthread_self(), task.task_id, task.task_content); // 5. V操作互斥信号量 sem_v(shm->sem_id, 0); // 6. V操作满信号量(队列有空位) sem_v(shm->sem_id, 2); // 7. 执行任务(模拟耗时操作) task.status = TASK_RUNNING; sleep(1); task.status = TASK_DONE; printf("[线程%ld] 完成任务%d,状态:%d\n", pthread_self(), task.task_id, task.status); } pthread_exit(NULL); } // 初始化线程池 int thread_pool_init(ThreadPool *pool, SharedMem *shm) { pool->shm = shm; pool->pool_running = 1; // 创建线程池中的线程 for (int i = 0; i < THREAD_POOL_SIZE; i++) { if (pthread_create(&pool->tid[i], NULL, thread_worker, pool) != 0) { perror("pthread_create failed"); return -1; } } return 0; } // 销毁线程池(优雅退出) void thread_pool_destroy(ThreadPool *pool) { if (pool == NULL) return; // 标记线程池停止 pool->pool_running = 0; // 唤醒所有阻塞的线程(避免线程卡死) for (int i = 0; i < THREAD_POOL_SIZE; i++) { sem_v(pool->shm->sem_id, 1); } // 等待所有线程退出 for (int i = 0; i < THREAD_POOL_SIZE; i++) { pthread_join(pool->tid[i], NULL); } } // ====================== 任务提交函数 ====================== int task_submit(SharedMem *shm, int task_id, const char *content) { // 1. P操作满信号量(队列满则阻塞) sem_p(shm->sem_id, 2); // 2. P操作互斥信号量 sem_p(shm->sem_id, 0); // 3. 写入任务到队列 Task task = {0}; task.task_id = task_id; strncpy(task.task_content, content, sizeof(task.task_content)-1); task.status = TASK_INIT; shm->task_queue[shm->rear] = task; shm->rear = (shm->rear + 1) % MAX_TASK; printf("[提交进程] 提交任务%d:%s\n", task_id, content); // 4. V操作互斥信号量 sem_v(shm->sem_id, 0); // 5. V操作空信号量(唤醒线程池) sem_v(shm->sem_id, 1); return 0; } // ====================== 带超时的管道读函数 ====================== int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { fd_set read_fds; struct timeval timeout; // 初始化fd集合 FD_ZERO(&read_fds); FD_SET(fd, &read_fds); // 设置超时时间 timeout.tv_sec = timeout_sec; timeout.tv_usec = 0; // 监听fd是否可读 int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout); if (ret < 0) { perror("select failed"); return -1; } else if (ret == 0) { fprintf(stderr, "[子进程] 管道读超时(%d秒),父进程未发送同步信号\n", timeout_sec); return 0; } else { if (FD_ISSET(fd, &read_fds)) { return read(fd, buf, len); } } return -1; } // ====================== 主函数(进程分支) ====================== int main() { // 创建管道(父子进程同步用) // pipe_fd[0]读,子进程只读这个fd,pipe_fd[1]写,父进程只写这个fd // 使用管道确保在共享内存创建完成后子进程再挂载共享内存 int pipe_fd[2]; if (pipe(pipe_fd) == -1) { perror("pipe create failed"); return -1; } // 创建子进程 pid_t pid = fork(); if (pid == -1) { perror("fork failed"); return -1; } // 子进程:工作进程(线程池 + 消费任务),子进程多线程处理父进程提交过来的任务 if (pid == 0) { // 关闭管道写端(子进程只读) close(pipe_fd[1]); char buf[2]; // 带超时读取父进程的同步信号 int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); if (read_ret <= 0) { close(pipe_fd[0]); return -1; } printf("[子进程] 收到父进程同步信号:%s\n", buf); // 挂载共享内存 printf("[工作进程] 启动,创建线程池...\n"); SharedMem *shm = shm_attach(); if (shm == NULL) { close(pipe_fd[0]); return -1; } // 初始化线程池 ThreadPool pool = {0}; if (thread_pool_init(&pool, shm) == -1) { shm_detach_only(shm); close(pipe_fd[0]); return -1; } // 运行10秒后停止线程池 sleep(10); printf("[工作进程] 停止线程池...\n"); thread_pool_destroy(&pool); // 子进程仅分离共享内存,不销毁内核资源 shm_detach_only(shm); close(pipe_fd[0]); printf("[工作进程] 退出\n"); return 0; } // 父进程:任务提交进程,创建任务。父进程单线程提交任务 else { // 关闭管道读端(父进程只写) close(pipe_fd[0]); // 创建共享内存 printf("[提交进程] 启动,创建共享内存...\n"); SharedMem *shm = shm_create(); if (shm == NULL) { close(pipe_fd[1]); return -1; } // 发送同步信号(告知子进程共享内存已创建) write(pipe_fd[1], "ok", 2); printf("[提交进程] 已发送同步信号:ok\n"); // 提交5个测试任务 for (int i = 1; i <= 5; i++) { char content[64]; snprintf(content, sizeof(content), "测试任务_%d", i); task_submit(shm, i, content); sleep(1); } // 等待子进程退出 wait(NULL); // 父进程统一销毁共享内存和信号量 shm_destroy(shm); close(pipe_fd[1]); printf("[提交进程] 退出\n"); return 0; } }