diff --git a/shared_memory/shared_memory.c b/shared_memory/shared_memory.c index b0473b6..c5fba43 100755 --- a/shared_memory/shared_memory.c +++ b/shared_memory/shared_memory.c @@ -7,70 +7,84 @@ #include #include #include -#include // 新增:select头文件 -#include // wait头文件 +#include +#include +#include -#define MAX_TASK 10 -#define THREAD_POOL_SIZE 4 -#define SHM_KEY 0x1234 -#define SEM_KEY 0x5678 -#define PIPE_TIMEOUT_SEC 5 // 新增:管道超时时间(秒) +// ====================== 全局配置 ====================== +#define MAX_TASK 10 // 共享任务队列最大长度 +#define THREAD_POOL_SIZE 4 // 线程池大小 +#define SHM_KEY 0x1234 // 共享内存键值 +#define SEM_KEY 0x5678 // 信号量键值 +#define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒) // 任务状态枚举 typedef enum { - TASK_INIT = 0, - TASK_RUNNING, - TASK_DONE + TASK_INIT = 0, // 初始态 + TASK_RUNNING, // 执行中 + TASK_DONE // 完成态 } TaskStatus; // 共享内存中的任务结构体 typedef struct { int task_id; - char task_content[64]; - TaskStatus status; + char task_content[64]; // 任务内容 + TaskStatus status; // 任务状态 } Task; -// 共享内存核心结构体 +// 共享内存核心结构体(包含任务队列 + 信号量) typedef struct { - Task task_queue[MAX_TASK]; - int front; - int rear; - struct sembuf mutex_sem; - struct sembuf empty_sem; - struct sembuf full_sem; - int sem_id; + Task task_queue[MAX_TASK]; // 共享任务队列 + int front; // 队列头 + int rear; // 队列尾 + int sem_id; // 信号量集ID } SharedMem; // 线程池参数 typedef struct { - SharedMem *shm; - pthread_t tid[THREAD_POOL_SIZE]; - int pool_running; + SharedMem *shm; // 指向共享内存的指针 + pthread_t tid[THREAD_POOL_SIZE]; // 线程ID数组 + int pool_running; // 线程池运行标记(1:运行,0:停止) } ThreadPool; -// 信号量操作封装(不变) +// ====================== 信号量操作封装 ====================== +// 信号量初始化(创建/获取信号量集) int sem_init(int sem_key) { int sem_id = semget(sem_key, 3, IPC_CREAT | 0666); if (sem_id == -1) { perror("semget failed"); return -1; } + union semun { int val; struct semid_ds *buf; unsigned short *array; } sem_union; + // 互斥信号量(索引0):初始值1 sem_union.val = 1; - if (semctl(sem_id, 0, SETVAL, sem_union) == -1) { perror("semctl mutex failed"); return -1; } + if (semctl(sem_id, 0, SETVAL, sem_union) == -1) { + perror("semctl mutex failed"); + return -1; + } + // 空信号量(索引1):初始值0(队列初始空) sem_union.val = 0; - if (semctl(sem_id, 1, SETVAL, sem_union) == -1) { perror("semctl empty failed"); return -1; } + if (semctl(sem_id, 1, SETVAL, sem_union) == -1) { + perror("semctl empty failed"); + return -1; + } + // 满信号量(索引2):初始值MAX_TASK sem_union.val = MAX_TASK; - if (semctl(sem_id, 2, SETVAL, sem_union) == -1) { perror("semctl full failed"); return -1; } + if (semctl(sem_id, 2, SETVAL, sem_union) == -1) { + perror("semctl full failed"); + return -1; + } return sem_id; } +// 信号量P操作(减1,阻塞) void sem_p(int sem_id, int sem_idx) { struct sembuf sem_buf = {sem_idx, -1, SEM_UNDO}; if (semop(sem_id, &sem_buf, 1) == -1) { @@ -78,6 +92,7 @@ void sem_p(int sem_id, int sem_idx) { } } +// 信号量V操作(加1,唤醒) void sem_v(int sem_id, int sem_idx) { struct sembuf sem_buf = {sem_idx, 1, SEM_UNDO}; if (semop(sem_id, &sem_buf, 1) == -1) { @@ -85,22 +100,26 @@ void sem_v(int sem_id, int sem_idx) { } } -// 共享内存操作封装(不变) +// ====================== 共享内存操作封装 ====================== +// 创建/挂载共享内存(父进程用) SharedMem* shm_create() { + // 1. 创建共享内存 int shm_id = shmget(SHM_KEY, sizeof(SharedMem), IPC_CREAT | 0666); if (shm_id == -1) { perror("shmget failed"); return NULL; } + // 2. 挂载共享内存到进程地址空间 SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); if (shm == (void*)-1) { perror("shmat failed"); return NULL; } + // 3. 初始化共享内存 memset(shm, 0, sizeof(SharedMem)); shm->front = 0; shm->rear = 0; - shm->sem_id = sem_init(SEM_KEY); + shm->sem_id = sem_init(SEM_KEY); // 初始化信号量 if (shm->sem_id == -1) { shmdt(shm); return NULL; @@ -108,6 +127,7 @@ SharedMem* shm_create() { return shm; } +// 挂载已存在的共享内存(子进程用) SharedMem* shm_attach() { int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); if (shm_id == -1) { @@ -122,41 +142,90 @@ SharedMem* shm_attach() { return shm; } +// 仅分离共享内存(子进程用,不删除内核资源) +void shm_detach_only(SharedMem *shm) { + if (shm == NULL) return; + shmdt(shm); // 仅分离,不删除 +} + +// 销毁共享内存(父进程用,删除内核资源) void shm_destroy(SharedMem *shm) { + if (shm == NULL) return; + + // 1. 分离共享内存 shmdt(shm); + + // 2. 删除共享内存(检查资源是否存在) int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); - shmctl(shm_id, IPC_RMID, NULL); - semctl(shm->sem_id, 0, IPC_RMID); + if (shm_id != -1) { + shmctl(shm_id, IPC_RMID, NULL); + } + + // 3. 删除信号量(检查资源是否存在) + if (shm->sem_id != -1) { + semctl(shm->sem_id, 0, IPC_RMID); + } } -// 线程处理函数(不变) +// ====================== 线程池核心逻辑 ====================== +// 线程处理函数:循环取任务执行,增加防空任务/退出检查 void* thread_worker(void *arg) { ThreadPool *pool = (ThreadPool*)arg; SharedMem *shm = pool->shm; while (pool->pool_running) { + // 前置检查:线程池已停止则直接退出 + if (!pool->pool_running) break; + + // 1. P操作空信号量(队列空则阻塞) sem_p(shm->sem_id, 1); + // 唤醒后再次检查:线程池已停止则归还信号量并退出 + if (!pool->pool_running) { + sem_v(shm->sem_id, 1); // 归还空信号量,避免死锁 + break; + } + + // 2. P操作互斥信号量(保护队列读写) sem_p(shm->sem_id, 0); + // 最后检查:线程池已停止则归还所有信号量并退出 + if (!pool->pool_running) { + sem_v(shm->sem_id, 0); // 归还互斥信号量 + sem_v(shm->sem_id, 1); // 归还空信号量 + break; + } + // 3. 检查任务是否为空(避免读取无效任务) Task task = shm->task_queue[shm->front]; + if (task.task_id == 0) { + sem_v(shm->sem_id, 0); // 归还互斥信号量 + continue; + } + + // 4. 取出任务,更新队列头 shm->front = (shm->front + 1) % MAX_TASK; printf("[线程%ld] 取出任务%d:%s\n", pthread_self(), task.task_id, task.task_content); + // 5. V操作互斥信号量 sem_v(shm->sem_id, 0); + // 6. V操作满信号量(队列有空位) sem_v(shm->sem_id, 2); + // 7. 执行任务(模拟耗时操作) task.status = TASK_RUNNING; - sleep(1); + sleep(1); task.status = TASK_DONE; printf("[线程%ld] 完成任务%d,状态:%d\n", pthread_self(), task.task_id, task.status); } + pthread_exit(NULL); } -// 线程池初始化(不变) +// 初始化线程池 int thread_pool_init(ThreadPool *pool, SharedMem *shm) { pool->shm = shm; pool->pool_running = 1; + + // 创建线程池中的线程 for (int i = 0; i < THREAD_POOL_SIZE; i++) { if (pthread_create(&pool->tid[i], NULL, thread_worker, pool) != 0) { perror("pthread_create failed"); @@ -166,22 +235,32 @@ int thread_pool_init(ThreadPool *pool, SharedMem *shm) { return 0; } -// 线程池销毁(不变) +// 销毁线程池(优雅退出) void thread_pool_destroy(ThreadPool *pool) { + if (pool == NULL) return; + + // 标记线程池停止 pool->pool_running = 0; + + // 唤醒所有阻塞的线程(避免线程卡死) for (int i = 0; i < THREAD_POOL_SIZE; i++) { sem_v(pool->shm->sem_id, 1); } + + // 等待所有线程退出 for (int i = 0; i < THREAD_POOL_SIZE; i++) { pthread_join(pool->tid[i], NULL); } } -// 任务提交(不变) +// ====================== 任务提交函数 ====================== int task_submit(SharedMem *shm, int task_id, const char *content) { + // 1. P操作满信号量(队列满则阻塞) sem_p(shm->sem_id, 2); + // 2. P操作互斥信号量 sem_p(shm->sem_id, 0); + // 3. 写入任务到队列 Task task = {0}; task.task_id = task_id; strncpy(task.task_content, content, sizeof(task.task_content)-1); @@ -190,93 +269,96 @@ int task_submit(SharedMem *shm, int task_id, const char *content) { shm->rear = (shm->rear + 1) % MAX_TASK; printf("[提交进程] 提交任务%d:%s\n", task_id, content); + // 4. V操作互斥信号量 sem_v(shm->sem_id, 0); + // 5. V操作空信号量(唤醒线程池) sem_v(shm->sem_id, 1); return 0; } -// 新增:带超时的管道读函数 +// ====================== 带超时的管道读函数 ====================== int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { fd_set read_fds; struct timeval timeout; - // 1. 初始化fd集合:只监听管道读端fd + // 初始化fd集合 FD_ZERO(&read_fds); FD_SET(fd, &read_fds); - // 2. 设置超时时间(秒+微秒) + // 设置超时时间 timeout.tv_sec = timeout_sec; - timeout.tv_usec = 0; // 微秒,设为0表示整秒超时 + timeout.tv_usec = 0; - // 3. 调用select:监听fd是否可读,超时返回0 - // select第一个参数:最大fd + 1(fd从0开始,所以+1) + // 监听fd是否可读 int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout); if (ret < 0) { - // select调用失败(比如fd无效) perror("select failed"); return -1; } else if (ret == 0) { - // 超时:无数据可读 fprintf(stderr, "[子进程] 管道读超时(%d秒),父进程未发送同步信号\n", timeout_sec); return 0; } else { - // 有数据可读:执行read - if (FD_ISSET(fd, &read_fds)) { // 确认fd确实可读 + if (FD_ISSET(fd, &read_fds)) { return read(fd, buf, len); } } return -1; } +// ====================== 主函数(进程分支) ====================== int main() { - // 新增:创建管道(用于父子进程同步) + // 创建管道(父子进程同步用) int pipe_fd[2]; if (pipe(pipe_fd) == -1) { perror("pipe create failed"); return -1; } + // 创建子进程 pid_t pid = fork(); if (pid == -1) { perror("fork failed"); return -1; } - // 子进程:工作进程(带管道超时同步) + // 子进程:工作进程(线程池 + 消费任务) if (pid == 0) { // 关闭管道写端(子进程只读) close(pipe_fd[1]); char buf[2]; - // 新增:调用带超时的管道读函数 + // 带超时读取父进程的同步信号 int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); if (read_ret <= 0) { - // 超时或读失败,直接退出 close(pipe_fd[0]); return -1; } printf("[子进程] 收到父进程同步信号:%s\n", buf); - // 读成功后,挂载共享内存 + // 挂载共享内存 printf("[工作进程] 启动,创建线程池...\n"); SharedMem *shm = shm_attach(); if (shm == NULL) { close(pipe_fd[0]); return -1; } + + // 初始化线程池 ThreadPool pool = {0}; if (thread_pool_init(&pool, shm) == -1) { - shm_destroy(shm); + shm_detach_only(shm); close(pipe_fd[0]); return -1; } + // 运行10秒后停止线程池 sleep(10); printf("[工作进程] 停止线程池...\n"); thread_pool_destroy(&pool); - shm_destroy(shm); - // 关闭管道读端 + + // 子进程仅分离共享内存,不销毁内核资源 + shm_detach_only(shm); close(pipe_fd[0]); printf("[工作进程] 退出\n"); return 0; @@ -295,11 +377,11 @@ int main() { return -1; } - // 共享内存创建完成,写入同步信号 + // 发送同步信号(告知子进程共享内存已创建) write(pipe_fd[1], "ok", 2); printf("[提交进程] 已发送同步信号:ok\n"); - // 提交测试任务 + // 提交5个测试任务 for (int i = 1; i <= 5; i++) { char content[64]; snprintf(content, sizeof(content), "测试任务_%d", i); @@ -309,8 +391,9 @@ int main() { // 等待子进程退出 wait(NULL); + + // 父进程统一销毁共享内存和信号量 shm_destroy(shm); - // 关闭管道写端 close(pipe_fd[1]); printf("[提交进程] 退出\n"); return 0;