-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreadpool.c
More file actions
304 lines (260 loc) · 9.39 KB
/
threadpool.c
File metadata and controls
304 lines (260 loc) · 9.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#include "threadpool.h"
// 创建线程池
ThreadPool *threadpoolCreate(int min, int max, int queueCapacity) {
// 为线程池分配空间
ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
if (pool == NULL) {
perror("malloc threadpool");
return NULL;
}
// 初始化任务队列
// 为队列的底层数组分配空间
pool->qTasks = (Task *)malloc(sizeof(Task) * queueCapacity);
if (pool->qTasks == NULL) {
perror("malloc tasks");
free(pool);
return NULL;
}
// 任务队列
pool->qCapacity = queueCapacity;
pool->qFront = 0; // 队首
pool->qRear = 0; // 队尾
pool->qTaskCount = 0; // 当前任务数
// 退出标志
pool->shutdown = 0;
// 线程相关初始化
// 存储工作线程ID的数组,要足够存储最大线程个数的空间
pool->workerThreadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
if (pool->workerThreadIDs == NULL) {
perror("malloc worker threads");
free(pool->qTasks);
free(pool);
return NULL;
}
memset(pool->workerThreadIDs, 0, sizeof(pthread_t) * max);
pool->minThreads = min; // 最小线程数
pool->maxThreads = max; // 最大线程数
pool->busyCount = 0; // 忙碌线程计数
pool->liveCount = min; // 存活线程计数
pool->exitCount = 0; // 退出线程计数
// 创建初始工作线程
for (int i = 0; i < min; ++i) {
pthread_create(&pool->workerThreadIDs[i], NULL, worker, pool);
}
// 创建管理者线程
pthread_create(&pool->managerThreadID, NULL, manager, pool);
// 初始化互斥锁
pthread_mutex_init(&pool->poolMutex, NULL);
pthread_mutex_init(&pool->busyMutex, NULL);
// 初始化条件变量
pthread_cond_init(&pool->notEmpty, NULL); // 队列非空条件变量
pthread_cond_init(&pool->notFull, NULL); // 队列未满条件变量
return pool;
}
// 销毁线程池
int threadpoolDestroy(ThreadPool *pool) {
if (pool == NULL) {
return -1;
}
// 更改关闭标志
pool->shutdown = 1;
// 先回收管理者线程
pthread_join(pool->managerThreadID, NULL);
// 唤醒阻塞的工作线程,让其自杀
for (int i = 0; i < pool->liveCount; ++i) {
pthread_cond_signal(&pool->notEmpty);
pthread_cond_signal(&pool->notFull);
}
sleep(1);
// 释放资源
if (pool && pool->qTasks) {
free(pool->qTasks);
pool->qTasks = NULL;
}
if (pool && pool->workerThreadIDs) {
free(pool->workerThreadIDs);
pool->workerThreadIDs = NULL;
}
pthread_mutex_destroy(&pool->poolMutex);
pthread_mutex_destroy(&pool->busyMutex);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
// 单个线程退出
void threadExit(ThreadPool *pool) {
// 获取当前线程的ID
pthread_t tid = pthread_self();
// 遍历工作线程ID数组,找到对应的线程ID
pthread_mutex_lock(&pool->poolMutex);
for (int i = 0; i < pool->maxThreads; ++i) {
if (pool->workerThreadIDs[i] == tid) {
// 将对应的线程ID还原为0
pool->workerThreadIDs[i] = 0;
printf("线程 %ld 退出\n", tid);
break;
}
}
pthread_mutex_unlock(&pool->poolMutex);
// 退出当前线程
pthread_exit(NULL);
}
// 管理者线程函数
void *manager(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (!pool->shutdown) {
// 每3秒检测一次
sleep(3);
// 再次判断是否退出
if (pool->shutdown) {
break;
}
printf("开始检测\n");
// 取出线程池中任务的数量与当前线程的数量
pthread_mutex_lock(&pool->poolMutex);
int taskNum = pool->qTaskCount;
int threadNum = pool->liveCount;
pthread_mutex_unlock(&pool->poolMutex);
// 取出线程池中忙碌线程的数量
pthread_mutex_lock(&pool->busyMutex);
int busyNum = pool->busyCount;
pthread_mutex_unlock(&pool->busyMutex);
// 增加线程
// 任务数 > 存活线程数 && 存活线程数 < 最大线程数,增加线程
if (taskNum > threadNum && threadNum < pool->maxThreads) {
// 加锁
pthread_mutex_lock(&pool->poolMutex);
// 一次只添加两个
int count = 0;
for (int i = 0; taskNum > threadNum &&
threadNum < pool->maxThreads && count < COUNT;
++i) {
// 从空闲的线程ID中分配
if (pool->workerThreadIDs[i] == 0) {
pthread_create(&pool->workerThreadIDs[i], NULL, worker,
pool);
pool->liveCount++;
count++;
}
}
printf("增加线程\n");
// 解锁
pthread_mutex_unlock(&pool->poolMutex);
}
// 减少线程
// 忙碌线程数 * 2 < 存活线程数 && 存活线程数 > 最小线程数,减少线程
if (busyNum * 2 < threadNum && threadNum > pool->minThreads) {
pthread_mutex_lock(&pool->poolMutex);
pool->exitCount = COUNT; // 设置一次性退出的线程数
pthread_mutex_unlock(&pool->poolMutex);
// 唤醒空闲的等待的工作线程,让它们自杀
for (int i = 0; i < COUNT; ++i) {
pthread_cond_signal(&pool->notEmpty);
}
}
}
pthread_exit(NULL);
return NULL;
}
// 工作线程函数
void *worker(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (1) {
// 加锁
pthread_mutex_lock(&pool->poolMutex);
// 队列空了但没关闭
while (pool->qTaskCount == 0 && !pool->shutdown) {
printf("队列空了但未关闭,工作线程%ld等待中\n", pthread_self());
// not_empty条件变量未满足,阻塞并释放锁
pthread_cond_wait(&pool->notEmpty, &pool->poolMutex);
// 判断是否自杀
if (pool->exitCount > 0) {
// 退出线程计数减少
pool->exitCount--;
// 再次判断线程数是否大于最小线程数
if (pool->liveCount > pool->minThreads) {
pool->liveCount--;
// 先解锁再退出
pthread_mutex_unlock(&pool->poolMutex);
printf("工作线程%ld自杀\n", pthread_self());
threadExit(pool);
}
}
}
// 队列关闭了,退出当前线程
if (pool->shutdown) {
printf("队列关闭了,工作线程%ld退出\n", pthread_self());
// 先释放锁
pthread_mutex_unlock(&pool->poolMutex);
threadExit(pool);
break;
}
// 取出任务
Task task;
task.process = pool->qTasks[pool->qFront].process;
task.arg = pool->qTasks[pool->qFront].arg;
// 改变计数
pool->qFront = (pool->qFront + 1) % pool->qCapacity; // 队首后移
pool->qTaskCount--; // 任务数减一
// 唤醒被not_full阻塞的线程
pthread_cond_signal(&pool->notFull);
// 解锁
pthread_mutex_unlock(&pool->poolMutex);
// 执行任务
printf("执行任务%d开始\n", *((int *)task.arg));
pthread_mutex_lock(&pool->busyMutex);
pool->busyCount++; // 忙碌线程计数增加
pthread_mutex_unlock(&pool->busyMutex);
task.process(task.arg);
// 释放资源
free(task.arg);
task.arg = NULL;
pthread_mutex_lock(&pool->busyMutex);
pool->busyCount--; // 忙碌线程计数减少
pthread_mutex_unlock(&pool->busyMutex);
printf("执行任务完成\n");
}
return NULL;
}
// 添加任务到线程池
void threadpoolAddTask(ThreadPool *pool, void (*process)(void *), void *arg) {
// 加锁
pthread_mutex_lock(&pool->poolMutex);
// 循环判断任务队列是否满,如果满就会被阻塞
while (pool->qTaskCount == pool->qCapacity && !pool->shutdown) {
printf("队列满了,等待...\n");
pthread_cond_wait(&pool->notFull, &pool->poolMutex);
}
// 再次检查队列是否已关闭,因为可能在唤醒时队列关闭
if (pool->shutdown) {
pthread_mutex_unlock(&pool->poolMutex);
return;
}
// 添加任务到rear指向的位置,然后移动
Task task;
task.process = process;
task.arg = arg;
pool->qTasks[pool->qRear] = task;
pool->qRear = (pool->qRear + 1) % pool->qCapacity;
pool->qTaskCount++;
printf("添加任务 %d\n", *((int *)arg));
// 唤醒等待的工作线程
pthread_cond_signal(&pool->notEmpty);
// 解锁
pthread_mutex_unlock(&pool->poolMutex);
}
// 获取线程池状态
int threadpoolGetBusyCount(ThreadPool *pool) {
pthread_mutex_lock(&pool->poolMutex);
int busy_count = pool->busyCount;
pthread_mutex_unlock(&pool->poolMutex);
return busy_count;
}
int threadpoolGetAliveCount(ThreadPool *pool) {
pthread_mutex_lock(&pool->poolMutex);
int live_count = pool->liveCount;
pthread_mutex_unlock(&pool->poolMutex);
return live_count;
}