diff --git a/shared_memory/shared_memory.c b/shared_memory/shared_memory.c new file mode 100755 index 0000000..b0473b6 --- /dev/null +++ b/shared_memory/shared_memory.c @@ -0,0 +1,318 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // 新增:select头文件 +#include // wait头文件 + +#define MAX_TASK 10 +#define THREAD_POOL_SIZE 4 +#define SHM_KEY 0x1234 +#define SEM_KEY 0x5678 +#define PIPE_TIMEOUT_SEC 5 // 新增:管道超时时间(秒) + +// 任务状态枚举 +typedef enum { + TASK_INIT = 0, + TASK_RUNNING, + TASK_DONE +} TaskStatus; + +// 共享内存中的任务结构体 +typedef struct { + int task_id; + char task_content[64]; + TaskStatus status; +} Task; + +// 共享内存核心结构体 +typedef struct { + Task task_queue[MAX_TASK]; + int front; + int rear; + struct sembuf mutex_sem; + struct sembuf empty_sem; + struct sembuf full_sem; + int sem_id; +} SharedMem; + +// 线程池参数 +typedef struct { + SharedMem *shm; + pthread_t tid[THREAD_POOL_SIZE]; + int pool_running; +} ThreadPool; + +// 信号量操作封装(不变) +int sem_init(int sem_key) { + int sem_id = semget(sem_key, 3, IPC_CREAT | 0666); + if (sem_id == -1) { + perror("semget failed"); + return -1; + } + union semun { + int val; + struct semid_ds *buf; + unsigned short *array; + } sem_union; + + sem_union.val = 1; + if (semctl(sem_id, 0, SETVAL, sem_union) == -1) { perror("semctl mutex failed"); return -1; } + sem_union.val = 0; + if (semctl(sem_id, 1, SETVAL, sem_union) == -1) { perror("semctl empty failed"); return -1; } + sem_union.val = MAX_TASK; + if (semctl(sem_id, 2, SETVAL, sem_union) == -1) { perror("semctl full failed"); return -1; } + + return sem_id; +} + +void sem_p(int sem_id, int sem_idx) { + struct sembuf sem_buf = {sem_idx, -1, SEM_UNDO}; + if (semop(sem_id, &sem_buf, 1) == -1) { + perror("sem_p failed"); + } +} + +void sem_v(int sem_id, int sem_idx) { + struct sembuf sem_buf = {sem_idx, 1, SEM_UNDO}; + if (semop(sem_id, &sem_buf, 1) == -1) { + perror("sem_v failed"); + } +} + +// 共享内存操作封装(不变) +SharedMem* shm_create() { + int shm_id = shmget(SHM_KEY, sizeof(SharedMem), IPC_CREAT | 0666); + if (shm_id == -1) { + perror("shmget failed"); + return NULL; + } + SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); + if (shm == (void*)-1) { + perror("shmat failed"); + return NULL; + } + memset(shm, 0, sizeof(SharedMem)); + shm->front = 0; + shm->rear = 0; + shm->sem_id = sem_init(SEM_KEY); + if (shm->sem_id == -1) { + shmdt(shm); + return NULL; + } + return shm; +} + +SharedMem* shm_attach() { + int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); + if (shm_id == -1) { + perror("shmget attach failed"); + return NULL; + } + SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); + if (shm == (void*)-1) { + perror("shmat attach failed"); + return NULL; + } + return shm; +} + +void shm_destroy(SharedMem *shm) { + shmdt(shm); + int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); + shmctl(shm_id, IPC_RMID, NULL); + semctl(shm->sem_id, 0, IPC_RMID); +} + +// 线程处理函数(不变) +void* thread_worker(void *arg) { + ThreadPool *pool = (ThreadPool*)arg; + SharedMem *shm = pool->shm; + + while (pool->pool_running) { + sem_p(shm->sem_id, 1); + sem_p(shm->sem_id, 0); + + Task task = shm->task_queue[shm->front]; + shm->front = (shm->front + 1) % MAX_TASK; + printf("[线程%ld] 取出任务%d:%s\n", pthread_self(), task.task_id, task.task_content); + + sem_v(shm->sem_id, 0); + sem_v(shm->sem_id, 2); + + task.status = TASK_RUNNING; + sleep(1); + task.status = TASK_DONE; + printf("[线程%ld] 完成任务%d,状态:%d\n", pthread_self(), task.task_id, task.status); + } + pthread_exit(NULL); +} + +// 线程池初始化(不变) +int thread_pool_init(ThreadPool *pool, SharedMem *shm) { + pool->shm = shm; + pool->pool_running = 1; + for (int i = 0; i < THREAD_POOL_SIZE; i++) { + if (pthread_create(&pool->tid[i], NULL, thread_worker, pool) != 0) { + perror("pthread_create failed"); + return -1; + } + } + return 0; +} + +// 线程池销毁(不变) +void thread_pool_destroy(ThreadPool *pool) { + pool->pool_running = 0; + for (int i = 0; i < THREAD_POOL_SIZE; i++) { + sem_v(pool->shm->sem_id, 1); + } + for (int i = 0; i < THREAD_POOL_SIZE; i++) { + pthread_join(pool->tid[i], NULL); + } +} + +// 任务提交(不变) +int task_submit(SharedMem *shm, int task_id, const char *content) { + sem_p(shm->sem_id, 2); + sem_p(shm->sem_id, 0); + + Task task = {0}; + task.task_id = task_id; + strncpy(task.task_content, content, sizeof(task.task_content)-1); + task.status = TASK_INIT; + shm->task_queue[shm->rear] = task; + shm->rear = (shm->rear + 1) % MAX_TASK; + printf("[提交进程] 提交任务%d:%s\n", task_id, content); + + sem_v(shm->sem_id, 0); + sem_v(shm->sem_id, 1); + + return 0; +} + +// 新增:带超时的管道读函数 +int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { + fd_set read_fds; + struct timeval timeout; + + // 1. 初始化fd集合:只监听管道读端fd + FD_ZERO(&read_fds); + FD_SET(fd, &read_fds); + + // 2. 设置超时时间(秒+微秒) + timeout.tv_sec = timeout_sec; + timeout.tv_usec = 0; // 微秒,设为0表示整秒超时 + + // 3. 调用select:监听fd是否可读,超时返回0 + // select第一个参数:最大fd + 1(fd从0开始,所以+1) + int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout); + if (ret < 0) { + // select调用失败(比如fd无效) + perror("select failed"); + return -1; + } else if (ret == 0) { + // 超时:无数据可读 + fprintf(stderr, "[子进程] 管道读超时(%d秒),父进程未发送同步信号\n", timeout_sec); + return 0; + } else { + // 有数据可读:执行read + if (FD_ISSET(fd, &read_fds)) { // 确认fd确实可读 + return read(fd, buf, len); + } + } + return -1; +} + +int main() { + // 新增:创建管道(用于父子进程同步) + int pipe_fd[2]; + if (pipe(pipe_fd) == -1) { + perror("pipe create failed"); + return -1; + } + + pid_t pid = fork(); + if (pid == -1) { + perror("fork failed"); + return -1; + } + + // 子进程:工作进程(带管道超时同步) + if (pid == 0) { + // 关闭管道写端(子进程只读) + close(pipe_fd[1]); + + char buf[2]; + // 新增:调用带超时的管道读函数 + int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); + if (read_ret <= 0) { + // 超时或读失败,直接退出 + close(pipe_fd[0]); + return -1; + } + printf("[子进程] 收到父进程同步信号:%s\n", buf); + + // 读成功后,挂载共享内存 + printf("[工作进程] 启动,创建线程池...\n"); + SharedMem *shm = shm_attach(); + if (shm == NULL) { + close(pipe_fd[0]); + return -1; + } + ThreadPool pool = {0}; + if (thread_pool_init(&pool, shm) == -1) { + shm_destroy(shm); + close(pipe_fd[0]); + return -1; + } + + sleep(10); + printf("[工作进程] 停止线程池...\n"); + thread_pool_destroy(&pool); + shm_destroy(shm); + // 关闭管道读端 + close(pipe_fd[0]); + printf("[工作进程] 退出\n"); + return 0; + } + + // 父进程:任务提交进程 + else { + // 关闭管道读端(父进程只写) + close(pipe_fd[0]); + + // 创建共享内存 + printf("[提交进程] 启动,创建共享内存...\n"); + SharedMem *shm = shm_create(); + if (shm == NULL) { + close(pipe_fd[1]); + return -1; + } + + // 共享内存创建完成,写入同步信号 + write(pipe_fd[1], "ok", 2); + printf("[提交进程] 已发送同步信号:ok\n"); + + // 提交测试任务 + for (int i = 1; i <= 5; i++) { + char content[64]; + snprintf(content, sizeof(content), "测试任务_%d", i); + task_submit(shm, i, content); + sleep(1); + } + + // 等待子进程退出 + wait(NULL); + shm_destroy(shm); + // 关闭管道写端 + close(pipe_fd[1]); + printf("[提交进程] 退出\n"); + return 0; + } +} \ No newline at end of file