thread_pool.c revision 277554
1/* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22/* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27#include <sys/cdefs.h> 28__FBSDID("$FreeBSD: stable/10/cddl/compat/opensolaris/misc/thread_pool.c 277554 2015-01-23 00:54:56Z delphij $"); 29 30#pragma ident "%Z%%M% %I% %E% SMI" 31 32#include <stdlib.h> 33#include <signal.h> 34#include <errno.h> 35#include "thread_pool_impl.h" 36 37typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */ 38 39static void 40delete_pool(tpool_t *tpool) 41{ 42 tpool_job_t *job; 43 44 /* 45 * There should be no pending jobs, but just in case... 46 */ 47 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 48 tpool->tp_head = job->tpj_next; 49 free(job); 50 } 51 (void) pthread_attr_destroy(&tpool->tp_attr); 52 free(tpool); 53} 54 55/* 56 * Worker thread is terminating. 57 */ 58static void 59worker_cleanup(void *arg) 60{ 61 tpool_t *tpool = arg; 62 63 if (--tpool->tp_current == 0 && 64 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 65 if (tpool->tp_flags & TP_ABANDON) { 66 pthread_mutex_unlock(&tpool->tp_mutex); 67 delete_pool(tpool); 68 return; 69 } 70 if (tpool->tp_flags & TP_DESTROY) 71 (void) pthread_cond_broadcast(&tpool->tp_busycv); 72 } 73 pthread_mutex_unlock(&tpool->tp_mutex); 74} 75 76static void 77notify_waiters(tpool_t *tpool) 78{ 79 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 80 tpool->tp_flags &= ~TP_WAIT; 81 (void) pthread_cond_broadcast(&tpool->tp_waitcv); 82 } 83} 84 85/* 86 * Called by a worker thread on return from a tpool_dispatch()d job. 87 */ 88static void 89job_cleanup(void *arg) 90{ 91 tpool_t *tpool = arg; 92 pthread_t my_tid = pthread_self(); 93 tpool_active_t *activep; 94 tpool_active_t **activepp; 95 96 pthread_mutex_lock(&tpool->tp_mutex); 97 /* CSTYLED */ 98 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) { 99 activep = *activepp; 100 if (activep->tpa_tid == my_tid) { 101 *activepp = activep->tpa_next; 102 break; 103 } 104 } 105 if (tpool->tp_flags & TP_WAIT) 106 notify_waiters(tpool); 107} 108 109static void * 110tpool_worker(void *arg) 111{ 112 tpool_t *tpool = (tpool_t *)arg; 113 int elapsed; 114 tpool_job_t *job; 115 void (*func)(void *); 116 tpool_active_t active; 117 sigset_t maskset; 118 119 pthread_mutex_lock(&tpool->tp_mutex); 120 pthread_cleanup_push(worker_cleanup, tpool); 121 122 /* 123 * This is the worker's main loop. 124 * It will only be left if a timeout or an error has occured. 125 */ 126 active.tpa_tid = pthread_self(); 127 for (;;) { 128 elapsed = 0; 129 tpool->tp_idle++; 130 if (tpool->tp_flags & TP_WAIT) 131 notify_waiters(tpool); 132 while ((tpool->tp_head == NULL || 133 (tpool->tp_flags & TP_SUSPEND)) && 134 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 135 if (tpool->tp_current <= tpool->tp_minimum || 136 tpool->tp_linger == 0) { 137 (void) pthread_cond_wait(&tpool->tp_workcv, 138 &tpool->tp_mutex); 139 } else { 140 struct timespec timeout; 141 142 clock_gettime(CLOCK_MONOTONIC, &timeout); 143 timeout.tv_sec += tpool->tp_linger; 144 if (pthread_cond_timedwait(&tpool->tp_workcv, 145 &tpool->tp_mutex, &timeout) != 0) { 146 elapsed = 1; 147 break; 148 } 149 } 150 } 151 tpool->tp_idle--; 152 if (tpool->tp_flags & TP_DESTROY) 153 break; 154 if (tpool->tp_flags & TP_ABANDON) { 155 /* can't abandon a suspended pool */ 156 if (tpool->tp_flags & TP_SUSPEND) { 157 tpool->tp_flags &= ~TP_SUSPEND; 158 (void) pthread_cond_broadcast(&tpool->tp_workcv); 159 } 160 if (tpool->tp_head == NULL) 161 break; 162 } 163 if ((job = tpool->tp_head) != NULL && 164 !(tpool->tp_flags & TP_SUSPEND)) { 165 elapsed = 0; 166 func = job->tpj_func; 167 arg = job->tpj_arg; 168 tpool->tp_head = job->tpj_next; 169 if (job == tpool->tp_tail) 170 tpool->tp_tail = NULL; 171 tpool->tp_njobs--; 172 active.tpa_next = tpool->tp_active; 173 tpool->tp_active = &active; 174 pthread_mutex_unlock(&tpool->tp_mutex); 175 pthread_cleanup_push(job_cleanup, tpool); 176 free(job); 177 /* 178 * Call the specified function. 179 */ 180 func(arg); 181 /* 182 * We don't know what this thread has been doing, 183 * so we reset its signal mask and cancellation 184 * state back to the initial values. 185 */ 186 sigfillset(&maskset); 187 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 188 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 189 NULL); 190 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 191 NULL); 192 pthread_cleanup_pop(1); 193 } 194 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 195 /* 196 * We timed out and there is no work to be done 197 * and the number of workers exceeds the minimum. 198 * Exit now to reduce the size of the pool. 199 */ 200 break; 201 } 202 } 203 pthread_cleanup_pop(1); 204 return (arg); 205} 206 207/* 208 * Create a worker thread, with all signals blocked. 209 */ 210static int 211create_worker(tpool_t *tpool) 212{ 213 sigset_t maskset, oset; 214 pthread_t thread; 215 int error; 216 217 sigfillset(&maskset); 218 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 219 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 220 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 221 return (error); 222} 223 224tpool_t * 225tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 226 pthread_attr_t *attr) 227{ 228 tpool_t *tpool; 229 int error; 230 231 if (min_threads > max_threads || max_threads < 1) { 232 errno = EINVAL; 233 return (NULL); 234 } 235 236 tpool = calloc(1, sizeof (*tpool)); 237 if (tpool == NULL) { 238 errno = ENOMEM; 239 return (NULL); 240 } 241 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 242 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 243 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 244 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 245 tpool->tp_minimum = min_threads; 246 tpool->tp_maximum = max_threads; 247 tpool->tp_linger = linger; 248 249 /* make all pool threads be detached daemon threads */ 250 (void) pthread_attr_init(&tpool->tp_attr); 251 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 252 PTHREAD_CREATE_DETACHED); 253 254 return (tpool); 255} 256 257/* 258 * Dispatch a work request to the thread pool. 259 * If there are idle workers, awaken one. 260 * Else, if the maximum number of workers has 261 * not been reached, spawn a new worker thread. 262 * Else just return with the job added to the queue. 263 */ 264int 265tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 266{ 267 tpool_job_t *job; 268 269 if ((job = calloc(1, sizeof (*job))) == NULL) 270 return (-1); 271 job->tpj_next = NULL; 272 job->tpj_func = func; 273 job->tpj_arg = arg; 274 275 pthread_mutex_lock(&tpool->tp_mutex); 276 277 if (tpool->tp_head == NULL) 278 tpool->tp_head = job; 279 else 280 tpool->tp_tail->tpj_next = job; 281 tpool->tp_tail = job; 282 tpool->tp_njobs++; 283 284 if (!(tpool->tp_flags & TP_SUSPEND)) { 285 if (tpool->tp_idle > 0) 286 (void) pthread_cond_signal(&tpool->tp_workcv); 287 else if (tpool->tp_current < tpool->tp_maximum && 288 create_worker(tpool) == 0) 289 tpool->tp_current++; 290 } 291 292 pthread_mutex_unlock(&tpool->tp_mutex); 293 return (0); 294} 295 296/* 297 * Assumes: by the time tpool_destroy() is called no one will use this 298 * thread pool in any way and no one will try to dispatch entries to it. 299 * Calling tpool_destroy() from a job in the pool will cause deadlock. 300 */ 301void 302tpool_destroy(tpool_t *tpool) 303{ 304 tpool_active_t *activep; 305 306 pthread_mutex_lock(&tpool->tp_mutex); 307 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); 308 309 /* mark the pool as being destroyed; wakeup idle workers */ 310 tpool->tp_flags |= TP_DESTROY; 311 tpool->tp_flags &= ~TP_SUSPEND; 312 (void) pthread_cond_broadcast(&tpool->tp_workcv); 313 314 /* cancel all active workers */ 315 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) 316 (void) pthread_cancel(activep->tpa_tid); 317 318 /* wait for all active workers to finish */ 319 while (tpool->tp_active != NULL) { 320 tpool->tp_flags |= TP_WAIT; 321 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 322 } 323 324 /* the last worker to terminate will wake us up */ 325 while (tpool->tp_current != 0) 326 (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex); 327 328 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 329 delete_pool(tpool); 330} 331 332/* 333 * Like tpool_destroy(), but don't cancel workers or wait for them to finish. 334 * The last worker to terminate will delete the pool. 335 */ 336void 337tpool_abandon(tpool_t *tpool) 338{ 339 340 pthread_mutex_lock(&tpool->tp_mutex); 341 if (tpool->tp_current == 0) { 342 /* no workers, just delete the pool */ 343 pthread_mutex_unlock(&tpool->tp_mutex); 344 delete_pool(tpool); 345 } else { 346 /* wake up all workers, last one will delete the pool */ 347 tpool->tp_flags |= TP_ABANDON; 348 tpool->tp_flags &= ~TP_SUSPEND; 349 (void) pthread_cond_broadcast(&tpool->tp_workcv); 350 pthread_mutex_unlock(&tpool->tp_mutex); 351 } 352} 353 354/* 355 * Wait for all jobs to complete. 356 * Calling tpool_wait() from a job in the pool will cause deadlock. 357 */ 358void 359tpool_wait(tpool_t *tpool) 360{ 361 362 pthread_mutex_lock(&tpool->tp_mutex); 363 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); 364 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 365 tpool->tp_flags |= TP_WAIT; 366 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 367 } 368 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 369} 370 371void 372tpool_suspend(tpool_t *tpool) 373{ 374 375 pthread_mutex_lock(&tpool->tp_mutex); 376 tpool->tp_flags |= TP_SUSPEND; 377 pthread_mutex_unlock(&tpool->tp_mutex); 378} 379 380int 381tpool_suspended(tpool_t *tpool) 382{ 383 int suspended; 384 385 pthread_mutex_lock(&tpool->tp_mutex); 386 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 387 pthread_mutex_unlock(&tpool->tp_mutex); 388 389 return (suspended); 390} 391 392void 393tpool_resume(tpool_t *tpool) 394{ 395 int excess; 396 397 pthread_mutex_lock(&tpool->tp_mutex); 398 if (!(tpool->tp_flags & TP_SUSPEND)) { 399 pthread_mutex_unlock(&tpool->tp_mutex); 400 return; 401 } 402 tpool->tp_flags &= ~TP_SUSPEND; 403 (void) pthread_cond_broadcast(&tpool->tp_workcv); 404 excess = tpool->tp_njobs - tpool->tp_idle; 405 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 406 if (create_worker(tpool) != 0) 407 break; /* pthread_create() failed */ 408 tpool->tp_current++; 409 } 410 pthread_mutex_unlock(&tpool->tp_mutex); 411} 412 413int 414tpool_member(tpool_t *tpool) 415{ 416 pthread_t my_tid = pthread_self(); 417 tpool_active_t *activep; 418 419 pthread_mutex_lock(&tpool->tp_mutex); 420 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 421 if (activep->tpa_tid == my_tid) { 422 pthread_mutex_unlock(&tpool->tp_mutex); 423 return (1); 424 } 425 } 426 pthread_mutex_unlock(&tpool->tp_mutex); 427 return (0); 428} 429