You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
401 lines
12 KiB
401 lines
12 KiB
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
#include <pthread.h>
|
|
#include <sys/ipc.h>
|
|
#include <sys/shm.h>
|
|
#include <sys/sem.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
#include <sys/select.h>
|
|
#include <sys/wait.h>
|
|
#include <signal.h>
|
|
|
|
// ====================== 全局配置 ======================
|
|
#define MAX_TASK 10 // 共享任务队列最大长度
|
|
#define THREAD_POOL_SIZE 4 // 线程池大小
|
|
#define SHM_KEY 0x1234 // 共享内存键值
|
|
#define SEM_KEY 0x5678 // 信号量键值
|
|
#define PIPE_TIMEOUT_SEC 5 // 管道超时时间(秒)
|
|
|
|
// 任务状态枚举
|
|
typedef enum {
|
|
TASK_INIT = 0, // 初始态
|
|
TASK_RUNNING, // 执行中
|
|
TASK_DONE // 完成态
|
|
} TaskStatus;
|
|
|
|
// 共享内存中的任务结构体
|
|
typedef struct {
|
|
int task_id;
|
|
char task_content[64]; // 任务内容
|
|
TaskStatus status; // 任务状态
|
|
} Task;
|
|
|
|
// 共享内存核心结构体(包含任务队列 + 信号量)
|
|
typedef struct {
|
|
Task task_queue[MAX_TASK]; // 共享任务队列
|
|
int front; // 队列头
|
|
int rear; // 队列尾
|
|
int sem_id; // 信号量集ID
|
|
} SharedMem;
|
|
|
|
// 线程池参数
|
|
typedef struct {
|
|
SharedMem *shm; // 指向共享内存的指针
|
|
pthread_t tid[THREAD_POOL_SIZE]; // 线程ID数组
|
|
int pool_running; // 线程池运行标记(1:运行,0:停止)
|
|
} ThreadPool;
|
|
|
|
// ====================== 信号量操作封装 ======================
|
|
// 信号量初始化(创建/获取信号量集)
|
|
int sem_init(int sem_key) {
|
|
int sem_id = semget(sem_key, 3, IPC_CREAT | 0666);
|
|
if (sem_id == -1) {
|
|
perror("semget failed");
|
|
return -1;
|
|
}
|
|
|
|
union semun {
|
|
int val;
|
|
struct semid_ds *buf;
|
|
unsigned short *array;
|
|
} sem_union;
|
|
|
|
// 互斥信号量(索引0):初始值1
|
|
sem_union.val = 1;
|
|
if (semctl(sem_id, 0, SETVAL, sem_union) == -1) {
|
|
perror("semctl mutex failed");
|
|
return -1;
|
|
}
|
|
// 空信号量(索引1):初始值0(队列初始空)
|
|
sem_union.val = 0;
|
|
if (semctl(sem_id, 1, SETVAL, sem_union) == -1) {
|
|
perror("semctl empty failed");
|
|
return -1;
|
|
}
|
|
// 满信号量(索引2):初始值MAX_TASK
|
|
sem_union.val = MAX_TASK;
|
|
if (semctl(sem_id, 2, SETVAL, sem_union) == -1) {
|
|
perror("semctl full failed");
|
|
return -1;
|
|
}
|
|
|
|
return sem_id;
|
|
}
|
|
|
|
// 信号量P操作(减1,阻塞)
|
|
void sem_p(int sem_id, int sem_idx) {
|
|
struct sembuf sem_buf = {sem_idx, -1, SEM_UNDO};
|
|
if (semop(sem_id, &sem_buf, 1) == -1) {
|
|
perror("sem_p failed");
|
|
}
|
|
}
|
|
|
|
// 信号量V操作(加1,唤醒)
|
|
void sem_v(int sem_id, int sem_idx) {
|
|
struct sembuf sem_buf = {sem_idx, 1, SEM_UNDO};
|
|
if (semop(sem_id, &sem_buf, 1) == -1) {
|
|
perror("sem_v failed");
|
|
}
|
|
}
|
|
|
|
// ====================== 共享内存操作封装 ======================
|
|
// 创建/挂载共享内存(父进程用)
|
|
SharedMem* shm_create() {
|
|
// 1. 创建共享内存
|
|
int shm_id = shmget(SHM_KEY, sizeof(SharedMem), IPC_CREAT | 0666);
|
|
if (shm_id == -1) {
|
|
perror("shmget failed");
|
|
return NULL;
|
|
}
|
|
// 2. 挂载共享内存到进程地址空间
|
|
SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0);
|
|
if (shm == (void*)-1) {
|
|
perror("shmat failed");
|
|
return NULL;
|
|
}
|
|
// 3. 初始化共享内存
|
|
memset(shm, 0, sizeof(SharedMem));
|
|
shm->front = 0;
|
|
shm->rear = 0;
|
|
shm->sem_id = sem_init(SEM_KEY); // 初始化信号量
|
|
if (shm->sem_id == -1) {
|
|
shmdt(shm);
|
|
return NULL;
|
|
}
|
|
return shm;
|
|
}
|
|
|
|
// 挂载已存在的共享内存(子进程用)
|
|
SharedMem* shm_attach() {
|
|
int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666);
|
|
if (shm_id == -1) {
|
|
perror("shmget attach failed");
|
|
return NULL;
|
|
}
|
|
SharedMem *shm = (SharedMem*)shmat(shm_id, NULL, 0);
|
|
if (shm == (void*)-1) {
|
|
perror("shmat attach failed");
|
|
return NULL;
|
|
}
|
|
return shm;
|
|
}
|
|
|
|
// 仅分离共享内存(子进程用,不删除内核资源)
|
|
void shm_detach_only(SharedMem *shm) {
|
|
if (shm == NULL) return;
|
|
shmdt(shm); // 仅分离,不删除
|
|
}
|
|
|
|
// 销毁共享内存(父进程用,删除内核资源)
|
|
void shm_destroy(SharedMem *shm) {
|
|
if (shm == NULL) return;
|
|
|
|
// 1. 分离共享内存
|
|
shmdt(shm);
|
|
|
|
// 2. 删除共享内存(检查资源是否存在)
|
|
int shm_id = shmget(SHM_KEY, sizeof(SharedMem), 0666);
|
|
if (shm_id != -1) {
|
|
shmctl(shm_id, IPC_RMID, NULL);
|
|
}
|
|
|
|
// 3. 删除信号量(检查资源是否存在)
|
|
if (shm->sem_id != -1) {
|
|
semctl(shm->sem_id, 0, IPC_RMID);
|
|
}
|
|
}
|
|
|
|
// ====================== 线程池核心逻辑 ======================
|
|
// 线程处理函数:循环取任务执行,增加防空任务/退出检查
|
|
void* thread_worker(void *arg) {
|
|
ThreadPool *pool = (ThreadPool*)arg;
|
|
SharedMem *shm = pool->shm;
|
|
|
|
while (pool->pool_running) {
|
|
// 前置检查:线程池已停止则直接退出
|
|
if (!pool->pool_running) break;
|
|
|
|
// 1. P操作空信号量(队列空则阻塞)
|
|
sem_p(shm->sem_id, 1);
|
|
// 唤醒后再次检查:线程池已停止则归还信号量并退出
|
|
if (!pool->pool_running) {
|
|
sem_v(shm->sem_id, 1); // 归还空信号量,避免死锁
|
|
break;
|
|
}
|
|
|
|
// 2. P操作互斥信号量(保护队列读写)
|
|
sem_p(shm->sem_id, 0);
|
|
// 最后检查:线程池已停止则归还所有信号量并退出
|
|
if (!pool->pool_running) {
|
|
sem_v(shm->sem_id, 0); // 归还互斥信号量
|
|
sem_v(shm->sem_id, 1); // 归还空信号量
|
|
break;
|
|
}
|
|
|
|
// 3. 检查任务是否为空(避免读取无效任务)
|
|
Task task = shm->task_queue[shm->front];
|
|
if (task.task_id == 0) {
|
|
sem_v(shm->sem_id, 0); // 归还互斥信号量
|
|
continue;
|
|
}
|
|
|
|
// 4. 取出任务,更新队列头
|
|
shm->front = (shm->front + 1) % MAX_TASK;
|
|
printf("[线程%ld] 取出任务%d:%s\n", pthread_self(), task.task_id, task.task_content);
|
|
|
|
// 5. V操作互斥信号量
|
|
sem_v(shm->sem_id, 0);
|
|
// 6. V操作满信号量(队列有空位)
|
|
sem_v(shm->sem_id, 2);
|
|
|
|
// 7. 执行任务(模拟耗时操作)
|
|
task.status = TASK_RUNNING;
|
|
sleep(1);
|
|
task.status = TASK_DONE;
|
|
printf("[线程%ld] 完成任务%d,状态:%d\n", pthread_self(), task.task_id, task.status);
|
|
}
|
|
|
|
pthread_exit(NULL);
|
|
}
|
|
|
|
// 初始化线程池
|
|
int thread_pool_init(ThreadPool *pool, SharedMem *shm) {
|
|
pool->shm = shm;
|
|
pool->pool_running = 1;
|
|
|
|
// 创建线程池中的线程
|
|
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
|
|
if (pthread_create(&pool->tid[i], NULL, thread_worker, pool) != 0) {
|
|
perror("pthread_create failed");
|
|
return -1;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
// 销毁线程池(优雅退出)
|
|
void thread_pool_destroy(ThreadPool *pool) {
|
|
if (pool == NULL) return;
|
|
|
|
// 标记线程池停止
|
|
pool->pool_running = 0;
|
|
|
|
// 唤醒所有阻塞的线程(避免线程卡死)
|
|
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
|
|
sem_v(pool->shm->sem_id, 1);
|
|
}
|
|
|
|
// 等待所有线程退出
|
|
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
|
|
pthread_join(pool->tid[i], NULL);
|
|
}
|
|
}
|
|
|
|
// ====================== 任务提交函数 ======================
|
|
int task_submit(SharedMem *shm, int task_id, const char *content) {
|
|
// 1. P操作满信号量(队列满则阻塞)
|
|
sem_p(shm->sem_id, 2);
|
|
// 2. P操作互斥信号量
|
|
sem_p(shm->sem_id, 0);
|
|
|
|
// 3. 写入任务到队列
|
|
Task task = {0};
|
|
task.task_id = task_id;
|
|
strncpy(task.task_content, content, sizeof(task.task_content)-1);
|
|
task.status = TASK_INIT;
|
|
shm->task_queue[shm->rear] = task;
|
|
shm->rear = (shm->rear + 1) % MAX_TASK;
|
|
printf("[提交进程] 提交任务%d:%s\n", task_id, content);
|
|
|
|
// 4. V操作互斥信号量
|
|
sem_v(shm->sem_id, 0);
|
|
// 5. V操作空信号量(唤醒线程池)
|
|
sem_v(shm->sem_id, 1);
|
|
|
|
return 0;
|
|
}
|
|
|
|
// ====================== 带超时的管道读函数 ======================
|
|
int pipe_read_with_timeout(int fd, char *buf, int len, int timeout_sec) {
|
|
fd_set read_fds;
|
|
struct timeval timeout;
|
|
|
|
// 初始化fd集合
|
|
FD_ZERO(&read_fds);
|
|
FD_SET(fd, &read_fds);
|
|
|
|
// 设置超时时间
|
|
timeout.tv_sec = timeout_sec;
|
|
timeout.tv_usec = 0;
|
|
|
|
// 监听fd是否可读
|
|
int ret = select(fd + 1, &read_fds, NULL, NULL, &timeout);
|
|
if (ret < 0) {
|
|
perror("select failed");
|
|
return -1;
|
|
} else if (ret == 0) {
|
|
fprintf(stderr, "[子进程] 管道读超时(%d秒),父进程未发送同步信号\n", timeout_sec);
|
|
return 0;
|
|
} else {
|
|
if (FD_ISSET(fd, &read_fds)) {
|
|
return read(fd, buf, len);
|
|
}
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
// ====================== 主函数(进程分支) ======================
|
|
int main() {
|
|
// 创建管道(父子进程同步用)
|
|
int pipe_fd[2];
|
|
if (pipe(pipe_fd) == -1) {
|
|
perror("pipe create failed");
|
|
return -1;
|
|
}
|
|
|
|
// 创建子进程
|
|
pid_t pid = fork();
|
|
if (pid == -1) {
|
|
perror("fork failed");
|
|
return -1;
|
|
}
|
|
|
|
// 子进程:工作进程(线程池 + 消费任务)
|
|
if (pid == 0) {
|
|
// 关闭管道写端(子进程只读)
|
|
close(pipe_fd[1]);
|
|
|
|
char buf[2];
|
|
// 带超时读取父进程的同步信号
|
|
int read_ret = pipe_read_with_timeout(pipe_fd[0], buf, 2, PIPE_TIMEOUT_SEC);
|
|
if (read_ret <= 0) {
|
|
close(pipe_fd[0]);
|
|
return -1;
|
|
}
|
|
printf("[子进程] 收到父进程同步信号:%s\n", buf);
|
|
|
|
// 挂载共享内存
|
|
printf("[工作进程] 启动,创建线程池...\n");
|
|
SharedMem *shm = shm_attach();
|
|
if (shm == NULL) {
|
|
close(pipe_fd[0]);
|
|
return -1;
|
|
}
|
|
|
|
// 初始化线程池
|
|
ThreadPool pool = {0};
|
|
if (thread_pool_init(&pool, shm) == -1) {
|
|
shm_detach_only(shm);
|
|
close(pipe_fd[0]);
|
|
return -1;
|
|
}
|
|
|
|
// 运行10秒后停止线程池
|
|
sleep(10);
|
|
printf("[工作进程] 停止线程池...\n");
|
|
thread_pool_destroy(&pool);
|
|
|
|
// 子进程仅分离共享内存,不销毁内核资源
|
|
shm_detach_only(shm);
|
|
close(pipe_fd[0]);
|
|
printf("[工作进程] 退出\n");
|
|
return 0;
|
|
}
|
|
|
|
// 父进程:任务提交进程
|
|
else {
|
|
// 关闭管道读端(父进程只写)
|
|
close(pipe_fd[0]);
|
|
|
|
// 创建共享内存
|
|
printf("[提交进程] 启动,创建共享内存...\n");
|
|
SharedMem *shm = shm_create();
|
|
if (shm == NULL) {
|
|
close(pipe_fd[1]);
|
|
return -1;
|
|
}
|
|
|
|
// 发送同步信号(告知子进程共享内存已创建)
|
|
write(pipe_fd[1], "ok", 2);
|
|
printf("[提交进程] 已发送同步信号:ok\n");
|
|
|
|
// 提交5个测试任务
|
|
for (int i = 1; i <= 5; i++) {
|
|
char content[64];
|
|
snprintf(content, sizeof(content), "测试任务_%d", i);
|
|
task_submit(shm, i, content);
|
|
sleep(1);
|
|
}
|
|
|
|
// 等待子进程退出
|
|
wait(NULL);
|
|
|
|
// 父进程统一销毁共享内存和信号量
|
|
shm_destroy(shm);
|
|
close(pipe_fd[1]);
|
|
printf("[提交进程] 退出\n");
|
|
return 0;
|
|
}
|
|
}
|