1 changed files with 318 additions and 0 deletions
@ -0,0 +1,318 @@ |
|||||
|
#include <stdio.h> |
||||
|
#include <stdlib.h> |
||||
|
#include <unistd.h> |
||||
|
#include <pthread.h> |
||||
|
#include <sys/ipc.h> |
||||
|
#include <sys/shm.h> |
||||
|
#include <sys/sem.h> |
||||
|
#include <string.h> |
||||
|
#include <errno.h> |
||||
|
#include <sys/select.h> // 新增:select头文件 |
||||
|
#include <sys/wait.h> // wait头文件 |
||||
|
|
||||
|
#define MAX_TASK 10 |
||||
|
#define THREAD_POOL_SIZE 4 |
||||
|
#define SHM_KEY 0x1234 |
||||
|
#define SEM_KEY 0x5678 |
||||
|
#define PIPE_TIMEOUT_SEC 5 // 新增:管道超时时间(秒)
|
||||
|
|
||||
|
// 任务状态枚举
|
||||
|
typedef enum { |
||||
|
TASK_INIT = 0, |
||||
|
TASK_RUNNING, |
||||
|
TASK_DONE |
||||
|
} TaskStatus; |
||||
|
|
||||
|
// 共享内存中的任务结构体
|
||||
|
typedef struct { |
||||
|
int task_id; |
||||
|
char task_content[64]; |
||||
|
TaskStatus status; |
||||
|
} Task; |
||||
|
|
||||
|
// 共享内存核心结构体
|
||||
|
typedef struct { |
||||
|
Task task_queue[MAX_TASK]; |
||||
|
int front; |
||||
|
int rear; |
||||
|
struct sembuf mutex_sem; |
||||
|
struct sembuf empty_sem; |
||||
|
struct sembuf full_sem; |
||||
|
int sem_id; |
||||
|
} SharedMem; |
||||
|
|
||||
|
// 线程池参数
|
||||
|
typedef struct { |
||||
|
SharedMem *shm; |
||||
|
pthread_t tid[THREAD_POOL_SIZE]; |
||||
|
int pool_running; |
||||
|
} ThreadPool; |
||||
|
|
||||
|
// 信号量操作封装(不变)
|
||||
|
int sem_init(int sem_key) { |
||||
|
int sem_id = semget(sem_key, 3, IPC_CREAT | 0666); |
||||
|
if (sem_id == -1) { |
||||
|
perror("semget failed"); |
||||
|
return -1; |
||||
|
} |
||||
|
union semun { |
||||
|
int val; |
||||
|
struct semid_ds *buf; |
||||
|
unsigned short *array; |
||||
|
} sem_union; |
||||
|
|
||||
|
sem_union.val = 1; |
||||
|
if (semctl(sem_id, 0, SETVAL, sem_union) == -1) { perror("semctl mutex failed"); return -1; } |
||||
|
sem_union.val = 0; |
||||
|
if (semctl(sem_id, 1, SETVAL, sem_union) == -1) { perror("semctl empty failed"); return -1; } |
||||
|
sem_union.val = MAX_TASK; |
||||
|
if (semctl(sem_id, 2, SETVAL, sem_union) == -1) { perror("semctl full failed"); return -1; } |
||||
|
|
||||
|
return sem_id; |
||||
|
} |
||||
|
|
||||
|
void sem_p(int sem_id, int sem_idx) { |
||||
|
struct sembuf sem_buf = {sem_idx, -1, SEM_UNDO}; |
||||
|
if (semop(sem_id, &sem_buf, 1) == -1) { |
||||
|
perror("sem_p failed"); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
void sem_v(int sem_id, int sem_idx) { |
||||
|
struct sembuf sem_buf = {sem_idx, 1, SEM_UNDO}; |
||||
|
if (semop(sem_id, &sem_buf, 1) == -1) { |
||||
|
perror("sem_v failed"); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// 共享内存操作封装(不变)
|
||||
|
SharedMem* shm_create() { |
||||
|
int shm_id = shmget(SHM_KEY, sizeof(SharedMem), IPC_CREAT | 0666); |
||||
|
if (shm_id == -1) { |
||||
|
perror("shmget failed"); |
||||
|
return NULL; |
||||
|
} |
||||
|
SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); |
||||
|
if (shm == (void*)-1) { |
||||
|
perror("shmat failed"); |
||||
|
return NULL; |
||||
|
} |
||||
|
memset(shm, 0, sizeof(SharedMem)); |
||||
|
shm->front = 0; |
||||
|
shm->rear = 0; |
||||
|
shm->sem_id = sem_init(SEM_KEY); |
||||
|
if (shm->sem_id == -1) { |
||||
|
shmdt(shm); |
||||
|
return NULL; |
||||
|
} |
||||
|
return shm; |
||||
|
} |
||||
|
|
||||
|
SharedMem* shm_attach() { |
||||
|
int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); |
||||
|
if (shm_id == -1) { |
||||
|
perror("shmget attach failed"); |
||||
|
return NULL; |
||||
|
} |
||||
|
SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0); |
||||
|
if (shm == (void*)-1) { |
||||
|
perror("shmat attach failed"); |
||||
|
return NULL; |
||||
|
} |
||||
|
return shm; |
||||
|
} |
||||
|
|
||||
|
void shm_destroy(SharedMem *shm) { |
||||
|
shmdt(shm); |
||||
|
int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666); |
||||
|
shmctl(shm_id, IPC_RMID, NULL); |
||||
|
semctl(shm->sem_id, 0, IPC_RMID); |
||||
|
} |
||||
|
|
||||
|
// 线程处理函数(不变)
|
||||
|
void* thread_worker(void *arg) { |
||||
|
ThreadPool *pool = (ThreadPool*)arg; |
||||
|
SharedMem *shm = pool->shm; |
||||
|
|
||||
|
while (pool->pool_running) { |
||||
|
sem_p(shm->sem_id, 1); |
||||
|
sem_p(shm->sem_id, 0); |
||||
|
|
||||
|
Task task = shm->task_queue[shm->front]; |
||||
|
shm->front = (shm->front + 1) % MAX_TASK; |
||||
|
printf("[线程%ld] 取出任务%d:%s\n", pthread_self(), task.task_id, task.task_content); |
||||
|
|
||||
|
sem_v(shm->sem_id, 0); |
||||
|
sem_v(shm->sem_id, 2); |
||||
|
|
||||
|
task.status = TASK_RUNNING; |
||||
|
sleep(1); |
||||
|
task.status = TASK_DONE; |
||||
|
printf("[线程%ld] 完成任务%d,状态:%d\n", pthread_self(), task.task_id, task.status); |
||||
|
} |
||||
|
pthread_exit(NULL); |
||||
|
} |
||||
|
|
||||
|
// 线程池初始化(不变)
|
||||
|
int thread_pool_init(ThreadPool *pool, SharedMem *shm) { |
||||
|
pool->shm = shm; |
||||
|
pool->pool_running = 1; |
||||
|
for (int i = 0; i < THREAD_POOL_SIZE; i++) { |
||||
|
if (pthread_create(&pool->tid[i], NULL, thread_worker, pool) != 0) { |
||||
|
perror("pthread_create failed"); |
||||
|
return -1; |
||||
|
} |
||||
|
} |
||||
|
return 0; |
||||
|
} |
||||
|
|
||||
|
// 线程池销毁(不变)
|
||||
|
void thread_pool_destroy(ThreadPool *pool) { |
||||
|
pool->pool_running = 0; |
||||
|
for (int i = 0; i < THREAD_POOL_SIZE; i++) { |
||||
|
sem_v(pool->shm->sem_id, 1); |
||||
|
} |
||||
|
for (int i = 0; i < THREAD_POOL_SIZE; i++) { |
||||
|
pthread_join(pool->tid[i], NULL); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// 任务提交(不变)
|
||||
|
int task_submit(SharedMem *shm, int task_id, const char *content) { |
||||
|
sem_p(shm->sem_id, 2); |
||||
|
sem_p(shm->sem_id, 0); |
||||
|
|
||||
|
Task task = {0}; |
||||
|
task.task_id = task_id; |
||||
|
strncpy(task.task_content, content, sizeof(task.task_content)-1); |
||||
|
task.status = TASK_INIT; |
||||
|
shm->task_queue[shm->rear] = task; |
||||
|
shm->rear = (shm->rear + 1) % MAX_TASK; |
||||
|
printf("[提交进程] 提交任务%d:%s\n", task_id, content); |
||||
|
|
||||
|
sem_v(shm->sem_id, 0); |
||||
|
sem_v(shm->sem_id, 1); |
||||
|
|
||||
|
return 0; |
||||
|
} |
||||
|
|
||||
|
// 新增:带超时的管道读函数
|
||||
|
int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) { |
||||
|
fd_set read_fds; |
||||
|
struct timeval timeout; |
||||
|
|
||||
|
// 1. 初始化fd集合:只监听管道读端fd
|
||||
|
FD_ZERO(&read_fds); |
||||
|
FD_SET(fd, &read_fds); |
||||
|
|
||||
|
// 2. 设置超时时间(秒+微秒)
|
||||
|
timeout.tv_sec = timeout_sec; |
||||
|
timeout.tv_usec = 0; // 微秒,设为0表示整秒超时
|
||||
|
|
||||
|
// 3. 调用select:监听fd是否可读,超时返回0
|
||||
|
// select第一个参数:最大fd + 1(fd从0开始,所以+1)
|
||||
|
int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout); |
||||
|
if (ret < 0) { |
||||
|
// select调用失败(比如fd无效)
|
||||
|
perror("select failed"); |
||||
|
return -1; |
||||
|
} else if (ret == 0) { |
||||
|
// 超时:无数据可读
|
||||
|
fprintf(stderr, "[子进程] 管道读超时(%d秒),父进程未发送同步信号\n", timeout_sec); |
||||
|
return 0; |
||||
|
} else { |
||||
|
// 有数据可读:执行read
|
||||
|
if (FD_ISSET(fd, &read_fds)) { // 确认fd确实可读
|
||||
|
return read(fd, buf, len); |
||||
|
} |
||||
|
} |
||||
|
return -1; |
||||
|
} |
||||
|
|
||||
|
int main() { |
||||
|
// 新增:创建管道(用于父子进程同步)
|
||||
|
int pipe_fd[2]; |
||||
|
if (pipe(pipe_fd) == -1) { |
||||
|
perror("pipe create failed"); |
||||
|
return -1; |
||||
|
} |
||||
|
|
||||
|
pid_t pid = fork(); |
||||
|
if (pid == -1) { |
||||
|
perror("fork failed"); |
||||
|
return -1; |
||||
|
} |
||||
|
|
||||
|
// 子进程:工作进程(带管道超时同步)
|
||||
|
if (pid == 0) { |
||||
|
// 关闭管道写端(子进程只读)
|
||||
|
close(pipe_fd[1]); |
||||
|
|
||||
|
char buf[2]; |
||||
|
// 新增:调用带超时的管道读函数
|
||||
|
int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC); |
||||
|
if (read_ret <= 0) { |
||||
|
// 超时或读失败,直接退出
|
||||
|
close(pipe_fd[0]); |
||||
|
return -1; |
||||
|
} |
||||
|
printf("[子进程] 收到父进程同步信号:%s\n", buf); |
||||
|
|
||||
|
// 读成功后,挂载共享内存
|
||||
|
printf("[工作进程] 启动,创建线程池...\n"); |
||||
|
SharedMem *shm = shm_attach(); |
||||
|
if (shm == NULL) { |
||||
|
close(pipe_fd[0]); |
||||
|
return -1; |
||||
|
} |
||||
|
ThreadPool pool = {0}; |
||||
|
if (thread_pool_init(&pool, shm) == -1) { |
||||
|
shm_destroy(shm); |
||||
|
close(pipe_fd[0]); |
||||
|
return -1; |
||||
|
} |
||||
|
|
||||
|
sleep(10); |
||||
|
printf("[工作进程] 停止线程池...\n"); |
||||
|
thread_pool_destroy(&pool); |
||||
|
shm_destroy(shm); |
||||
|
// 关闭管道读端
|
||||
|
close(pipe_fd[0]); |
||||
|
printf("[工作进程] 退出\n"); |
||||
|
return 0; |
||||
|
} |
||||
|
|
||||
|
// 父进程:任务提交进程
|
||||
|
else { |
||||
|
// 关闭管道读端(父进程只写)
|
||||
|
close(pipe_fd[0]); |
||||
|
|
||||
|
// 创建共享内存
|
||||
|
printf("[提交进程] 启动,创建共享内存...\n"); |
||||
|
SharedMem *shm = shm_create(); |
||||
|
if (shm == NULL) { |
||||
|
close(pipe_fd[1]); |
||||
|
return -1; |
||||
|
} |
||||
|
|
||||
|
// 共享内存创建完成,写入同步信号
|
||||
|
write(pipe_fd[1], "ok", 2); |
||||
|
printf("[提交进程] 已发送同步信号:ok\n"); |
||||
|
|
||||
|
// 提交测试任务
|
||||
|
for (int i = 1; i <= 5; i++) { |
||||
|
char content[64]; |
||||
|
snprintf(content, sizeof(content), "测试任务_%d", i); |
||||
|
task_submit(shm, i, content); |
||||
|
sleep(1); |
||||
|
} |
||||
|
|
||||
|
// 等待子进程退出
|
||||
|
wait(NULL); |
||||
|
shm_destroy(shm); |
||||
|
// 关闭管道写端
|
||||
|
close(pipe_fd[1]); |
||||
|
printf("[提交进程] 退出\n"); |
||||
|
return 0; |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue