thread_pool.c revision 266612
1/* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 22/* 23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved. 24 * Use is subject to license terms. 25 */ 26 27#include <sys/cdefs.h> 28__FBSDID("$FreeBSD: stable/10/cddl/compat/opensolaris/misc/thread_pool.c 266612 2014-05-24 10:44:40Z mav $"); 29 30#pragma ident "%Z%%M% %I% %E% SMI" 31 32#include <stdlib.h> 33#include <signal.h> 34#include <errno.h> 35#include "thread_pool_impl.h" 36 37typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */ 38 39static void 40delete_pool(tpool_t *tpool) 41{ 42 tpool_job_t *job; 43 44 /* 45 * There should be no pending jobs, but just in case... 46 */ 47 for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) { 48 tpool->tp_head = job->tpj_next; 49 free(job); 50 } 51 (void) pthread_attr_destroy(&tpool->tp_attr); 52 free(tpool); 53} 54 55/* 56 * Worker thread is terminating. 
57 */ 58static void 59worker_cleanup(void *arg) 60{ 61 tpool_t *tpool = arg; 62 63 if (--tpool->tp_current == 0 && 64 (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 65 if (tpool->tp_flags & TP_ABANDON) { 66 pthread_mutex_unlock(&tpool->tp_mutex); 67 delete_pool(tpool); 68 return; 69 } 70 if (tpool->tp_flags & TP_DESTROY) 71 (void) pthread_cond_broadcast(&tpool->tp_busycv); 72 } 73 pthread_mutex_unlock(&tpool->tp_mutex); 74} 75 76static void 77notify_waiters(tpool_t *tpool) 78{ 79 if (tpool->tp_head == NULL && tpool->tp_active == NULL) { 80 tpool->tp_flags &= ~TP_WAIT; 81 (void) pthread_cond_broadcast(&tpool->tp_waitcv); 82 } 83} 84 85/* 86 * Called by a worker thread on return from a tpool_dispatch()d job. 87 */ 88static void 89job_cleanup(void *arg) 90{ 91 tpool_t *tpool = arg; 92 pthread_t my_tid = pthread_self(); 93 tpool_active_t *activep; 94 tpool_active_t **activepp; 95 96 pthread_mutex_lock(&tpool->tp_mutex); 97 /* CSTYLED */ 98 for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) { 99 activep = *activepp; 100 if (activep->tpa_tid == my_tid) { 101 *activepp = activep->tpa_next; 102 break; 103 } 104 } 105 if (tpool->tp_flags & TP_WAIT) 106 notify_waiters(tpool); 107} 108 109static void * 110tpool_worker(void *arg) 111{ 112 tpool_t *tpool = (tpool_t *)arg; 113 int elapsed; 114 tpool_job_t *job; 115 void (*func)(void *); 116 tpool_active_t active; 117 sigset_t maskset; 118 119 pthread_mutex_lock(&tpool->tp_mutex); 120 pthread_cleanup_push(worker_cleanup, tpool); 121 122 /* 123 * This is the worker's main loop. 124 * It will only be left if a timeout or an error has occured. 
125 */ 126 active.tpa_tid = pthread_self(); 127 for (;;) { 128 elapsed = 0; 129 tpool->tp_idle++; 130 if (tpool->tp_flags & TP_WAIT) 131 notify_waiters(tpool); 132 while ((tpool->tp_head == NULL || 133 (tpool->tp_flags & TP_SUSPEND)) && 134 !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) { 135 if (tpool->tp_current <= tpool->tp_minimum || 136 tpool->tp_linger == 0) { 137 (void) pthread_cond_wait(&tpool->tp_workcv, 138 &tpool->tp_mutex); 139 } else { 140 struct timespec timeout; 141 142 clock_gettime(CLOCK_MONOTONIC, &timeout); 143 timeout.tv_sec += tpool->tp_linger; 144 if (pthread_cond_timedwait(&tpool->tp_workcv, 145 &tpool->tp_mutex, &timeout) != 0) { 146 elapsed = 1; 147 break; 148 } 149 } 150 } 151 tpool->tp_idle--; 152 if (tpool->tp_flags & TP_DESTROY) 153 break; 154 if (tpool->tp_flags & TP_ABANDON) { 155 /* can't abandon a suspended pool */ 156 if (tpool->tp_flags & TP_SUSPEND) { 157 tpool->tp_flags &= ~TP_SUSPEND; 158 (void) pthread_cond_broadcast(&tpool->tp_workcv); 159 } 160 if (tpool->tp_head == NULL) 161 break; 162 } 163 if ((job = tpool->tp_head) != NULL && 164 !(tpool->tp_flags & TP_SUSPEND)) { 165 elapsed = 0; 166 func = job->tpj_func; 167 arg = job->tpj_arg; 168 tpool->tp_head = job->tpj_next; 169 if (job == tpool->tp_tail) 170 tpool->tp_tail = NULL; 171 tpool->tp_njobs--; 172 active.tpa_next = tpool->tp_active; 173 tpool->tp_active = &active; 174 pthread_mutex_unlock(&tpool->tp_mutex); 175 pthread_cleanup_push(job_cleanup, tpool); 176 free(job); 177 /* 178 * Call the specified function. 179 */ 180 func(arg); 181 /* 182 * We don't know what this thread has been doing, 183 * so we reset its signal mask and cancellation 184 * state back to the initial values. 
185 */ 186 sigfillset(&maskset); 187 (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL); 188 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 189 NULL); 190 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 191 NULL); 192 pthread_cleanup_pop(1); 193 } 194 if (elapsed && tpool->tp_current > tpool->tp_minimum) { 195 /* 196 * We timed out and there is no work to be done 197 * and the number of workers exceeds the minimum. 198 * Exit now to reduce the size of the pool. 199 */ 200 break; 201 } 202 } 203 pthread_cleanup_pop(1); 204 return (arg); 205} 206 207/* 208 * Create a worker thread, with all signals blocked. 209 */ 210static int 211create_worker(tpool_t *tpool) 212{ 213 sigset_t maskset, oset; 214 pthread_t thread; 215 int error; 216 217 sigfillset(&maskset); 218 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); 219 error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool); 220 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); 221 return (error); 222} 223 224tpool_t * 225tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger, 226 pthread_attr_t *attr) 227{ 228 tpool_t *tpool; 229 int error; 230 231 if (min_threads > max_threads || max_threads < 1) { 232 errno = EINVAL; 233 return (NULL); 234 } 235 236 tpool = malloc(sizeof (*tpool)); 237 if (tpool == NULL) { 238 errno = ENOMEM; 239 return (NULL); 240 } 241 bzero(tpool, sizeof(*tpool)); 242 (void) pthread_mutex_init(&tpool->tp_mutex, NULL); 243 (void) pthread_cond_init(&tpool->tp_busycv, NULL); 244 (void) pthread_cond_init(&tpool->tp_workcv, NULL); 245 (void) pthread_cond_init(&tpool->tp_waitcv, NULL); 246 tpool->tp_minimum = min_threads; 247 tpool->tp_maximum = max_threads; 248 tpool->tp_linger = linger; 249 250 /* make all pool threads be detached daemon threads */ 251 (void) pthread_attr_init(&tpool->tp_attr); 252 (void) pthread_attr_setdetachstate(&tpool->tp_attr, 253 PTHREAD_CREATE_DETACHED); 254 255 return (tpool); 256} 257 258/* 259 * Dispatch a work request to the thread 
pool. 260 * If there are idle workers, awaken one. 261 * Else, if the maximum number of workers has 262 * not been reached, spawn a new worker thread. 263 * Else just return with the job added to the queue. 264 */ 265int 266tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg) 267{ 268 tpool_job_t *job; 269 270 if ((job = malloc(sizeof (*job))) == NULL) 271 return (-1); 272 bzero(job, sizeof(*job)); 273 job->tpj_next = NULL; 274 job->tpj_func = func; 275 job->tpj_arg = arg; 276 277 pthread_mutex_lock(&tpool->tp_mutex); 278 279 if (tpool->tp_head == NULL) 280 tpool->tp_head = job; 281 else 282 tpool->tp_tail->tpj_next = job; 283 tpool->tp_tail = job; 284 tpool->tp_njobs++; 285 286 if (!(tpool->tp_flags & TP_SUSPEND)) { 287 if (tpool->tp_idle > 0) 288 (void) pthread_cond_signal(&tpool->tp_workcv); 289 else if (tpool->tp_current < tpool->tp_maximum && 290 create_worker(tpool) == 0) 291 tpool->tp_current++; 292 } 293 294 pthread_mutex_unlock(&tpool->tp_mutex); 295 return (0); 296} 297 298/* 299 * Assumes: by the time tpool_destroy() is called no one will use this 300 * thread pool in any way and no one will try to dispatch entries to it. 301 * Calling tpool_destroy() from a job in the pool will cause deadlock. 
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	pthread_mutex_lock(&tpool->tp_mutex);
	/*
	 * Guarantee tp_mutex is released even if this thread is cancelled
	 * in one of the condition waits below.
	 * NOTE(review): casting pthread_mutex_unlock (int (*)(pthread_mutex_t *))
	 * to void (*)(void *) and calling it through that type is technically
	 * undefined behavior per the C standard, though it is a long-standing
	 * idiom that works on the targeted platforms.
	 */
	pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	/* all workers are gone; nobody else can reference the pool now */
	delete_pool(tpool);
}

/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{

	pthread_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		pthread_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) pthread_cond_broadcast(&tpool->tp_workcv);
		pthread_mutex_unlock(&tpool->tp_mutex);
	}
}

/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
359 */ 360void 361tpool_wait(tpool_t *tpool) 362{ 363 364 pthread_mutex_lock(&tpool->tp_mutex); 365 pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex); 366 while (tpool->tp_head != NULL || tpool->tp_active != NULL) { 367 tpool->tp_flags |= TP_WAIT; 368 (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex); 369 } 370 pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */ 371} 372 373void 374tpool_suspend(tpool_t *tpool) 375{ 376 377 pthread_mutex_lock(&tpool->tp_mutex); 378 tpool->tp_flags |= TP_SUSPEND; 379 pthread_mutex_unlock(&tpool->tp_mutex); 380} 381 382int 383tpool_suspended(tpool_t *tpool) 384{ 385 int suspended; 386 387 pthread_mutex_lock(&tpool->tp_mutex); 388 suspended = (tpool->tp_flags & TP_SUSPEND) != 0; 389 pthread_mutex_unlock(&tpool->tp_mutex); 390 391 return (suspended); 392} 393 394void 395tpool_resume(tpool_t *tpool) 396{ 397 int excess; 398 399 pthread_mutex_lock(&tpool->tp_mutex); 400 if (!(tpool->tp_flags & TP_SUSPEND)) { 401 pthread_mutex_unlock(&tpool->tp_mutex); 402 return; 403 } 404 tpool->tp_flags &= ~TP_SUSPEND; 405 (void) pthread_cond_broadcast(&tpool->tp_workcv); 406 excess = tpool->tp_njobs - tpool->tp_idle; 407 while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) { 408 if (create_worker(tpool) != 0) 409 break; /* pthread_create() failed */ 410 tpool->tp_current++; 411 } 412 pthread_mutex_unlock(&tpool->tp_mutex); 413} 414 415int 416tpool_member(tpool_t *tpool) 417{ 418 pthread_t my_tid = pthread_self(); 419 tpool_active_t *activep; 420 421 pthread_mutex_lock(&tpool->tp_mutex); 422 for (activep = tpool->tp_active; activep; activep = activep->tpa_next) { 423 if (activep->tpa_tid == my_tid) { 424 pthread_mutex_unlock(&tpool->tp_mutex); 425 return (1); 426 } 427 } 428 pthread_mutex_unlock(&tpool->tp_mutex); 429 return (0); 430} 431