/* taskq.c revision 349203 */
1/* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21/* 22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25/* 26 * Copyright 2011 Nexenta Systems, Inc. All rights reserved. 27 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 28 * Copyright (c) 2014 by Delphix. All rights reserved. 
29 */ 30 31#include <sys/zfs_context.h> 32 33int taskq_now; 34taskq_t *system_taskq; 35 36#define TASKQ_ACTIVE 0x00010000 37#define TASKQ_NAMELEN 31 38 39struct taskq { 40 char tq_name[TASKQ_NAMELEN + 1]; 41 kmutex_t tq_lock; 42 krwlock_t tq_threadlock; 43 kcondvar_t tq_dispatch_cv; 44 kcondvar_t tq_wait_cv; 45 thread_t *tq_threadlist; 46 int tq_flags; 47 int tq_active; 48 int tq_nthreads; 49 int tq_nalloc; 50 int tq_minalloc; 51 int tq_maxalloc; 52 kcondvar_t tq_maxalloc_cv; 53 int tq_maxalloc_wait; 54 taskq_ent_t *tq_freelist; 55 taskq_ent_t tq_task; 56}; 57 58static taskq_ent_t * 59task_alloc(taskq_t *tq, int tqflags) 60{ 61 taskq_ent_t *t; 62 int rv; 63 64again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 65 tq->tq_freelist = t->tqent_next; 66 } else { 67 if (tq->tq_nalloc >= tq->tq_maxalloc) { 68 if (!(tqflags & KM_SLEEP)) 69 return (NULL); 70 71 /* 72 * We don't want to exceed tq_maxalloc, but we can't 73 * wait for other tasks to complete (and thus free up 74 * task structures) without risking deadlock with 75 * the caller. So, we just delay for one second 76 * to throttle the allocation rate. If we have tasks 77 * complete before one second timeout expires then 78 * taskq_ent_free will signal us and we will 79 * immediately retry the allocation. 
80 */ 81 tq->tq_maxalloc_wait++; 82#ifdef __FreeBSD__ 83 rv = cv_timedwait(&tq->tq_maxalloc_cv, 84 &tq->tq_lock, hz); 85#else 86 rv = cv_timedwait(&tq->tq_maxalloc_cv, 87 &tq->tq_lock, ddi_get_lbolt() + hz); 88#endif 89 tq->tq_maxalloc_wait--; 90 if (rv > 0) 91 goto again; /* signaled */ 92 } 93 mutex_exit(&tq->tq_lock); 94 95 t = kmem_alloc(sizeof (taskq_ent_t), tqflags & KM_SLEEP); 96 97 mutex_enter(&tq->tq_lock); 98 if (t != NULL) 99 tq->tq_nalloc++; 100 } 101 return (t); 102} 103 104static void 105task_free(taskq_t *tq, taskq_ent_t *t) 106{ 107 if (tq->tq_nalloc <= tq->tq_minalloc) { 108 t->tqent_next = tq->tq_freelist; 109 tq->tq_freelist = t; 110 } else { 111 tq->tq_nalloc--; 112 mutex_exit(&tq->tq_lock); 113 kmem_free(t, sizeof (taskq_ent_t)); 114 mutex_enter(&tq->tq_lock); 115 } 116 117 if (tq->tq_maxalloc_wait) 118 cv_signal(&tq->tq_maxalloc_cv); 119} 120 121taskqid_t 122taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 123{ 124 taskq_ent_t *t; 125 126 if (taskq_now) { 127 func(arg); 128 return (1); 129 } 130 131 mutex_enter(&tq->tq_lock); 132 ASSERT(tq->tq_flags & TASKQ_ACTIVE); 133 if ((t = task_alloc(tq, tqflags)) == NULL) { 134 mutex_exit(&tq->tq_lock); 135 return (0); 136 } 137 if (tqflags & TQ_FRONT) { 138 t->tqent_next = tq->tq_task.tqent_next; 139 t->tqent_prev = &tq->tq_task; 140 } else { 141 t->tqent_next = &tq->tq_task; 142 t->tqent_prev = tq->tq_task.tqent_prev; 143 } 144 t->tqent_next->tqent_prev = t; 145 t->tqent_prev->tqent_next = t; 146 t->tqent_func = func; 147 t->tqent_arg = arg; 148 t->tqent_flags = 0; 149 cv_signal(&tq->tq_dispatch_cv); 150 mutex_exit(&tq->tq_lock); 151 return (1); 152} 153 154void 155taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 156 taskq_ent_t *t) 157{ 158 ASSERT(func != NULL); 159 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 160 161 /* 162 * Mark it as a prealloc'd task. This is important 163 * to ensure that we don't free it later. 
164 */ 165 t->tqent_flags |= TQENT_FLAG_PREALLOC; 166 /* 167 * Enqueue the task to the underlying queue. 168 */ 169 mutex_enter(&tq->tq_lock); 170 171 if (flags & TQ_FRONT) { 172 t->tqent_next = tq->tq_task.tqent_next; 173 t->tqent_prev = &tq->tq_task; 174 } else { 175 t->tqent_next = &tq->tq_task; 176 t->tqent_prev = tq->tq_task.tqent_prev; 177 } 178 t->tqent_next->tqent_prev = t; 179 t->tqent_prev->tqent_next = t; 180 t->tqent_func = func; 181 t->tqent_arg = arg; 182 cv_signal(&tq->tq_dispatch_cv); 183 mutex_exit(&tq->tq_lock); 184} 185 186void 187taskq_wait(taskq_t *tq) 188{ 189 mutex_enter(&tq->tq_lock); 190 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 191 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 192 mutex_exit(&tq->tq_lock); 193} 194 195void 196taskq_wait_id(taskq_t *tq, taskqid_t id) 197{ 198 taskq_wait(tq); 199} 200 201static void * 202taskq_thread(void *arg) 203{ 204 taskq_t *tq = arg; 205 taskq_ent_t *t; 206 boolean_t prealloc; 207 208 mutex_enter(&tq->tq_lock); 209 while (tq->tq_flags & TASKQ_ACTIVE) { 210 if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 211 if (--tq->tq_active == 0) 212 cv_broadcast(&tq->tq_wait_cv); 213 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 214 tq->tq_active++; 215 continue; 216 } 217 t->tqent_prev->tqent_next = t->tqent_next; 218 t->tqent_next->tqent_prev = t->tqent_prev; 219 t->tqent_next = NULL; 220 t->tqent_prev = NULL; 221 prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 222 mutex_exit(&tq->tq_lock); 223 224 rw_enter(&tq->tq_threadlock, RW_READER); 225 t->tqent_func(t->tqent_arg); 226 rw_exit(&tq->tq_threadlock); 227 228 mutex_enter(&tq->tq_lock); 229 if (!prealloc) 230 task_free(tq, t); 231 } 232 tq->tq_nthreads--; 233 cv_broadcast(&tq->tq_wait_cv); 234 mutex_exit(&tq->tq_lock); 235 return (NULL); 236} 237 238/*ARGSUSED*/ 239taskq_t * 240taskq_create(const char *name, int nthreads, pri_t pri, 241 int minalloc, int maxalloc, uint_t flags) 242{ 243 taskq_t *tq = kmem_zalloc(sizeof (taskq_t), 
KM_SLEEP); 244 int t; 245 246 if (flags & TASKQ_THREADS_CPU_PCT) { 247 int pct; 248 ASSERT3S(nthreads, >=, 0); 249 ASSERT3S(nthreads, <=, 100); 250 pct = MIN(nthreads, 100); 251 pct = MAX(pct, 0); 252 253 nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 254 nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 255 } else { 256 ASSERT3S(nthreads, >=, 1); 257 } 258 259 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 260 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 261 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 262 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 263 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 264 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 265 tq->tq_flags = flags | TASKQ_ACTIVE; 266 tq->tq_active = nthreads; 267 tq->tq_nthreads = nthreads; 268 tq->tq_minalloc = minalloc; 269 tq->tq_maxalloc = maxalloc; 270 tq->tq_task.tqent_next = &tq->tq_task; 271 tq->tq_task.tqent_prev = &tq->tq_task; 272 tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 273 274 if (flags & TASKQ_PREPOPULATE) { 275 mutex_enter(&tq->tq_lock); 276 while (minalloc-- > 0) 277 task_free(tq, task_alloc(tq, KM_SLEEP)); 278 mutex_exit(&tq->tq_lock); 279 } 280 281 for (t = 0; t < nthreads; t++) 282 (void) thr_create(0, 0, taskq_thread, 283 tq, THR_BOUND, &tq->tq_threadlist[t]); 284 285 return (tq); 286} 287 288void 289taskq_destroy(taskq_t *tq) 290{ 291 int t; 292 int nthreads = tq->tq_nthreads; 293 294 taskq_wait(tq); 295 296 mutex_enter(&tq->tq_lock); 297 298 tq->tq_flags &= ~TASKQ_ACTIVE; 299 cv_broadcast(&tq->tq_dispatch_cv); 300 301 while (tq->tq_nthreads != 0) 302 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 303 304 tq->tq_minalloc = 0; 305 while (tq->tq_nalloc != 0) { 306 ASSERT(tq->tq_freelist != NULL); 307 task_free(tq, task_alloc(tq, KM_SLEEP)); 308 } 309 310 mutex_exit(&tq->tq_lock); 311 312 for (t = 0; t < nthreads; t++) 313 (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 314 315 
kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 316 317 rw_destroy(&tq->tq_threadlock); 318 mutex_destroy(&tq->tq_lock); 319 cv_destroy(&tq->tq_dispatch_cv); 320 cv_destroy(&tq->tq_wait_cv); 321 cv_destroy(&tq->tq_maxalloc_cv); 322 323 kmem_free(tq, sizeof (taskq_t)); 324} 325 326int 327taskq_member(taskq_t *tq, void *t) 328{ 329 int i; 330 331 if (taskq_now) 332 return (1); 333 334 for (i = 0; i < tq->tq_nthreads; i++) 335 if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 336 return (1); 337 338 return (0); 339} 340 341void 342system_taskq_init(void) 343{ 344 system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 345 TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 346} 347 348void 349system_taskq_fini(void) 350{ 351 taskq_destroy(system_taskq); 352 system_taskq = NULL; /* defensive */ 353} 354