/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#if HAVE_MACH
#include "protocol.h"
#endif

#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || DISPATCH_DEBUG) \
		&& !defined(DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK && \
		!HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS && !DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
#undef HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#define HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 0
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif

static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc, pthread_priority_t pp);
static void _dispatch_queue_cleanup(void *ctxt);
static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
		unsigned int n);
static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
static inline bool _dispatch_queue_prepare_override(dispatch_queue_t dq,
		dispatch_queue_t tq, pthread_priority_t p);
static inline void _dispatch_queue_push_override(dispatch_queue_t dq,
		dispatch_queue_t tq, pthread_priority_t p);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif

#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_port_pred;
static dispatch_queue_t _dispatch_main_queue_wakeup(void);
unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
static void _dispatch_runloop_queue_port_init(void *ctxt);
static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
#endif

static void _dispatch_root_queues_init(void *context);
static dispatch_once_t _dispatch_root_queues_pred;

#pragma mark -
#pragma mark dispatch_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;

#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_pthread_root_queue_context_s
		_dispatch_pthread_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.dpq_thread_mediator = {
			.do_vtable = DISPATCH_VTABLE(semaphore),
			.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
			.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	}},
};
#endif

#define MAX_PTHREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			qos_class_t dgq_qos;
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;

DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_MAINTENANCE,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_BACKGROUND,
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_UTILITY,
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_DEFAULT,
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INITIATED,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_qos = _DISPATCH_QOS_CLASS_USER_INTERACTIVE,
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
#endif
	}}},
};

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
		.dq_label = "com.apple.root.maintenance-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.maintenance-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
		.dq_label = "com.apple.root.background-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.background-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
		.dq_label = "com.apple.root.utility-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.utility-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
		.dq_label = "com.apple.root.default-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.default-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 11,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
		.dq_label = "com.apple.root.user-initiated-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 12,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.user-initiated-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 13,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
		.dq_label = "com.apple.root.user-interactive-qos",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 14,
	},
	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
		.dq_label = "com.apple.root.user-interactive-qos.overcommit",
		.dq_running = 2,
		.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
		.dq_serialnum = 15,
	},
};

#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP

#define DISPATCH_PRIORITY_COUNT 5

enum {
	// No DISPATCH_PRIORITY_IDX_MAINTENANCE define because there is no legacy
	// maintenance priority
	DISPATCH_PRIORITY_IDX_BACKGROUND = 0,
	DISPATCH_PRIORITY_IDX_NON_INTERACTIVE,
	DISPATCH_PRIORITY_IDX_LOW,
	DISPATCH_PRIORITY_IDX_DEFAULT,
	DISPATCH_PRIORITY_IDX_HIGH,
};

static qos_class_t _dispatch_priority2qos[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = _DISPATCH_QOS_CLASS_BACKGROUND,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_LOW] = _DISPATCH_QOS_CLASS_UTILITY,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = _DISPATCH_QOS_CLASS_DEFAULT,
	[DISPATCH_PRIORITY_IDX_HIGH] = _DISPATCH_QOS_CLASS_USER_INITIATED,
};

#if HAVE_PTHREAD_WORKQUEUE_QOS
static const int _dispatch_priority2wq[] = {
	[DISPATCH_PRIORITY_IDX_BACKGROUND] = WORKQ_BG_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE] = WORKQ_NON_INTERACTIVE_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_LOW] = WORKQ_LOW_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_DEFAULT] = WORKQ_DEFAULT_PRIOQUEUE,
	[DISPATCH_PRIORITY_IDX_HIGH] = WORKQ_HIGH_PRIOQUEUE,
};
#endif

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue _dispatch_root_queues[\
		DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = DISPATCH_VTABLE(queue_mgr),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_is_thread_bound = 1,
	.dq_serialnum = 2,
};

dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init);
	qos_class_t qos;
	switch (priority) {
#if !RDAR_17878963 || DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS]
				.dq_priority) {
			// map maintenance to background on old kernel
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		} else {
			qos = (qos_class_t)priority;
		}
		break;
#endif // !RDAR_17878963 || DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_BACKGROUND];
		break;
	case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_NON_INTERACTIVE];
		break;
	case DISPATCH_QUEUE_PRIORITY_LOW:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_LOW];
		break;
	case DISPATCH_QUEUE_PRIORITY_DEFAULT:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_DEFAULT];
		break;
	case DISPATCH_QUEUE_PRIORITY_HIGH:
		qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
		break;
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS]
				.dq_priority) {
			qos = _dispatch_priority2qos[DISPATCH_PRIORITY_IDX_HIGH];
			break;
		}
#endif
		// fallthrough
	default:
		qos = (qos_class_t)priority;
		break;
	}
	return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
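
/*
 * Illustrative usage sketch (not part of libdispatch itself; do_work() is a
 * hypothetical function): both the legacy DISPATCH_QUEUE_PRIORITY_* constants
 * and QoS class values are accepted above, and any flag other than
 * DISPATCH_QUEUE_OVERCOMMIT yields NULL.
 *
 *	dispatch_queue_t q1 = dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_LOW, 0); // mapped via _dispatch_priority2qos
 *	dispatch_queue_t q2 = dispatch_get_global_queue(
 *			QOS_CLASS_USER_INITIATED, 0);    // QoS value passed through
 *	dispatch_async(q2, ^{ do_work(); });
 */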

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?:
			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
{
	while (dq1) {
		if (dq1 == dq2) {
			return true;
		}
		dq1 = dq1->do_targetq;
	}
	return false;
}

#define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
		"Assertion failed: Block was run on an unexpected queue"

DISPATCH_NOINLINE
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	char *msg;
	asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
			expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
			dq->dq_label : "");
	_dispatch_log("%s", msg);
	_dispatch_set_crash_log_message(msg);
	_dispatch_hardware_crash();
	free(msg);
}

void
dispatch_assert_queue(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
		return;
	}
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
		_dispatch_assert_queue_fail(dq, false);
	}
}

#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
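
/*
 * Illustrative usage sketch (not part of libdispatch itself; the queue label
 * is hypothetical): because the check above walks the target-queue chain,
 * asserting on a queue that the current queue targets also passes.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.q", NULL);
 *	dispatch_async(q, ^{
 *		dispatch_assert_queue(q);                              // passes
 *		dispatch_assert_queue_not(dispatch_get_main_queue());  // passes
 *	});
 */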

#pragma mark -
#pragma mark dispatch_init

#if HAVE_PTHREAD_WORKQUEUE_QOS
int _dispatch_set_qos_class_enabled;
pthread_priority_t _dispatch_background_priority;
pthread_priority_t _dispatch_user_initiated_priority;

static void
_dispatch_root_queues_init_qos(int supported)
{
	pthread_priority_t p;
	qos_class_t qos;
	unsigned int i;
	for (i = 0; i < DISPATCH_PRIORITY_COUNT; i++) {
		p = _pthread_qos_class_encode_workqueue(_dispatch_priority2wq[i], 0);
		qos = _pthread_qos_class_decode(p, NULL, NULL);
		dispatch_assert(qos != _DISPATCH_QOS_CLASS_UNSPECIFIED);
		_dispatch_priority2qos[i] = qos;
	}
	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		qos = _dispatch_root_queue_contexts[i].dgq_qos;
		if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
				!(supported & WORKQ_FEATURE_MAINTENANCE)) {
			continue;
		}
		unsigned long flags = i & 1 ? _PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0;
		flags |= _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
		if (i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS ||
				i == DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT) {
			flags |= _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
		}
		p = _pthread_qos_class_encode(qos, 0, flags);
		_dispatch_root_queues[i].dq_priority = p;
	}
	p = _pthread_qos_class_encode(qos_class_main(), 0, 0);
	_dispatch_main_q.dq_priority = p;
	_dispatch_queue_set_override_priority(&_dispatch_main_q);
	_dispatch_background_priority = _dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS].dq_priority &
			~_PTHREAD_PRIORITY_FLAGS_MASK;
	_dispatch_user_initiated_priority = _dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS].dq_priority &
			~_PTHREAD_PRIORITY_FLAGS_MASK;
	if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
		_dispatch_set_qos_class_enabled = 1;
	}
}
#endif

static inline bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int r;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	bool disable_qos = false;
#if DISPATCH_DEBUG
	disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
#endif
	if (!disable_qos && !disable_wq) {
		r = _pthread_workqueue_supported();
		int supported = r;
		if (r & WORKQ_FEATURE_FINEPRIO) {
			r = _pthread_workqueue_init(_dispatch_worker_thread3,
					offsetof(struct dispatch_queue_s, dq_serialnum), 0);
			result = !r;
			if (result) _dispatch_root_queues_init_qos(supported);
		}
	}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!result && !disable_wq) {
#if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}

#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		uint8_t pool_size, bool overcommit)
{
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	uint32_t thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			dispatch_hw_config(active_cpus);
	if (slowpath(pool_size) && pool_size < thread_pool_size) {
		thread_pool_size = pool_size;
	}
	qc->dgq_thread_pool_size = thread_pool_size;
	if (qc->dgq_qos) {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
		(void)dispatch_assume_zero(pthread_attr_setdetachstate(
				&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
				&pqc->dpq_thread_attr, qc->dgq_qos, 0));
#endif
	}
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr = semaphore_create(mach_task_self(),
			&pqc->dpq_thread_mediator.dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(pqc->dpq_thread_mediator.dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret = sem_init(&pqc->dpq_thread_mediator.dsema_sem, 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL

static dispatch_once_t _dispatch_root_queues_pred;

static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	_dispatch_safe_fork = false;
	if (!_dispatch_root_queues_init_workq()) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], 0, overcommit);
		}
#else
		DISPATCH_CRASH("Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}

#define countof(x) (sizeof(x) / sizeof(x[0]))

DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_QOS_COUNT == 6);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 12);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_priority2qos) ==
			DISPATCH_PRIORITY_COUNT);
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dispatch_assert(countof(_dispatch_priority2wq) ==
			DISPATCH_PRIORITY_COUNT);
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			WORKQ_NUM_PRIOQUEUE * 2);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
			offsetof(struct dispatch_object_s, do_next));
	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
	_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
	_dispatch_thread_key_create(&dispatch_defaultpriority_key, NULL);
#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
#if !DISPATCH_USE_OS_SEMAPHORE_CACHE
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_os_object_init();
	_voucher_init();
	_dispatch_introspection_init();
}

#if HAVE_MACH
static dispatch_once_t _dispatch_mach_host_port_pred;
static mach_port_t _dispatch_mach_host_port;

static void
_dispatch_mach_host_port_init(void *ctxt DISPATCH_UNUSED)
{
	kern_return_t kr;
	mach_port_t mp, mhp = mach_host_self();
	kr = host_get_host_port(mhp, &mp);
	DISPATCH_VERIFY_MIG(kr);
	if (!kr) {
		// mach_host_self returned the HOST_PRIV port
		kr = mach_port_deallocate(mach_task_self(), mhp);
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
		mhp = mp;
	} else if (kr != KERN_INVALID_ARGUMENT) {
		(void)dispatch_assume_zero(kr);
	}
	if (!dispatch_assume(mhp)) {
		DISPATCH_CRASH("Could not get unprivileged host port");
	}
	_dispatch_mach_host_port = mhp;
}

mach_port_t
_dispatch_get_mach_host_port(void)
{
	dispatch_once_f(&_dispatch_mach_host_port_pred, NULL,
			_dispatch_mach_host_port_init);
	return _dispatch_mach_host_port;
}
#endif

DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

#if HAVE_MACH
	_dispatch_mach_host_port_pred = 0;
	_dispatch_mach_host_port = MACH_PORT_NULL;
#endif
	_voucher_atfork_child();
	if (_dispatch_safe_fork) {
		return;
	}
	_dispatch_child_of_unsafe_fork = true;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}
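
/*
 * Illustrative sketch (not part of libdispatch itself): after an unsafe
 * fork(), the poisoned queue heads above make any dispatch use in the child
 * crash immediately, so only exec-bound work is safe there.
 *
 *	pid_t pid = fork();
 *	if (pid == 0) {
 *		// child: no dispatch calls here
 *		execl("/usr/bin/true", "true", (char *)NULL);
 *		_exit(127);
 *	}
 */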

#pragma mark -
#pragma mark dispatch_queue_attr_t

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
{
	qos_class_t qos = (qos_class_t)qos_class;
	switch (qos) {
	case _DISPATCH_QOS_CLASS_MAINTENANCE:
	case _DISPATCH_QOS_CLASS_BACKGROUND:
	case _DISPATCH_QOS_CLASS_UTILITY:
	case _DISPATCH_QOS_CLASS_DEFAULT:
	case _DISPATCH_QOS_CLASS_USER_INITIATED:
	case _DISPATCH_QOS_CLASS_USER_INTERACTIVE:
	case _DISPATCH_QOS_CLASS_UNSPECIFIED:
		break;
	default:
		return false;
	}
	if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
		return false;
	}
	return true;
}

#define DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(qos) \
		[_DISPATCH_QOS_CLASS_##qos] = DQA_INDEX_QOS_CLASS_##qos

static const
_dispatch_queue_attr_index_qos_class_t _dispatch_queue_attr_qos2idx[] = {
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UNSPECIFIED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(MAINTENANCE),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(BACKGROUND),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(UTILITY),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(DEFAULT),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INITIATED),
	DISPATCH_QUEUE_ATTR_QOS2IDX_INITIALIZER(USER_INTERACTIVE),
};

#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
		(overcommit ? DQA_INDEX_OVERCOMMIT : DQA_INDEX_NON_OVERCOMMIT)

#define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
		(concurrent ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)

#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))

#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (_dispatch_queue_attr_qos2idx[(qos)])

static inline dispatch_queue_attr_t
_dispatch_get_queue_attr(qos_class_t qos, int prio, bool overcommit,
		bool concurrent)
{
	return (dispatch_queue_attr_t)&_dispatch_queue_attrs
			[DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
			[DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
			[DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
			[DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)];
}

dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
		dispatch_qos_class_t qos_class, int relative_priority)
{
	if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL;
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_queue_attr(0, 0, false, false);
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH("Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(qos_class, relative_priority,
			dqa->dqa_overcommit, dqa->dqa_concurrent);
}

dispatch_queue_attr_t
dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
		bool overcommit)
{
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_queue_attr(0, 0, false, false);
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH("Invalid queue attribute");
	}
	return _dispatch_get_queue_attr(dqa->dqa_qos_class,
			dqa->dqa_relative_priority, overcommit, dqa->dqa_concurrent);
}
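
/*
 * Illustrative usage sketch (not part of libdispatch itself; labels are
 * hypothetical): attributes are interned constants from
 * _dispatch_queue_attrs, so the maker functions can be chained freely with
 * no memory management.
 *
 *	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
 *			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -1);
 *	attr = dispatch_queue_attr_make_with_overcommit(attr, false);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.util", attr);
 */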

#pragma mark -
#pragma mark dispatch_queue_t

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 16;

dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq)
{
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
	// Be sure the root queue priorities are set
	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
			_dispatch_root_queues_init);
#endif
	bool disallow_tq = (slowpath(dqa) && dqa != DISPATCH_QUEUE_CONCURRENT);
	if (!slowpath(dqa)) {
		dqa = _dispatch_get_queue_attr(0, 0, false, false);
	} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
		DISPATCH_CLIENT_CRASH("Invalid queue attribute");
	}
	dispatch_queue_t dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}
	qos_class_t qos = dqa->dqa_qos_class;
	bool overcommit = dqa->dqa_overcommit;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	dq->dq_priority = _pthread_qos_class_encode(qos, dqa->dqa_relative_priority,
			overcommit);
#endif
	if (dqa->dqa_concurrent) {
		dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
	} else {
		// Default serial queue target queue is overcommit!
		overcommit = true;
	}
	if (!tq) {
		if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
			qos = _DISPATCH_QOS_CLASS_DEFAULT;
		}
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
				!_dispatch_root_queues[
				DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
			qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
		}
#endif
		bool maintenance_fallback = false;
#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		maintenance_fallback = true;
#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
		if (maintenance_fallback) {
			if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
					!_dispatch_root_queues[
					DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
				qos = _DISPATCH_QOS_CLASS_BACKGROUND;
			}
		}

		tq = _dispatch_get_root_queue(qos, overcommit);
		if (slowpath(!tq)) {
			DISPATCH_CLIENT_CRASH("Invalid queue attribute");
		}
	} else {
		_dispatch_retain(tq);
		if (disallow_tq) {
			// TODO: override target queue's qos/overcommit ?
			DISPATCH_CLIENT_CRASH("Invalid combination of target queue & "
					"queue attribute");
		}
		_dispatch_queue_priority_inherit_from_target(dq, tq);
	}
	_dispatch_queue_set_override_priority(dq);
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
}
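
/*
 * Illustrative usage sketch (not part of libdispatch itself; labels are
 * hypothetical): per the checks above, an explicit target queue may only be
 * combined with a NULL or DISPATCH_QUEUE_CONCURRENT attribute.
 *
 *	dispatch_queue_t gate = dispatch_queue_create("com.example.gate", NULL);
 *	dispatch_queue_t q = dispatch_queue_create_with_target(
 *			"com.example.q", DISPATCH_QUEUE_CONCURRENT, gate);
 */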

void
_dispatch_queue_destroy(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq;
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}

const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}

qos_class_t
dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr)
{
	qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED;
	int relative_priority = 0;
#if HAVE_PTHREAD_WORKQUEUE_QOS
	pthread_priority_t dqp = dq->dq_priority;
	if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0;
	qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL);
#else
	(void)dq;
#endif
	if (relative_priority_ptr) *relative_priority_ptr = relative_priority;
	return qos;
}

static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		_dispatch_object_debug(dq, "%s", __func__);
		return;
	}
	if (w > 0) {
		tmp = (unsigned int)w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = dispatch_hw_config(physical_cpus);
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = dispatch_hw_config(active_cpus);
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = dispatch_hw_config(logical_cpus);
		break;
	}
	if (tmp > DISPATCH_QUEUE_WIDTH_MAX / 2) {
		tmp = DISPATCH_QUEUE_WIDTH_MAX / 2;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = (typeof(dq->dq_width))(tmp * 2);
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	_dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}
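
/*
 * Illustrative sketch (not part of libdispatch itself; dispatch_queue_set_width
 * is private SPI and the label is hypothetical): widths are clamped to
 * DISPATCH_QUEUE_WIDTH_MAX / 2, and the negative constants select a
 * CPU-count-derived width, as implemented above.
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.wide",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS);
 */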
dq->dq_label : ""; 1251} 1252 1253qos_class_t 1254dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relative_priority_ptr) 1255{ 1256 qos_class_t qos = _DISPATCH_QOS_CLASS_UNSPECIFIED; 1257 int relative_priority = 0; 1258#if HAVE_PTHREAD_WORKQUEUE_QOS 1259 pthread_priority_t dqp = dq->dq_priority; 1260 if (dqp & _PTHREAD_PRIORITY_INHERIT_FLAG) dqp = 0; 1261 qos = _pthread_qos_class_decode(dqp, &relative_priority, NULL); 1262#else 1263 (void)dq; 1264#endif 1265 if (relative_priority_ptr) *relative_priority_ptr = relative_priority; 1266 return qos; 1267} 1268 1269static void 1270_dispatch_queue_set_width2(void *ctxt) 1271{ 1272 int w = (int)(intptr_t)ctxt; // intentional truncation 1273 uint32_t tmp; 1274 dispatch_queue_t dq = _dispatch_queue_get_current(); 1275 1276 if (w == 1 || w == 0) { 1277 dq->dq_width = 1; 1278 _dispatch_object_debug(dq, "%s", __func__); 1279 return; 1280 } 1281 if (w > 0) { 1282 tmp = (unsigned int)w; 1283 } else switch (w) { 1284 case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS: 1285 tmp = dispatch_hw_config(physical_cpus); 1286 break; 1287 case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS: 1288 tmp = dispatch_hw_config(active_cpus); 1289 break; 1290 default: 1291 // fall through 1292 case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS: 1293 tmp = dispatch_hw_config(logical_cpus); 1294 break; 1295 } 1296 if (tmp > DISPATCH_QUEUE_WIDTH_MAX / 2) { 1297 tmp = DISPATCH_QUEUE_WIDTH_MAX / 2; 1298 } 1299 // multiply by two since the running count is inc/dec by two 1300 // (the low bit == barrier) 1301 dq->dq_width = (typeof(dq->dq_width))(tmp * 2); 1302 _dispatch_object_debug(dq, "%s", __func__); 1303} 1304 1305void 1306dispatch_queue_set_width(dispatch_queue_t dq, long width) 1307{ 1308 if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) || 1309 slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) { 1310 return; 1311 } 1312 _dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width, 1313 _dispatch_queue_set_width2); 1314} 1315 1316// 6618342 Contact the team that owns the Instrument DTrace probe before 1317// renaming this symbol 1318static void 1319_dispatch_set_target_queue2(void *ctxt) 1320{ 1321 dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current(), tq = ctxt; 1322 mach_port_t th; 1323 1324 while (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread, MACH_PORT_NULL, 1325 _dispatch_thread_port(), &th, acquire)) { 1326 _dispatch_thread_switch(th, DISPATCH_YIELD_THREAD_SWITCH_OPTION, 1327 DISPATCH_CONTENTION_USLEEP_START); 1328 } 1329 _dispatch_queue_priority_inherit_from_target(dq, tq); 1330 prev_dq = dq->do_targetq; 1331 dq->do_targetq = tq; 1332 _dispatch_release(prev_dq); 1333 _dispatch_object_debug(dq, "%s", __func__); 1334 dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL, release); 1335} 1336 1337void 1338dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq) 1339{ 1340 DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq); 1341 if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) || 1342 slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) { 1343 return; 1344 } 1345 unsigned long type = dx_metatype(dou._do); 1346 if (slowpath(!dq)) { 1347 bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE && 1348 slowpath(dou._dq->dq_width > 1)); 1349 dq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, 1350 !is_concurrent_q); 1351 } 1352 // TODO: put into the vtable 1353 switch(type) { 1354 case _DISPATCH_QUEUE_TYPE: 1355 case _DISPATCH_SOURCE_TYPE: 1356 _dispatch_retain(dq); 1357 return _dispatch_barrier_trysync_f(dou._dq, dq, 1358 

#pragma mark -
#pragma mark dispatch_pthread_root_queue

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if HAVE_PTHREAD_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};
static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	.do_vtable = DISPATCH_VTABLE(queue_root),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_running = 2,
	.dq_width = DISPATCH_QUEUE_WIDTH_MAX,
	.dq_serialnum = 3,
};
static struct {
	volatile int prio;
	int default_prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;
static dispatch_once_t _dispatch_mgr_sched_pred;

static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	// legacy priority calls allowed when requesting above default priority
	_dispatch_mgr_sched.default_prio = param.sched_priority;
	_dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
}

DISPATCH_NOINLINE
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_set_qos_class_enabled) {
		qos_class_t qos = qos_class_main();
		(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr, qos, 0));
		_dispatch_mgr_q.dq_priority = _pthread_qos_class_encode(qos, 0, 0);
		_dispatch_queue_set_override_priority(&_dispatch_mgr_q);
	}
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
		(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	}
	return &_dispatch_mgr_sched.tid;
}

static inline void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
			(void)dispatch_assume_zero(pthread_setschedparam(
					_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
					&param));
		}
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}

DISPATCH_NOINLINE
void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}

DISPATCH_NOINLINE
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	int p = _dispatch_mgr_sched.prio;
	do if (p >= param.sched_priority) {
		return;
	} while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
			p, param.sched_priority, &p, relaxed)));
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
}

dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure)
{
	dispatch_queue_t dq;
	dispatch_root_queue_context_t qc;
	dispatch_pthread_root_queue_context_t pqc;
	size_t dqs;
	uint8_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
			(uint8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;

	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
			sizeof(struct dispatch_root_queue_context_s) +
			sizeof(struct dispatch_pthread_root_queue_context_s));
	qc = (void*)dq + dqs;
	pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->do_ctxt = qc;
	dq->do_targetq = NULL;
	dq->dq_running = 2;
	dq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;

	pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
	qc->dgq_ctxt = pqc;
#if HAVE_PTHREAD_WORKQUEUES
	qc->dgq_kworkqueue = (void*)(~0ul);
#endif
	_dispatch_root_queue_init_pthread_pool(qc, pool_size, true);

	if (attr) {
		memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
#if HAVE_PTHREAD_WORKQUEUE_QOS
		qos_class_t qos = 0;
		if (!pthread_attr_get_qos_class_np(&pqc->dpq_thread_attr, &qos, NULL)
				&& qos > _DISPATCH_QOS_CLASS_DEFAULT) {
			DISPATCH_CLIENT_CRASH("pthread root queues do not support "
					"explicit QoS attributes");
		}
#endif
		_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
	} else {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
	}
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(
			&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
	if (configure) {
		pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
	}
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
#endif
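
/*
 * Illustrative usage sketch (not part of libdispatch itself; this is private
 * SPI, and do_work() and the label are hypothetical): a pthread root queue
 * runs work on privately managed threads instead of the kernel workqueue,
 * with the configure block invoked on each newly created worker thread.
 *
 *	dispatch_queue_t root = dispatch_pthread_root_queue_create(
 *			"com.example.root", 0, NULL, ^{
 *		// per-thread setup
 *	});
 *	dispatch_async(root, ^{ do_work(); });
 */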

void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CRASH("Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	pthread_attr_destroy(&pqc->dpq_thread_attr);
	_dispatch_semaphore_dispose(&pqc->dpq_thread_mediator);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
			false);
#endif
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}

#pragma mark -
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);

void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					_DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy((dispatch_queue_t)dqsq);
}

static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(
			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
	dqsq->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			(dispatch_queue_t)dqsq, release))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}

static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						_DISPATCH_QOS_CLASS_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
1665dispatch_queue_set_specific(dispatch_queue_t dq, const void *key, 1666 void *ctxt, dispatch_function_t destructor) 1667{ 1668 if (slowpath(!key)) { 1669 return; 1670 } 1671 dispatch_queue_specific_t dqs; 1672 1673 dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s)); 1674 dqs->dqs_key = key; 1675 dqs->dqs_ctxt = ctxt; 1676 dqs->dqs_destructor = destructor; 1677 if (slowpath(!dq->dq_specific_q)) { 1678 _dispatch_queue_init_specific(dq); 1679 } 1680 _dispatch_barrier_trysync_f(dq->dq_specific_q, dqs, 1681 _dispatch_queue_set_specific); 1682} 1683 1684static void 1685_dispatch_queue_get_specific(void *ctxt) 1686{ 1687 void **ctxtp = ctxt; 1688 void *key = *ctxtp; 1689 dispatch_queue_specific_queue_t dqsq = 1690 (dispatch_queue_specific_queue_t)_dispatch_queue_get_current(); 1691 dispatch_queue_specific_t dqs; 1692 1693 TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) { 1694 if (dqs->dqs_key == key) { 1695 *ctxtp = dqs->dqs_ctxt; 1696 return; 1697 } 1698 } 1699 *ctxtp = NULL; 1700} 1701 1702DISPATCH_NOINLINE 1703void * 1704dispatch_queue_get_specific(dispatch_queue_t dq, const void *key) 1705{ 1706 if (slowpath(!key)) { 1707 return NULL; 1708 } 1709 void *ctxt = NULL; 1710 1711 if (fastpath(dq->dq_specific_q)) { 1712 ctxt = (void *)key; 1713 dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific); 1714 } 1715 return ctxt; 1716} 1717 1718DISPATCH_NOINLINE 1719void * 1720dispatch_get_specific(const void *key) 1721{ 1722 if (slowpath(!key)) { 1723 return NULL; 1724 } 1725 void *ctxt = NULL; 1726 dispatch_queue_t dq = _dispatch_queue_get_current(); 1727 1728 while (slowpath(dq)) { 1729 if (slowpath(dq->dq_specific_q)) { 1730 ctxt = (void *)key; 1731 dispatch_sync_f(dq->dq_specific_q, &ctxt, 1732 _dispatch_queue_get_specific); 1733 if (ctxt) break; 1734 } 1735 dq = dq->do_targetq; 1736 } 1737 return ctxt; 1738} 1739 1740#pragma mark - 1741#pragma mark dispatch_queue_debug 1742 1743size_t 1744_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz) 1745{ 1746 size_t offset = 0; 1747 dispatch_queue_t target = dq->do_targetq; 1748 offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, " 1749 "running = 0x%x, barrier = %d ", target && target->dq_label ? 1750 target->dq_label : "", target, dq->dq_width / 2, 1751 dq->dq_running / 2, dq->dq_running & 1); 1752 if (dq->dq_is_thread_bound) { 1753 offset += dsnprintf(buf, bufsiz, ", thread = 0x%x ", 1754 _dispatch_queue_get_bound_thread(dq)); 1755 } 1756 return offset; 1757} 1758 1759size_t 1760dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz) 1761{ 1762 size_t offset = 0; 1763 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 1764 dq->dq_label ? 
dq->dq_label : dx_kind(dq), dq); 1765 offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset); 1766 offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset); 1767 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 1768 return offset; 1769} 1770 1771#if DISPATCH_DEBUG 1772void 1773dispatch_debug_queue(dispatch_queue_t dq, const char* str) { 1774 if (fastpath(dq)) { 1775 _dispatch_object_debug(dq, "%s", str); 1776 } else { 1777 _dispatch_log("queue[NULL]: %s", str); 1778 } 1779} 1780#endif 1781 1782#if DISPATCH_PERF_MON && !DISPATCH_INTROSPECTION 1783static OSSpinLock _dispatch_stats_lock; 1784static struct { 1785 uint64_t time_total; 1786 uint64_t count_total; 1787 uint64_t thread_total; 1788} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set 1789 1790static void 1791_dispatch_queue_merge_stats(uint64_t start) 1792{ 1793 uint64_t delta = _dispatch_absolute_time() - start; 1794 unsigned long count; 1795 1796 count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key); 1797 _dispatch_thread_setspecific(dispatch_bcounter_key, NULL); 1798 1799 int bucket = flsl((long)count); 1800 1801 // 64-bit counters on 32-bit require a lock or a queue 1802 OSSpinLockLock(&_dispatch_stats_lock); 1803 1804 _dispatch_stats[bucket].time_total += delta; 1805 _dispatch_stats[bucket].count_total += count; 1806 _dispatch_stats[bucket].thread_total++; 1807 1808 OSSpinLockUnlock(&_dispatch_stats_lock); 1809} 1810#endif 1811 1812#pragma mark - 1813#pragma mark dispatch_continuation_t 1814 1815static void 1816_dispatch_force_cache_cleanup(void) 1817{ 1818 dispatch_continuation_t dc; 1819 dc = _dispatch_thread_getspecific(dispatch_cache_key); 1820 if (dc) { 1821 _dispatch_thread_setspecific(dispatch_cache_key, NULL); 1822 _dispatch_cache_cleanup(dc); 1823 } 1824} 1825 1826DISPATCH_NOINLINE 1827static void 1828_dispatch_cache_cleanup(void *value) 1829{ 1830 dispatch_continuation_t dc, next_dc = value; 1831 1832 while ((dc = next_dc)) { 1833 next_dc = dc->do_next; 1834 _dispatch_continuation_free_to_heap(dc); 1835 } 1836} 1837 1838#if DISPATCH_USE_MEMORYSTATUS_SOURCE 1839int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT; 1840 1841DISPATCH_NOINLINE 1842void 1843_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc) 1844{ 1845 _dispatch_continuation_free_to_heap(dc); 1846 dispatch_continuation_t next_dc; 1847 dc = _dispatch_thread_getspecific(dispatch_cache_key); 1848 int cnt; 1849 if (!dc || (cnt = dc->dc_cache_cnt - 1850 _dispatch_continuation_cache_limit) <= 0){ 1851 return; 1852 } 1853 do { 1854 next_dc = dc->do_next; 1855 _dispatch_continuation_free_to_heap(dc); 1856 } while (--cnt && (dc = next_dc)); 1857 _dispatch_thread_setspecific(dispatch_cache_key, next_dc); 1858} 1859#endif 1860 1861DISPATCH_ALWAYS_INLINE_NDEBUG 1862static inline void 1863_dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou) 1864{ 1865 dispatch_continuation_t dc = dou._dc; 1866 1867 (void)dispatch_atomic_add2o(dq, dq_running, 2, acquire); 1868 if (!DISPATCH_OBJ_IS_VTABLE(dc) && 1869 (long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) { 1870 _dispatch_trace_continuation_pop(dq, dou); 1871 _dispatch_thread_semaphore_signal( 1872 (_dispatch_thread_semaphore_t)dc->dc_other); 1873 _dispatch_introspection_queue_item_complete(dou); 1874 } else { 1875 _dispatch_async_f_redirect(dq, dc, 1876 _dispatch_queue_get_override_priority(dq)); 1877 } 1878 _dispatch_perfmon_workitem_inc(); 1879} 1880 1881#pragma mark - 1882#pragma 
mark dispatch_block_create 1883 1884#if __BLOCKS__ 1885 1886DISPATCH_ALWAYS_INLINE 1887static inline bool 1888_dispatch_block_flags_valid(dispatch_block_flags_t flags) 1889{ 1890 return ((flags & ~DISPATCH_BLOCK_API_MASK) == 0); 1891} 1892 1893DISPATCH_ALWAYS_INLINE 1894static inline dispatch_block_flags_t 1895_dispatch_block_normalize_flags(dispatch_block_flags_t flags) 1896{ 1897 if (flags & (DISPATCH_BLOCK_NO_VOUCHER|DISPATCH_BLOCK_DETACHED)) { 1898 flags |= DISPATCH_BLOCK_HAS_VOUCHER; 1899 } 1900 if (flags & (DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_DETACHED)) { 1901 flags |= DISPATCH_BLOCK_HAS_PRIORITY; 1902 } 1903 return flags; 1904} 1905 1906static inline dispatch_block_t 1907_dispatch_block_create_with_voucher_and_priority(dispatch_block_flags_t flags, 1908 voucher_t voucher, pthread_priority_t pri, dispatch_block_t block) 1909{ 1910 flags = _dispatch_block_normalize_flags(flags); 1911 voucher_t cv = NULL; 1912 bool assign = (flags & DISPATCH_BLOCK_ASSIGN_CURRENT); 1913 if (assign && !(flags & DISPATCH_BLOCK_HAS_VOUCHER)) { 1914 voucher = cv = voucher_copy(); 1915 flags |= DISPATCH_BLOCK_HAS_VOUCHER; 1916 } 1917 if (assign && !(flags & DISPATCH_BLOCK_HAS_PRIORITY)) { 1918 pri = _dispatch_priority_propagate(); 1919 flags |= DISPATCH_BLOCK_HAS_PRIORITY; 1920 } 1921 dispatch_block_t db = _dispatch_block_create(flags, voucher, pri, block); 1922 if (cv) _voucher_release(cv); 1923#if DISPATCH_DEBUG 1924 dispatch_assert(_dispatch_block_get_data(db)); 1925#endif 1926 return db; 1927} 1928 1929dispatch_block_t 1930dispatch_block_create(dispatch_block_flags_t flags, dispatch_block_t block) 1931{ 1932 if (!_dispatch_block_flags_valid(flags)) return NULL; 1933 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 0, 1934 block); 1935} 1936 1937dispatch_block_t 1938dispatch_block_create_with_qos_class(dispatch_block_flags_t flags, 1939 dispatch_qos_class_t qos_class, int relative_priority, 1940 dispatch_block_t block) 1941{ 1942 if (!_dispatch_block_flags_valid(flags)) return NULL; 1943 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL; 1944 flags |= DISPATCH_BLOCK_HAS_PRIORITY; 1945 pthread_priority_t pri = 0; 1946#if HAVE_PTHREAD_WORKQUEUE_QOS 1947 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0); 1948#endif 1949 return _dispatch_block_create_with_voucher_and_priority(flags, NULL, 1950 pri, block); 1951} 1952 1953dispatch_block_t 1954dispatch_block_create_with_voucher(dispatch_block_flags_t flags, 1955 voucher_t voucher, dispatch_block_t block) 1956{ 1957 if (!_dispatch_block_flags_valid(flags)) return NULL; 1958 flags |= DISPATCH_BLOCK_HAS_VOUCHER; 1959 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 0, 1960 block); 1961} 1962 1963dispatch_block_t 1964dispatch_block_create_with_voucher_and_qos_class(dispatch_block_flags_t flags, 1965 voucher_t voucher, dispatch_qos_class_t qos_class, 1966 int relative_priority, dispatch_block_t block) 1967{ 1968 if (!_dispatch_block_flags_valid(flags)) return NULL; 1969 if (!_dispatch_qos_class_valid(qos_class, relative_priority)) return NULL; 1970 flags |= (DISPATCH_BLOCK_HAS_VOUCHER|DISPATCH_BLOCK_HAS_PRIORITY); 1971 pthread_priority_t pri = 0; 1972#if HAVE_PTHREAD_WORKQUEUE_QOS 1973 pri = _pthread_qos_class_encode(qos_class, relative_priority, 0); 1974#endif 1975 return _dispatch_block_create_with_voucher_and_priority(flags, voucher, 1976 pri, block); 1977} 1978 1979void 1980dispatch_block_perform(dispatch_block_flags_t flags, dispatch_block_t block) 1981{ 1982 if 
(!_dispatch_block_flags_valid(flags)) { 1983 DISPATCH_CLIENT_CRASH("Invalid flags passed to " 1984 "dispatch_block_perform()"); 1985 } 1986 flags = _dispatch_block_normalize_flags(flags); 1987 struct dispatch_block_private_data_s dbpds = 1988 DISPATCH_BLOCK_PRIVATE_DATA_INITIALIZER(flags, NULL, 0, block); 1989 dbpds.dbpd_atomic_flags |= DBF_PERFORM; // no group_leave at end of invoke 1990 return _dispatch_block_invoke(&dbpds); 1991} 1992 1993#define _dbpd_group(dbpd) ((dispatch_group_t)&(dbpd)->dbpd_group) 1994 1995void 1996_dispatch_block_invoke(const struct dispatch_block_private_data_s *dbcpd) 1997{ 1998 dispatch_block_private_data_t dbpd = (dispatch_block_private_data_t)dbcpd; 1999 dispatch_block_flags_t flags = dbpd->dbpd_flags; 2000 unsigned int atomic_flags = dbpd->dbpd_atomic_flags; 2001 if (slowpath(atomic_flags & DBF_WAITED)) { 2002 DISPATCH_CLIENT_CRASH("A block object may not be both run more " 2003 "than once and waited for"); 2004 } 2005 if (atomic_flags & DBF_CANCELED) goto out; 2006 2007 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY; 2008 unsigned long override = 0; 2009 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) { 2010 op = _dispatch_get_priority(); 2011 p = dbpd->dbpd_priority; 2012 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) || 2013 !(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS) ? 2014 DISPATCH_PRIORITY_ENFORCE : 0; 2015 } 2016 voucher_t ov, v = DISPATCH_NO_VOUCHER; 2017 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) { 2018 v = dbpd->dbpd_voucher; 2019 if (v) _voucher_retain(v); 2020 } 2021 ov = _dispatch_adopt_priority_and_voucher(p, v, override); 2022 dbpd->dbpd_thread = _dispatch_thread_port(); 2023 dbpd->dbpd_block(); 2024 _dispatch_set_priority_and_replace_voucher(op, ov); 2025out: 2026 if ((atomic_flags & DBF_PERFORM) == 0) { 2027 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) { 2028 dispatch_group_leave(_dbpd_group(dbpd)); 2029 } 2030 } 2031} 2032 2033static void 2034_dispatch_block_sync_invoke(void *block) 2035{ 2036 dispatch_block_t b = block; 2037 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b); 2038 dispatch_block_flags_t flags = dbpd->dbpd_flags; 2039 unsigned int atomic_flags = dbpd->dbpd_atomic_flags; 2040 if (slowpath(atomic_flags & DBF_WAITED)) { 2041 DISPATCH_CLIENT_CRASH("A block object may not be both run more " 2042 "than once and waited for"); 2043 } 2044 if (atomic_flags & DBF_CANCELED) goto out; 2045 2046 pthread_priority_t op = DISPATCH_NO_PRIORITY, p = DISPATCH_NO_PRIORITY; 2047 unsigned long override = 0; 2048 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) { 2049 op = _dispatch_get_priority(); 2050 p = dbpd->dbpd_priority; 2051 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) || 2052 !(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS) ? 
2053 DISPATCH_PRIORITY_ENFORCE : 0; 2054 } 2055 voucher_t ov, v = DISPATCH_NO_VOUCHER; 2056 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) { 2057 v = dbpd->dbpd_voucher; 2058 if (v) _voucher_retain(v); 2059 } 2060 ov = _dispatch_adopt_priority_and_voucher(p, v, override); 2061 dbpd->dbpd_block(); 2062 _dispatch_set_priority_and_replace_voucher(op, ov); 2063out: 2064 if ((atomic_flags & DBF_PERFORM) == 0) { 2065 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) { 2066 dispatch_group_leave(_dbpd_group(dbpd)); 2067 } 2068 } 2069 2070 dispatch_queue_t dq = _dispatch_queue_get_current(); 2071 if (dispatch_atomic_cmpxchg2o(dbpd, dbpd_queue, dq, NULL, acquire)) { 2072 // balances dispatch_{,barrier_,}sync 2073 _dispatch_release(dq); 2074 } 2075} 2076 2077static void 2078_dispatch_block_async_invoke_and_release(void *block) 2079{ 2080 dispatch_block_t b = block; 2081 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(b); 2082 dispatch_block_flags_t flags = dbpd->dbpd_flags; 2083 unsigned int atomic_flags = dbpd->dbpd_atomic_flags; 2084 if (slowpath(atomic_flags & DBF_WAITED)) { 2085 DISPATCH_CLIENT_CRASH("A block object may not be both run more " 2086 "than once and waited for"); 2087 } 2088 if (atomic_flags & DBF_CANCELED) goto out; 2089 2090 pthread_priority_t p = DISPATCH_NO_PRIORITY; 2091 unsigned long override = 0; 2092 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) { 2093 override = (flags & DISPATCH_BLOCK_ENFORCE_QOS_CLASS) ? 2094 DISPATCH_PRIORITY_ENFORCE : 0; 2095 p = dbpd->dbpd_priority; 2096 } 2097 voucher_t v = DISPATCH_NO_VOUCHER; 2098 if (flags & DISPATCH_BLOCK_HAS_VOUCHER) { 2099 v = dbpd->dbpd_voucher; 2100 if (v) _voucher_retain(v); 2101 } 2102 _dispatch_adopt_priority_and_replace_voucher(p, v, override); 2103 dbpd->dbpd_block(); 2104out: 2105 if ((atomic_flags & DBF_PERFORM) == 0) { 2106 if (dispatch_atomic_inc2o(dbpd, dbpd_performed, acquire) == 1) { 2107 dispatch_group_leave(_dbpd_group(dbpd)); 2108 } 2109 } 2110 dispatch_queue_t dq = _dispatch_queue_get_current(); 2111 if (dispatch_atomic_cmpxchg2o(dbpd, dbpd_queue, dq, NULL, acquire)) { 2112 // balances dispatch_{,barrier_,group_}async 2113 _dispatch_release(dq); 2114 } 2115 Block_release(b); 2116} 2117 2118void 2119dispatch_block_cancel(dispatch_block_t db) 2120{ 2121 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db); 2122 if (!dbpd) { 2123 DISPATCH_CLIENT_CRASH("Invalid block object passed to " 2124 "dispatch_block_cancel()"); 2125 } 2126 (void)dispatch_atomic_or2o(dbpd, dbpd_atomic_flags, DBF_CANCELED, relaxed); 2127} 2128 2129long 2130dispatch_block_testcancel(dispatch_block_t db) 2131{ 2132 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db); 2133 if (!dbpd) { 2134 DISPATCH_CLIENT_CRASH("Invalid block object passed to " 2135 "dispatch_block_testcancel()"); 2136 } 2137 return (bool)(dbpd->dbpd_atomic_flags & DBF_CANCELED); 2138} 2139 2140long 2141dispatch_block_wait(dispatch_block_t db, dispatch_time_t timeout) 2142{ 2143 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db); 2144 if (!dbpd) { 2145 DISPATCH_CLIENT_CRASH("Invalid block object passed to " 2146 "dispatch_block_wait()"); 2147 } 2148 2149 unsigned int flags = dispatch_atomic_or_orig2o(dbpd, dbpd_atomic_flags, 2150 DBF_WAITING, relaxed); 2151 if (slowpath(flags & (DBF_WAITED | DBF_WAITING))) { 2152 DISPATCH_CLIENT_CRASH("A block object may not be waited for " 2153 "more than once"); 2154 } 2155 2156 // <rdar://problem/17703192> If we know the queue where this block is 2157 // enqueued, or the thread that's 
executing it, then we should boost 2158 // it here. 2159 2160 pthread_priority_t pp = _dispatch_get_priority(); 2161 2162 dispatch_queue_t boost_dq; 2163 boost_dq = dispatch_atomic_xchg2o(dbpd, dbpd_queue, NULL, acquire); 2164 if (boost_dq) { 2165 // release balances dispatch_{,barrier_,group_}async. 2166 // Can't put the queue back in the timeout case: the block might 2167 // finish after we fell out of group_wait and see our NULL, so 2168 // neither of us would ever release. Side effect: After a _wait 2169 // that times out, subsequent waits will not boost the qos of the 2170 // still-running block. 2171 _dispatch_queue_wakeup_with_qos_and_release(boost_dq, pp); 2172 } 2173 2174 mach_port_t boost_th = dbpd->dbpd_thread; 2175 if (boost_th) { 2176 _dispatch_thread_override_start(boost_th, pp); 2177 } 2178 2179 int performed = dispatch_atomic_load2o(dbpd, dbpd_performed, relaxed); 2180 if (slowpath(performed > 1 || (boost_th && boost_dq))) { 2181 DISPATCH_CLIENT_CRASH("A block object may not be both run more " 2182 "than once and waited for"); 2183 } 2184 2185 long ret = dispatch_group_wait(_dbpd_group(dbpd), timeout); 2186 2187 if (boost_th) { 2188 _dispatch_thread_override_end(boost_th); 2189 } 2190 2191 if (ret) { 2192 // timed out: reverse our changes 2193 (void)dispatch_atomic_and2o(dbpd, dbpd_atomic_flags, 2194 ~DBF_WAITING, relaxed); 2195 } else { 2196 (void)dispatch_atomic_or2o(dbpd, dbpd_atomic_flags, 2197 DBF_WAITED, relaxed); 2198 // don't need to re-test here: the second call would see 2199 // the first call's WAITING 2200 } 2201 2202 return ret; 2203} 2204 2205void 2206dispatch_block_notify(dispatch_block_t db, dispatch_queue_t queue, 2207 dispatch_block_t notification_block) 2208{ 2209 dispatch_block_private_data_t dbpd = _dispatch_block_get_data(db); 2210 if (!dbpd) { 2211 DISPATCH_CLIENT_CRASH("Invalid block object passed to " 2212 "dispatch_block_notify()"); 2213 } 2214 int performed = dispatch_atomic_load2o(dbpd, dbpd_performed, relaxed); 2215 if (slowpath(performed > 1)) { 2216 DISPATCH_CLIENT_CRASH("A block object may not be both run more " 2217 "than once and observed"); 2218 } 2219 2220 return dispatch_group_notify(_dbpd_group(dbpd), queue, notification_block); 2221} 2222 2223#endif // __BLOCKS__ 2224 2225#pragma mark - 2226#pragma mark dispatch_barrier_async 2227 2228DISPATCH_NOINLINE 2229static void 2230_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt, 2231 dispatch_function_t func, pthread_priority_t pp, 2232 dispatch_block_flags_t flags) 2233{ 2234 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap(); 2235 2236 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT); 2237 dc->dc_func = func; 2238 dc->dc_ctxt = ctxt; 2239 _dispatch_continuation_voucher_set(dc, flags); 2240 _dispatch_continuation_priority_set(dc, pp, flags); 2241 2242 pp = _dispatch_continuation_get_override_priority(dq, dc); 2243 2244 _dispatch_queue_push(dq, dc, pp); 2245} 2246 2247DISPATCH_ALWAYS_INLINE 2248static inline void 2249_dispatch_barrier_async_f2(dispatch_queue_t dq, void *ctxt, 2250 dispatch_function_t func, pthread_priority_t pp, 2251 dispatch_block_flags_t flags) 2252{ 2253 dispatch_continuation_t dc; 2254 2255 dc = fastpath(_dispatch_continuation_alloc_cacheonly()); 2256 if (!dc) { 2257 return _dispatch_barrier_async_f_slow(dq, ctxt, func, pp, flags); 2258 } 2259 2260 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT); 2261 dc->dc_func = func; 2262 dc->dc_ctxt = ctxt; 2263 
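	// The two helpers below capture the submitting thread's voucher and QoS
	// into dc_voucher/dc_priority so the worker that drains the queue can
	// adopt them before the callout; broadly, the DISPATCH_BLOCK_NO_VOUCHER
	// and DISPATCH_BLOCK_NO_QOS_CLASS flags opt out of that capture.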
_dispatch_continuation_voucher_set(dc, flags); 2264 _dispatch_continuation_priority_set(dc, pp, flags); 2265 2266 pp = _dispatch_continuation_get_override_priority(dq, dc); 2267 2268 _dispatch_queue_push(dq, dc, pp); 2269} 2270 2271DISPATCH_NOINLINE 2272static void 2273_dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt, 2274 dispatch_function_t func, pthread_priority_t pp, 2275 dispatch_block_flags_t flags) 2276{ 2277 return _dispatch_barrier_async_f2(dq, ctxt, func, pp, flags); 2278} 2279 2280DISPATCH_NOINLINE 2281void 2282dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt, 2283 dispatch_function_t func) 2284{ 2285 return _dispatch_barrier_async_f2(dq, ctxt, func, 0, 0); 2286} 2287 2288DISPATCH_NOINLINE 2289void 2290_dispatch_barrier_async_detached_f(dispatch_queue_t dq, void *ctxt, 2291 dispatch_function_t func) 2292{ 2293 return _dispatch_barrier_async_f2(dq, ctxt, func, 0, 2294 DISPATCH_BLOCK_NO_QOS_CLASS|DISPATCH_BLOCK_NO_VOUCHER); 2295} 2296 2297#ifdef __BLOCKS__ 2298void 2299dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void)) 2300{ 2301 dispatch_function_t func = _dispatch_call_block_and_release; 2302 pthread_priority_t pp = 0; 2303 dispatch_block_flags_t flags = 0; 2304 if (slowpath(_dispatch_block_has_private_data(work))) { 2305 func = _dispatch_block_async_invoke_and_release; 2306 pp = _dispatch_block_get_priority(work); 2307 flags = _dispatch_block_get_flags(work); 2308 // balanced in d_block_async_invoke_and_release or d_block_wait 2309 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work), 2310 dbpd_queue, NULL, dq, release)) { 2311 _dispatch_retain(dq); 2312 } 2313 } 2314 _dispatch_barrier_async_f(dq, _dispatch_Block_copy(work), func, pp, flags); 2315} 2316#endif 2317 2318#pragma mark - 2319#pragma mark dispatch_async 2320 2321void 2322_dispatch_async_redirect_invoke(void *ctxt) 2323{ 2324 struct dispatch_continuation_s *dc = ctxt; 2325 struct dispatch_continuation_s *other_dc = dc->dc_other; 2326 dispatch_queue_t old_dq, dq = dc->dc_data, rq; 2327 2328 old_dq = _dispatch_thread_getspecific(dispatch_queue_key); 2329 _dispatch_thread_setspecific(dispatch_queue_key, dq); 2330 pthread_priority_t old_dp = _dispatch_set_defaultpriority(dq->dq_priority); 2331 _dispatch_continuation_pop(other_dc); 2332 _dispatch_reset_defaultpriority(old_dp); 2333 _dispatch_thread_setspecific(dispatch_queue_key, old_dq); 2334 2335 rq = dq->do_targetq; 2336 while (slowpath(rq->do_targetq) && rq != old_dq) { 2337 if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) { 2338 _dispatch_queue_wakeup(rq); 2339 } 2340 rq = rq->do_targetq; 2341 } 2342 2343 if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) { 2344 _dispatch_queue_wakeup(dq); 2345 } 2346 _dispatch_release(dq); 2347} 2348 2349static inline void 2350_dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc, 2351 pthread_priority_t pp) 2352{ 2353 uint32_t running = 2; 2354 2355 // Find the queue to redirect to 2356 do { 2357 if (slowpath(dq->dq_items_tail) || 2358 slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) || 2359 slowpath(dq->dq_width == 1)) { 2360 break; 2361 } 2362 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 2363 if (slowpath(running & 1) || slowpath(running > dq->dq_width)) { 2364 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed); 2365 break; 2366 } 2367 dq = dq->do_targetq; 2368 } while (slowpath(dq->do_targetq)); 2369 2370 _dispatch_queue_push_wakeup(dq, dc, pp, running == 0); 2371} 2372 2373DISPATCH_NOINLINE 2374static void 
2375_dispatch_async_f_redirect(dispatch_queue_t dq, 2376 dispatch_continuation_t other_dc, pthread_priority_t pp) 2377{ 2378 dispatch_continuation_t dc = _dispatch_continuation_alloc(); 2379 2380 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; 2381 dc->dc_func = _dispatch_async_redirect_invoke; 2382 dc->dc_ctxt = dc; 2383 dc->dc_data = dq; 2384 dc->dc_other = other_dc; 2385 dc->dc_priority = 0; 2386 dc->dc_voucher = NULL; 2387 2388 _dispatch_retain(dq); 2389 dq = dq->do_targetq; 2390 if (slowpath(dq->do_targetq)) { 2391 return _dispatch_async_f_redirect2(dq, dc, pp); 2392 } 2393 2394 _dispatch_queue_push(dq, dc, pp); 2395} 2396 2397DISPATCH_NOINLINE 2398static void 2399_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc, 2400 pthread_priority_t pp) 2401{ 2402 uint32_t running = 2; 2403 2404 do { 2405 if (slowpath(dq->dq_items_tail) 2406 || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) { 2407 break; 2408 } 2409 running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 2410 if (slowpath(running > dq->dq_width)) { 2411 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed); 2412 break; 2413 } 2414 if (!slowpath(running & 1)) { 2415 return _dispatch_async_f_redirect(dq, dc, pp); 2416 } 2417 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed); 2418 // We might get lucky and find that the barrier has ended by now 2419 } while (!(running & 1)); 2420 2421 _dispatch_queue_push_wakeup(dq, dc, pp, running == 0); 2422} 2423 2424DISPATCH_NOINLINE 2425static void 2426_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt, 2427 dispatch_function_t func, pthread_priority_t pp, 2428 dispatch_block_flags_t flags) 2429{ 2430 dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap(); 2431 2432 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; 2433 dc->dc_func = func; 2434 dc->dc_ctxt = ctxt; 2435 _dispatch_continuation_voucher_set(dc, flags); 2436 _dispatch_continuation_priority_set(dc, pp, flags); 2437 2438 pp = _dispatch_continuation_get_override_priority(dq, dc); 2439 2440 // No fastpath/slowpath hint because we simply don't know 2441 if (dq->do_targetq) { 2442 return _dispatch_async_f2(dq, dc, pp); 2443 } 2444 2445 _dispatch_queue_push(dq, dc, pp); 2446} 2447 2448DISPATCH_ALWAYS_INLINE 2449static inline void 2450_dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, 2451 pthread_priority_t pp, dispatch_block_flags_t flags) 2452{ 2453 dispatch_continuation_t dc; 2454 2455 // No fastpath/slowpath hint because we simply don't know 2456 if (dq->dq_width == 1 || flags & DISPATCH_BLOCK_BARRIER) { 2457 return _dispatch_barrier_async_f(dq, ctxt, func, pp, flags); 2458 } 2459 2460 dc = fastpath(_dispatch_continuation_alloc_cacheonly()); 2461 if (!dc) { 2462 return _dispatch_async_f_slow(dq, ctxt, func, pp, flags); 2463 } 2464 2465 dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; 2466 dc->dc_func = func; 2467 dc->dc_ctxt = ctxt; 2468 _dispatch_continuation_voucher_set(dc, flags); 2469 _dispatch_continuation_priority_set(dc, pp, flags); 2470 2471 pp = _dispatch_continuation_get_override_priority(dq, dc); 2472 2473 // No fastpath/slowpath hint because we simply don't know 2474 if (dq->do_targetq) { 2475 return _dispatch_async_f2(dq, dc, pp); 2476 } 2477 2478 _dispatch_queue_push(dq, dc, pp); 2479} 2480 2481DISPATCH_NOINLINE 2482void 2483dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) 2484{ 2485 return _dispatch_async_f(dq, ctxt, func, 0, 0); 2486} 2487 2488#ifdef __BLOCKS__ 2489void 2490dispatch_async(dispatch_queue_t dq, void 
(^work)(void)) 2491{ 2492 dispatch_function_t func = _dispatch_call_block_and_release; 2493 dispatch_block_flags_t flags = 0; 2494 pthread_priority_t pp = 0; 2495 if (slowpath(_dispatch_block_has_private_data(work))) { 2496 func = _dispatch_block_async_invoke_and_release; 2497 pp = _dispatch_block_get_priority(work); 2498 flags = _dispatch_block_get_flags(work); 2499 // balanced in d_block_async_invoke_and_release or d_block_wait 2500 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work), 2501 dbpd_queue, NULL, dq, release)) { 2502 _dispatch_retain(dq); 2503 } 2504 } 2505 _dispatch_async_f(dq, _dispatch_Block_copy(work), func, pp, flags); 2506} 2507#endif 2508 2509#pragma mark - 2510#pragma mark dispatch_group_async 2511 2512DISPATCH_ALWAYS_INLINE 2513static inline void 2514_dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, 2515 dispatch_function_t func, pthread_priority_t pp, 2516 dispatch_block_flags_t flags) 2517{ 2518 dispatch_continuation_t dc; 2519 2520 _dispatch_retain(dg); 2521 dispatch_group_enter(dg); 2522 2523 dc = _dispatch_continuation_alloc(); 2524 2525 unsigned long barrier = (flags & DISPATCH_BLOCK_BARRIER) ? 2526 DISPATCH_OBJ_BARRIER_BIT : 0; 2527 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT | 2528 barrier); 2529 dc->dc_func = func; 2530 dc->dc_ctxt = ctxt; 2531 dc->dc_data = dg; 2532 _dispatch_continuation_voucher_set(dc, flags); 2533 _dispatch_continuation_priority_set(dc, pp, flags); 2534 2535 pp = _dispatch_continuation_get_override_priority(dq, dc); 2536 2537 // No fastpath/slowpath hint because we simply don't know 2538 if (dq->dq_width != 1 && !barrier && dq->do_targetq) { 2539 return _dispatch_async_f2(dq, dc, pp); 2540 } 2541 2542 _dispatch_queue_push(dq, dc, pp); 2543} 2544 2545DISPATCH_NOINLINE 2546void 2547dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, 2548 dispatch_function_t func) 2549{ 2550 return _dispatch_group_async_f(dg, dq, ctxt, func, 0, 0); 2551} 2552 2553#ifdef __BLOCKS__ 2554void 2555dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, 2556 dispatch_block_t db) 2557{ 2558 dispatch_function_t func = _dispatch_call_block_and_release; 2559 dispatch_block_flags_t flags = 0; 2560 pthread_priority_t pp = 0; 2561 if (slowpath(_dispatch_block_has_private_data(db))) { 2562 func = _dispatch_block_async_invoke_and_release; 2563 pp = _dispatch_block_get_priority(db); 2564 flags = _dispatch_block_get_flags(db); 2565 // balanced in d_block_async_invoke_and_release or d_block_wait 2566 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(db), 2567 dbpd_queue, NULL, dq, release)) { 2568 _dispatch_retain(dq); 2569 } 2570 } 2571 _dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db), func, pp, flags); 2572} 2573#endif 2574 2575#pragma mark - 2576#pragma mark dispatch_function_invoke 2577 2578static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, 2579 dispatch_function_t func, pthread_priority_t pp); 2580 2581DISPATCH_ALWAYS_INLINE 2582static inline void 2583_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt, 2584 dispatch_function_t func) 2585{ 2586 dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key); 2587 _dispatch_thread_setspecific(dispatch_queue_key, dq); 2588 _dispatch_client_callout(ctxt, func); 2589 _dispatch_perfmon_workitem_inc(); 2590 _dispatch_thread_setspecific(dispatch_queue_key, old_dq); 2591} 2592 2593void 2594_dispatch_sync_recurse_invoke(void *ctxt) 2595{ 2596 dispatch_continuation_t dc = ctxt; 2597 
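	// ctxt is the on-stack continuation packed by _dispatch_function_recurse()
	// below: dc_data holds the queue whose context must be current during the
	// callout, and dc_ctxt/dc_func carry the original client work.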
_dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func); 2598} 2599 2600DISPATCH_ALWAYS_INLINE 2601static inline void 2602_dispatch_function_recurse(dispatch_queue_t dq, void *ctxt, 2603 dispatch_function_t func, pthread_priority_t pp) 2604{ 2605 struct dispatch_continuation_s dc = { 2606 .dc_data = dq, 2607 .dc_func = func, 2608 .dc_ctxt = ctxt, 2609 }; 2610 _dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke, pp); 2611} 2612 2613#pragma mark - 2614#pragma mark dispatch_barrier_sync 2615 2616static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt, 2617 dispatch_function_t func); 2618 2619DISPATCH_ALWAYS_INLINE_NDEBUG 2620static inline _dispatch_thread_semaphore_t 2621_dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou, 2622 bool lock) 2623{ 2624 _dispatch_thread_semaphore_t sema; 2625 dispatch_continuation_t dc = dou._dc; 2626 mach_port_t th; 2627 2628 if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable & 2629 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) != 2630 (DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) { 2631 return 0; 2632 } 2633 _dispatch_trace_continuation_pop(dq, dc); 2634 _dispatch_perfmon_workitem_inc(); 2635 2636 th = (mach_port_t)dc->dc_data; 2637 dc = dc->dc_ctxt; 2638 dq = dc->dc_data; 2639 sema = (_dispatch_thread_semaphore_t)dc->dc_other; 2640 if (lock) { 2641 (void)dispatch_atomic_add2o(dq, do_suspend_cnt, 2642 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed); 2643 // rdar://problem/9032024 running lock must be held until sync_f_slow 2644 // returns 2645 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 2646 } 2647 _dispatch_introspection_queue_item_complete(dou); 2648 _dispatch_wqthread_override_start(th, 2649 _dispatch_queue_get_override_priority(dq)); 2650 return sema ? sema : MACH_PORT_DEAD; 2651} 2652 2653static void 2654_dispatch_barrier_sync_f_slow_invoke(void *ctxt) 2655{ 2656 dispatch_continuation_t dc = ctxt; 2657 dispatch_queue_t dq = dc->dc_data; 2658 _dispatch_thread_semaphore_t sema; 2659 sema = (_dispatch_thread_semaphore_t)dc->dc_other; 2660 2661 dispatch_assert(dq == _dispatch_queue_get_current()); 2662#if DISPATCH_COCOA_COMPAT 2663 if (slowpath(dq->dq_is_thread_bound)) { 2664 // The queue is bound to a non-dispatch thread (e.g. 
main thread) 2665 _dispatch_continuation_voucher_adopt(dc); 2666 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); 2667 dispatch_atomic_store2o(dc, dc_func, NULL, release); 2668 _dispatch_thread_semaphore_signal(sema); // release 2669 return; 2670 } 2671#endif 2672 (void)dispatch_atomic_add2o(dq, do_suspend_cnt, 2673 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed); 2674 // rdar://9032024 running lock must be held until sync_f_slow returns 2675 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 2676 _dispatch_thread_semaphore_signal(sema); // release 2677} 2678 2679DISPATCH_NOINLINE 2680static void 2681_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, 2682 dispatch_function_t func, pthread_priority_t pp) 2683{ 2684 if (slowpath(!dq->do_targetq)) { 2685 // the global concurrent queues do not need strict ordering 2686 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 2687 return _dispatch_sync_f_invoke(dq, ctxt, func); 2688 } 2689 if (!pp) pp = (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG); 2690 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore(); 2691 struct dispatch_continuation_s dc = { 2692 .dc_data = dq, 2693#if DISPATCH_COCOA_COMPAT 2694 .dc_func = func, 2695 .dc_ctxt = ctxt, 2696#endif 2697 .dc_other = (void*)sema, 2698 }; 2699#if DISPATCH_COCOA_COMPAT 2700 // It's preferred to execute synchronous blocks on the current thread 2701 // due to thread-local side effects, garbage collection, etc. However, 2702 // blocks submitted to the main thread MUST be run on the main thread 2703 if (slowpath(dq->dq_is_thread_bound)) { 2704 _dispatch_continuation_voucher_set(&dc, 0); 2705 } 2706#endif 2707 struct dispatch_continuation_s dbss = { 2708 .do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT | 2709 DISPATCH_OBJ_SYNC_SLOW_BIT), 2710 .dc_func = _dispatch_barrier_sync_f_slow_invoke, 2711 .dc_ctxt = &dc, 2712 .dc_data = (void*)(uintptr_t)_dispatch_thread_port(), 2713 .dc_priority = pp, 2714 }; 2715 _dispatch_queue_push(dq, &dbss, 2716 _dispatch_continuation_get_override_priority(dq, &dbss)); 2717 2718 _dispatch_thread_semaphore_wait(sema); // acquire 2719 _dispatch_put_thread_semaphore(sema); 2720 2721#if DISPATCH_COCOA_COMPAT 2722 // Queue bound to a non-dispatch thread 2723 if (dc.dc_func == NULL) { 2724 return; 2725 } 2726#endif 2727 2728 _dispatch_queue_set_thread(dq); 2729 if (slowpath(dq->do_targetq->do_targetq)) { 2730 _dispatch_function_recurse(dq, ctxt, func, pp); 2731 } else { 2732 _dispatch_function_invoke(dq, ctxt, func); 2733 } 2734 _dispatch_queue_clear_thread(dq); 2735 2736 if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) && 2737 dq->dq_running == 2) { 2738 // rdar://problem/8290662 "lock transfer" 2739 sema = _dispatch_queue_drain_one_barrier_sync(dq); 2740 if (sema) { 2741 _dispatch_thread_semaphore_signal(sema); // release 2742 return; 2743 } 2744 } 2745 (void)dispatch_atomic_sub2o(dq, do_suspend_cnt, 2746 DISPATCH_OBJECT_SUSPEND_INTERVAL, release); 2747 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) { 2748 _dispatch_queue_wakeup(dq); 2749 } 2750} 2751 2752DISPATCH_NOINLINE 2753static void 2754_dispatch_barrier_sync_f2(dispatch_queue_t dq) 2755{ 2756 if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) { 2757 // rdar://problem/8290662 "lock transfer" 2758 _dispatch_thread_semaphore_t sema; 2759 sema = _dispatch_queue_drain_one_barrier_sync(dq); 2760 if (sema) { 2761 (void)dispatch_atomic_add2o(dq, do_suspend_cnt, 2762 DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed); 2763 // rdar://9032024 running 
lock must be held until sync_f_slow 2764 // returns: increment by 2 and decrement by 1 2765 (void)dispatch_atomic_inc2o(dq, dq_running, relaxed); 2766 _dispatch_thread_semaphore_signal(sema); 2767 return; 2768 } 2769 } 2770 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) { 2771 _dispatch_queue_wakeup(dq); 2772 } 2773} 2774 2775DISPATCH_NOINLINE 2776static void 2777_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt, 2778 dispatch_function_t func) 2779{ 2780 _dispatch_queue_set_thread(dq); 2781 _dispatch_function_invoke(dq, ctxt, func); 2782 _dispatch_queue_clear_thread(dq); 2783 if (slowpath(dq->dq_items_tail)) { 2784 return _dispatch_barrier_sync_f2(dq); 2785 } 2786 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) { 2787 _dispatch_queue_wakeup(dq); 2788 } 2789} 2790 2791DISPATCH_NOINLINE 2792static void 2793_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt, 2794 dispatch_function_t func, pthread_priority_t pp) 2795{ 2796 _dispatch_queue_set_thread(dq); 2797 _dispatch_function_recurse(dq, ctxt, func, pp); 2798 _dispatch_queue_clear_thread(dq); 2799 if (slowpath(dq->dq_items_tail)) { 2800 return _dispatch_barrier_sync_f2(dq); 2801 } 2802 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) { 2803 _dispatch_queue_wakeup(dq); 2804 } 2805} 2806 2807DISPATCH_NOINLINE 2808static void 2809_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, 2810 dispatch_function_t func, pthread_priority_t pp) 2811{ 2812 // 1) ensure that this thread hasn't enqueued anything ahead of this call 2813 // 2) the queue is not suspended 2814 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ 2815 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp); 2816 } 2817 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) { 2818 // global concurrent queues and queues bound to non-dispatch threads 2819 // always fall into the slow case 2820 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, pp); 2821 } 2822 if (slowpath(dq->do_targetq->do_targetq)) { 2823 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, pp); 2824 } 2825 _dispatch_barrier_sync_f_invoke(dq, ctxt, func); 2826} 2827 2828DISPATCH_NOINLINE 2829void 2830dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, 2831 dispatch_function_t func) 2832{ 2833 // 1) ensure that this thread hasn't enqueued anything ahead of this call 2834 // 2) the queue is not suspended 2835 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ 2836 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, 0); 2837 } 2838 if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) { 2839 // global concurrent queues and queues bound to non-dispatch threads 2840 // always fall into the slow case 2841 return _dispatch_barrier_sync_f_slow(dq, ctxt, func, 0); 2842 } 2843 if (slowpath(dq->do_targetq->do_targetq)) { 2844 return _dispatch_barrier_sync_f_recurse(dq, ctxt, func, 0); 2845 } 2846 _dispatch_barrier_sync_f_invoke(dq, ctxt, func); 2847} 2848 2849#ifdef __BLOCKS__ 2850DISPATCH_NOINLINE 2851static void 2852_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void)) 2853{ 2854 bool has_pd = _dispatch_block_has_private_data(work); 2855 dispatch_function_t func = _dispatch_Block_invoke(work); 2856 pthread_priority_t pp = 0; 2857 if (has_pd) { 2858 func = _dispatch_block_sync_invoke; 2859 pp = _dispatch_block_get_priority(work); 2860 dispatch_block_flags_t flags = _dispatch_block_get_flags(work); 2861 if (flags & 
DISPATCH_BLOCK_HAS_PRIORITY) { 2862 pthread_priority_t tp = _dispatch_get_priority(); 2863 if (pp < tp) { 2864 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG; 2865 } else if (!(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS)) { 2866 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG; 2867 } 2868 } 2869 // balanced in d_block_sync_invoke or d_block_wait 2870 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work), 2871 dbpd_queue, NULL, dq, release)) { 2872 _dispatch_retain(dq); 2873 } 2874#if DISPATCH_COCOA_COMPAT 2875 } else if (dq->dq_is_thread_bound && dispatch_begin_thread_4GC) { 2876 // Blocks submitted to the main queue MUST be run on the main thread, 2877 // under GC we must Block_copy in order to notify the thread-local 2878 // garbage collector that the objects are transferring to another thread 2879 // rdar://problem/7176237&7181849&7458685 2880 work = _dispatch_Block_copy(work); 2881 func = _dispatch_call_block_and_release; 2882#endif 2883 } 2884 _dispatch_barrier_sync_f(dq, work, func, pp); 2885} 2886 2887void 2888dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void)) 2889{ 2890 if (slowpath(dq->dq_is_thread_bound) || 2891 slowpath(_dispatch_block_has_private_data(work))) { 2892 return _dispatch_barrier_sync_slow(dq, work); 2893 } 2894 dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work)); 2895} 2896#endif 2897 2898DISPATCH_NOINLINE 2899static void 2900_dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt, 2901 dispatch_function_t func) 2902{ 2903 _dispatch_queue_set_thread(dq); 2904 _dispatch_function_invoke(dq, ctxt, func); 2905 _dispatch_queue_clear_thread(dq); 2906 if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) { 2907 _dispatch_queue_wakeup(dq); 2908 } 2909} 2910 2911DISPATCH_NOINLINE 2912void 2913_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt, 2914 dispatch_function_t func) 2915{ 2916 // Use for mutation of queue-/source-internal state only, ignores target 2917 // queue hierarchy!
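	// Fast path below: try to take the barrier "lock" (dq_running 0 -> 1)
	// and run func synchronously on the caller's thread; if the queue is
	// non-empty, suspended, or already running, fall back to a detached
	// barrier async so the mutation still executes on dq, e.g. as used by
	// dispatch_queue_set_specific() above.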
2918 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) 2919 || slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, 2920 acquire))) { 2921 return _dispatch_barrier_async_detached_f(dq, ctxt, func); 2922 } 2923 _dispatch_barrier_trysync_f_invoke(dq, ctxt, func); 2924} 2925 2926#pragma mark - 2927#pragma mark dispatch_sync 2928 2929DISPATCH_NOINLINE 2930static void 2931_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, 2932 pthread_priority_t pp, bool wakeup) 2933{ 2934 if (!pp) pp = (_dispatch_get_priority() | _PTHREAD_PRIORITY_ENFORCE_FLAG); 2935 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore(); 2936 struct dispatch_continuation_s dc = { 2937 .do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT, 2938#if DISPATCH_INTROSPECTION 2939 .dc_func = func, 2940 .dc_ctxt = ctxt, 2941 .dc_data = (void*)(uintptr_t)_dispatch_thread_port(), 2942#endif 2943 .dc_other = (void*)sema, 2944 .dc_priority = pp, 2945 }; 2946 _dispatch_queue_push_wakeup(dq, &dc, 2947 _dispatch_continuation_get_override_priority(dq, &dc), wakeup); 2948 2949 _dispatch_thread_semaphore_wait(sema); 2950 _dispatch_put_thread_semaphore(sema); 2951 2952 if (slowpath(dq->do_targetq->do_targetq)) { 2953 _dispatch_function_recurse(dq, ctxt, func, pp); 2954 } else { 2955 _dispatch_function_invoke(dq, ctxt, func); 2956 } 2957 2958 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) { 2959 _dispatch_queue_wakeup(dq); 2960 } 2961} 2962 2963DISPATCH_NOINLINE 2964static void 2965_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt, 2966 dispatch_function_t func) 2967{ 2968 _dispatch_function_invoke(dq, ctxt, func); 2969 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) { 2970 _dispatch_queue_wakeup(dq); 2971 } 2972} 2973 2974DISPATCH_NOINLINE 2975static void 2976_dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt, 2977 dispatch_function_t func, pthread_priority_t pp) 2978{ 2979 _dispatch_function_recurse(dq, ctxt, func, pp); 2980 if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) { 2981 _dispatch_queue_wakeup(dq); 2982 } 2983} 2984 2985static inline void 2986_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, 2987 pthread_priority_t pp) 2988{ 2989 // 1) ensure that this thread hasn't enqueued anything ahead of this call 2990 // 2) the queue is not suspended 2991 if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){ 2992 return _dispatch_sync_f_slow(dq, ctxt, func, pp, false); 2993 } 2994 uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 2995 // re-check suspension after barrier check <rdar://problem/15242126> 2996 if (slowpath(running & 1) || _dispatch_object_suspended(dq)) { 2997 running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed); 2998 return _dispatch_sync_f_slow(dq, ctxt, func, pp, running == 0); 2999 } 3000 if (slowpath(dq->do_targetq->do_targetq)) { 3001 return _dispatch_sync_f_recurse(dq, ctxt, func, pp); 3002 } 3003 _dispatch_sync_f_invoke(dq, ctxt, func); 3004} 3005 3006DISPATCH_NOINLINE 3007static void 3008_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, 3009 pthread_priority_t pp) 3010{ 3011 if (fastpath(dq->dq_width == 1)) { 3012 return _dispatch_barrier_sync_f(dq, ctxt, func, pp); 3013 } 3014 if (slowpath(!dq->do_targetq)) { 3015 // the global concurrent queues do not need strict ordering 3016 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 3017 return _dispatch_sync_f_invoke(dq, ctxt, 
func); 3018 } 3019 _dispatch_sync_f2(dq, ctxt, func, pp); 3020} 3021 3022DISPATCH_NOINLINE 3023void 3024dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) 3025{ 3026 if (fastpath(dq->dq_width == 1)) { 3027 return dispatch_barrier_sync_f(dq, ctxt, func); 3028 } 3029 if (slowpath(!dq->do_targetq)) { 3030 // the global concurrent queues do not need strict ordering 3031 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 3032 return _dispatch_sync_f_invoke(dq, ctxt, func); 3033 } 3034 _dispatch_sync_f2(dq, ctxt, func, 0); 3035} 3036 3037#ifdef __BLOCKS__ 3038DISPATCH_NOINLINE 3039static void 3040_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void)) 3041{ 3042 bool has_pd = _dispatch_block_has_private_data(work); 3043 if (has_pd && (_dispatch_block_get_flags(work) & DISPATCH_BLOCK_BARRIER)) { 3044 return _dispatch_barrier_sync_slow(dq, work); 3045 } 3046 dispatch_function_t func = _dispatch_Block_invoke(work); 3047 pthread_priority_t pp = 0; 3048 if (has_pd) { 3049 func = _dispatch_block_sync_invoke; 3050 pp = _dispatch_block_get_priority(work); 3051 dispatch_block_flags_t flags = _dispatch_block_get_flags(work); 3052 if (flags & DISPATCH_BLOCK_HAS_PRIORITY) { 3053 pthread_priority_t tp = _dispatch_get_priority(); 3054 if (pp < tp) { 3055 pp = tp | _PTHREAD_PRIORITY_ENFORCE_FLAG; 3056 } else if (!(flags & DISPATCH_BLOCK_INHERIT_QOS_CLASS)) { 3057 pp |= _PTHREAD_PRIORITY_ENFORCE_FLAG; 3058 } 3059 } 3060 // balanced in d_block_sync_invoke or d_block_wait 3061 if (dispatch_atomic_cmpxchg2o(_dispatch_block_get_data(work), 3062 dbpd_queue, NULL, dq, release)) { 3063 _dispatch_retain(dq); 3064 } 3065#if DISPATCH_COCOA_COMPAT 3066 } else if (dq->dq_is_thread_bound && dispatch_begin_thread_4GC) { 3067 // Blocks submitted to the main queue MUST be run on the main thread, 3068 // under GC we must Block_copy in order to notify the thread-local 3069 // garbage collector that the objects are transferring to another thread 3070 // rdar://problem/7176237&7181849&7458685 3071 work = _dispatch_Block_copy(work); 3072 func = _dispatch_call_block_and_release; 3073#endif 3074 } 3075 if (slowpath(!dq->do_targetq)) { 3076 // the global concurrent queues do not need strict ordering 3077 (void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed); 3078 return _dispatch_sync_f_invoke(dq, work, func); 3079 } 3080 _dispatch_sync_f2(dq, work, func, pp); 3081} 3082 3083void 3084dispatch_sync(dispatch_queue_t dq, void (^work)(void)) 3085{ 3086 if (fastpath(dq->dq_width == 1)) { 3087 return dispatch_barrier_sync(dq, work); 3088 } 3089 if (slowpath(dq->dq_is_thread_bound) || 3090 slowpath(_dispatch_block_has_private_data(work)) ) { 3091 return _dispatch_sync_slow(dq, work); 3092 } 3093 dispatch_sync_f(dq, work, _dispatch_Block_invoke(work)); 3094} 3095#endif 3096 3097#pragma mark - 3098#pragma mark dispatch_after 3099 3100void 3101_dispatch_after_timer_callback(void *ctxt) 3102{ 3103 dispatch_continuation_t dc = ctxt, dc1; 3104 dispatch_source_t ds = dc->dc_data; 3105 dc1 = _dispatch_continuation_free_cacheonly(dc); 3106 _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); 3107 dispatch_source_cancel(ds); 3108 dispatch_release(ds); 3109 if (slowpath(dc1)) { 3110 _dispatch_continuation_free_to_cache_limit(dc1); 3111 } 3112} 3113 3114DISPATCH_NOINLINE 3115void 3116dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt, 3117 dispatch_function_t func) 3118{ 3119 uint64_t delta, leeway; 3120 dispatch_source_t ds; 3121 3122 if (when == DISPATCH_TIME_FOREVER) { 3123#if DISPATCH_DEBUG 
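		// An infinite deadline would produce a timer that can never fire:
		// crash loudly in debug builds; release builds just return below.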
3124 DISPATCH_CLIENT_CRASH( 3125 "dispatch_after_f() called with 'when' == infinity"); 3126#endif 3127 return; 3128 } 3129 3130 delta = _dispatch_timeout(when); 3131 if (delta == 0) { 3132 return dispatch_async_f(queue, ctxt, func); 3133 } 3134 leeway = delta / 10; // <rdar://problem/13447496> 3135 if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC; 3136 if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC; 3137 3138 // this function can and should be optimized to not use a dispatch source 3139 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue); 3140 dispatch_assert(ds); 3141 3142 // TODO: don't use a separate continuation & voucher 3143 dispatch_continuation_t dc = _dispatch_continuation_alloc(); 3144 dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT); 3145 dc->dc_func = func; 3146 dc->dc_ctxt = ctxt; 3147 dc->dc_data = ds; 3148 3149 dispatch_set_context(ds, dc); 3150 dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback); 3151 dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway); 3152 dispatch_resume(ds); 3153} 3154 3155#ifdef __BLOCKS__ 3156void 3157dispatch_after(dispatch_time_t when, dispatch_queue_t queue, 3158 dispatch_block_t work) 3159{ 3160 // test before the copy of the block 3161 if (when == DISPATCH_TIME_FOREVER) { 3162#if DISPATCH_DEBUG 3163 DISPATCH_CLIENT_CRASH( 3164 "dispatch_after() called with 'when' == infinity"); 3165#endif 3166 return; 3167 } 3168 dispatch_after_f(when, queue, _dispatch_Block_copy(work), 3169 _dispatch_call_block_and_release); 3170} 3171#endif 3172 3173#pragma mark - 3174#pragma mark dispatch_queue_push 3175 3176DISPATCH_ALWAYS_INLINE 3177static inline void 3178_dispatch_queue_push_list_slow2(dispatch_queue_t dq, pthread_priority_t pp, 3179 struct dispatch_object_s *obj, bool retained) 3180{ 3181 // The queue must be retained before dq_items_head is written in order 3182 // to ensure that the reference is still valid when _dispatch_wakeup is 3183 // called. Otherwise, if preempted between the assignment to 3184 // dq_items_head and _dispatch_wakeup, the blocks submitted to the 3185 // queue may release the last reference to the queue when invoked by 3186 // _dispatch_queue_drain. 
<rdar://problem/6932776> 3187 if (!retained) _dispatch_retain(dq); 3188 dq->dq_items_head = obj; 3189 return _dispatch_queue_wakeup_with_qos_and_release(dq, pp); 3190} 3191 3192DISPATCH_NOINLINE 3193void 3194_dispatch_queue_push_list_slow(dispatch_queue_t dq, pthread_priority_t pp, 3195 struct dispatch_object_s *obj, unsigned int n, bool retained) 3196{ 3197 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) { 3198 dispatch_assert(!retained); 3199 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed); 3200 return _dispatch_queue_wakeup_global2(dq, n); 3201 } 3202 _dispatch_queue_push_list_slow2(dq, pp, obj, retained); 3203} 3204 3205DISPATCH_NOINLINE 3206void 3207_dispatch_queue_push_slow(dispatch_queue_t dq, pthread_priority_t pp, 3208 struct dispatch_object_s *obj, bool retained) 3209{ 3210 if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) { 3211 dispatch_assert(!retained); 3212 dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed); 3213 return _dispatch_queue_wakeup_global(dq); 3214 } 3215 _dispatch_queue_push_list_slow2(dq, pp, obj, retained); 3216} 3217 3218#pragma mark - 3219#pragma mark dispatch_queue_probe 3220 3221unsigned long 3222_dispatch_queue_probe(dispatch_queue_t dq) 3223{ 3224 return _dispatch_queue_class_probe(dq); 3225} 3226 3227#if DISPATCH_COCOA_COMPAT 3228unsigned long 3229_dispatch_runloop_queue_probe(dispatch_queue_t dq) 3230{ 3231 if (_dispatch_queue_class_probe(dq)) { 3232 if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816> 3233 return _dispatch_runloop_queue_wakeup(dq); 3234 } 3235 return false; 3236} 3237#endif 3238 3239unsigned long 3240_dispatch_mgr_queue_probe(dispatch_queue_t dq) 3241{ 3242 if (_dispatch_queue_class_probe(dq)) { 3243 return _dispatch_mgr_wakeup(dq); 3244 } 3245 return false; 3246} 3247 3248unsigned long 3249_dispatch_root_queue_probe(dispatch_queue_t dq) 3250{ 3251 _dispatch_queue_wakeup_global(dq); 3252 return false; 3253} 3254 3255#pragma mark - 3256#pragma mark dispatch_wakeup 3257 3258// 6618342 Contact the team that owns the Instrument DTrace probe before 3259// renaming this symbol 3260dispatch_queue_t 3261_dispatch_wakeup(dispatch_object_t dou) 3262{ 3263 unsigned long type = dx_metatype(dou._do); 3264 if (type == _DISPATCH_QUEUE_TYPE || type == _DISPATCH_SOURCE_TYPE) { 3265 return _dispatch_queue_wakeup(dou._dq); 3266 } 3267 if (_dispatch_object_suspended(dou)) { 3268 return NULL; 3269 } 3270 if (!dx_probe(dou._do)) { 3271 return NULL; 3272 } 3273 if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0, 3274 DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) { 3275 return NULL; 3276 } 3277 _dispatch_retain(dou._do); 3278 dispatch_queue_t tq = dou._do->do_targetq; 3279 _dispatch_queue_push(tq, dou._do, 0); 3280 return tq; // libdispatch does not need this, but the Instrument DTrace 3281 // probe does 3282} 3283 3284#if DISPATCH_COCOA_COMPAT 3285static inline void 3286_dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq) 3287{ 3288 mach_port_t mp = (mach_port_t)dq->do_ctxt; 3289 if (!mp) { 3290 return; 3291 } 3292 kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0); 3293 switch (kr) { 3294 case MACH_SEND_TIMEOUT: 3295 case MACH_SEND_TIMED_OUT: 3296 case MACH_SEND_INVALID_DEST: 3297 break; 3298 default: 3299 (void)dispatch_assume_zero(kr); 3300 break; 3301 } 3302} 3303 3304DISPATCH_NOINLINE DISPATCH_WEAK 3305unsigned long 3306_dispatch_runloop_queue_wakeup(dispatch_queue_t dq) 3307{ 3308 _dispatch_runloop_queue_wakeup_thread(dq); 3309 return false; 3310} 3311 
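/*
 * How the wakeup above is consumed (illustrative sketch only; the *_4CF
 * names are assumptions about the private CoreFoundation SPI, not
 * definitions appearing in this section): the thread owning a runloop queue
 * parks on the queue's mach port and drains it when the wakeup message
 * sent by _dispatch_runloop_queue_wakeup_thread() arrives, conceptually:
 *
 *	dispatch_queue_t rlq = _dispatch_runloop_root_queue_create_4CF("q", 0);
 *	mach_port_t mp = _dispatch_runloop_root_queue_get_port_4CF(rlq);
 *	// ...block in a mach_msg() receive on mp; when a wakeup arrives:
 *	while (_dispatch_runloop_root_queue_perform_4CF(rlq)) {}
 */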
3312DISPATCH_NOINLINE 3313static dispatch_queue_t 3314_dispatch_main_queue_wakeup(void) 3315{ 3316 dispatch_queue_t dq = &_dispatch_main_q; 3317 if (!dq->dq_is_thread_bound) { 3318 return NULL; 3319 } 3320 dispatch_once_f(&_dispatch_main_q_port_pred, dq, 3321 _dispatch_runloop_queue_port_init); 3322 _dispatch_runloop_queue_wakeup_thread(dq); 3323 return NULL; 3324} 3325#endif 3326 3327DISPATCH_NOINLINE 3328static void 3329_dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) 3330{ 3331 dispatch_root_queue_context_t qc = dq->do_ctxt; 3332 uint32_t i = n; 3333 int r; 3334 3335 _dispatch_debug_root_queue(dq, __func__); 3336 dispatch_once_f(&_dispatch_root_queues_pred, NULL, 3337 _dispatch_root_queues_init); 3338 3339#if HAVE_PTHREAD_WORKQUEUES 3340#if DISPATCH_USE_PTHREAD_POOL 3341 if (qc->dgq_kworkqueue != (void*)(~0ul)) 3342#endif 3343 { 3344 _dispatch_root_queue_debug("requesting new worker thread for global " 3345 "queue: %p", dq); 3346#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 3347 if (qc->dgq_kworkqueue) { 3348 pthread_workitem_handle_t wh; 3349 unsigned int gen_cnt; 3350 do { 3351 r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, 3352 _dispatch_worker_thread4, dq, &wh, &gen_cnt); 3353 (void)dispatch_assume_zero(r); 3354 } while (--i); 3355 return; 3356 } 3357#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 3358#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 3359 if (!dq->dq_priority) { 3360 r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority, 3361 qc->dgq_wq_options, (int)i); 3362 (void)dispatch_assume_zero(r); 3363 return; 3364 } 3365#endif 3366#if HAVE_PTHREAD_WORKQUEUE_QOS 3367 r = _pthread_workqueue_addthreads((int)i, dq->dq_priority); 3368 (void)dispatch_assume_zero(r); 3369#endif 3370 return; 3371 } 3372#endif // HAVE_PTHREAD_WORKQUEUES 3373#if DISPATCH_USE_PTHREAD_POOL 3374 dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt; 3375 if (fastpath(pqc->dpq_thread_mediator.do_vtable)) { 3376 while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) { 3377 if (!--i) { 3378 return; 3379 } 3380 } 3381 } 3382 uint32_t j, t_count; 3383 // seq_cst with atomic store to tail <rdar://problem/16932833> 3384 t_count = dispatch_atomic_load2o(qc, dgq_thread_pool_size, seq_cst); 3385 do { 3386 if (!t_count) { 3387 _dispatch_root_queue_debug("pthread pool is full for root queue: " 3388 "%p", dq); 3389 return; 3390 } 3391 j = i > t_count ? 
	} while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - j, &t_count, acquire));

	pthread_attr_t *attr = &pqc->dpq_thread_attr;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq);
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
	} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}

static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
	if (!_dispatch_queue_class_probe(dq)) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_queue_wakeup_global_slow(dq, n);
}

static inline void
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_global2(dq, 1);
}

#pragma mark -
#pragma mark dispatch_queue_invoke

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
dispatch_queue_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr)
{
	dispatch_queue_t dq = dou._dq;
	dispatch_queue_t otq = dq->do_targetq;
	dispatch_queue_t cq = _dispatch_queue_get_current();

	if (slowpath(cq != otq)) {
		return otq;
	}

	*sema_ptr = _dispatch_queue_drain(dq);

	if (slowpath(otq != dq->do_targetq)) {
		// An item on the queue changed the target queue
		return dq->do_targetq;
	}
	return NULL;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	_dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
}

#pragma mark -
#pragma mark dispatch_queue_drain

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_head(dispatch_queue_t dq)
{
	struct dispatch_object_s *dc;
	_dispatch_wait_until(dc = fastpath(dq->dq_items_head));
	return dc;
}

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
{
	struct dispatch_object_s *next_dc;
	next_dc = fastpath(dc->do_next);
	dq->dq_items_head = next_dc;
	if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
			relaxed)) {
		_dispatch_wait_until(next_dc = fastpath(dc->do_next));
		dq->dq_items_head = next_dc;
	}
	return next_dc;
}

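// Drains pending items off 'dq' on the current thread. Roughly: for each
// item, bail out to the caller if the queue became suspended, is already
// running at full width, or was retargeted mid-drain; redirect non-barrier
// items of concurrent queues to their target queue; hand off to a waiting
// barrier sync via the returned thread semaphore ("lock transfer");
// otherwise pop and invoke the item inline.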
_dispatch_thread_semaphore_t
_dispatch_queue_drain(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc, *next_dc;
	_dispatch_thread_semaphore_t sema = 0;

	// Continue draining sources after target queue change rdar://8928171
	bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(dq->dq_priority);

	pthread_priority_t op = _dispatch_queue_get_override_priority(dq);
	pthread_priority_t dp = _dispatch_get_defaultpriority();
	dp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	if (op > dp) {
		_dispatch_wqthread_override_start(dq->dq_thread, op);
	}

	//dispatch_debug_queue(dq, __func__);

	while (dq->dq_items_tail) {
		dc = _dispatch_queue_head(dq);
		do {
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
				goto out;
			}
			bool redirect = false;
			if (!fastpath(dq->dq_width == 1)) {
				if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
						((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT)) {
					if (dq->dq_running > 1) {
						goto out;
					}
				} else {
					redirect = true;
				}
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (redirect) {
				_dispatch_continuation_redirect(dq, dc);
				continue;
			}
			if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_perfmon_workitem_inc();
		} while ((dc = next_dc));
	}

out:
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	return sema;
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) {
		return;
	}
	struct dispatch_continuation_s marker = {
		.do_vtable = NULL,
	};
	struct dispatch_object_s *dmarker = (void*)&marker;
	_dispatch_queue_push_notrace(dq, dmarker, 0);

	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri);
	voucher_t voucher = _voucher_copy();

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	do {
		next_dc = _dispatch_queue_next(dq, dc);
		if (dc == dmarker) {
			goto out;
		}
		_dispatch_continuation_pop(dc);
		_dispatch_perfmon_workitem_inc();
	} while ((dc = next_dc));
	DISPATCH_CRASH("Main queue corruption");

out:
	if (next_dc) {
		_dispatch_main_queue_wakeup();
	}
	_dispatch_voucher_debug("main queue restore", voucher);
	_dispatch_set_priority_and_replace_voucher(old_pri, voucher);
	_dispatch_queue_reset_override_priority(dq);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

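// Pops and runs a single item off a thread-bound runloop queue and reports
// whether more work remains; _dispatch_runloop_root_queue_perform_4CF below
// wraps this for the runloop that owns the queue.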
static bool
_dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
{
	if (!dq->dq_items_tail) {
		return false;
	}
	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(old_pri);
	voucher_t voucher = _voucher_copy();

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	next_dc = _dispatch_queue_next(dq, dc);
	_dispatch_continuation_pop(dc);
	_dispatch_perfmon_workitem_inc();

	_dispatch_voucher_debug("runloop queue restore", voucher);
	_dispatch_set_priority_and_replace_voucher(old_pri, voucher);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
	return next_dc;
}
#endif

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
{
	// rdar://problem/8290662 "lock transfer"
	struct dispatch_object_s *dc;
	_dispatch_thread_semaphore_t sema;

	// queue is locked, or suspended and not being drained
	dc = dq->dq_items_head;
	if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
		return 0;
	}
	// dequeue dc, it is a barrier sync
	(void)_dispatch_queue_next(dq, dc);
	return sema;
}

void
_dispatch_mgr_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_mgr_q;
	if (!dq->dq_items_tail) {
		return _dispatch_force_cache_cleanup();
	}
	_dispatch_perfmon_start();
	if (slowpath(_dispatch_queue_drain(dq))) {
		DISPATCH_CRASH("Sync onto manager queue");
	}
	_dispatch_voucher_debug("mgr queue clear", NULL);
	_voucher_clear();
	_dispatch_queue_reset_override_priority(dq);
	_dispatch_reset_defaultpriority_override();
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

#pragma mark -
#pragma mark _dispatch_queue_wakeup_with_qos

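// Slow path of queue wakeup with a QoS hint. If the queue cannot be locked
// for draining (someone else owns it), the override is propagated to the
// current drainer or owner instead of enqueueing; otherwise the queue is
// pushed onto its target queue, optionally with an override continuation.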
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_wakeup_with_qos_slow(dispatch_queue_t dq, pthread_priority_t pp,
		bool retained)
{
	if (!dx_probe(dq) && (dq->dq_is_thread_bound || !dq->dq_thread)) {
		if (retained) _dispatch_release(dq);
		return NULL;
	}
	pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK;
	bool override = _dispatch_queue_override_priority(dq, pp);
	if (override && dq->dq_running > 1) {
		override = false;
	}

	if (!dispatch_atomic_cmpxchg2o(dq, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK, acquire)) {
#if DISPATCH_COCOA_COMPAT
		if (dq == &_dispatch_main_q && dq->dq_is_thread_bound) {
			return _dispatch_main_queue_wakeup();
		}
#endif
		if (override) {
			mach_port_t th;
			// <rdar://problem/17735825> to traverse the tq chain safely we must
			// lock it to ensure it cannot change, unless the queue is running
			// and we can just override the thread itself
			if (dq->dq_thread) {
				_dispatch_wqthread_override_start(dq->dq_thread, pp);
			} else if (!dispatch_atomic_cmpxchgv2o(dq, dq_tqthread,
					MACH_PORT_NULL, _dispatch_thread_port(), &th, acquire)) {
				// already locked, override the owner, trysync will do a queue
				// wakeup when it returns.
				_dispatch_wqthread_override_start(th, pp);
			} else {
				dispatch_queue_t tq = dq->do_targetq;
				if (_dispatch_queue_prepare_override(dq, tq, pp)) {
					_dispatch_queue_push_override(dq, tq, pp);
				} else {
					_dispatch_queue_wakeup_with_qos(tq, pp);
				}
				dispatch_atomic_store2o(dq, dq_tqthread, MACH_PORT_NULL,
						release);
			}
		}
		if (retained) _dispatch_release(dq);
		return NULL;
	}
	dispatch_queue_t tq = dq->do_targetq;
	if (!retained) _dispatch_retain(dq);
	if (override) {
		override = _dispatch_queue_prepare_override(dq, tq, pp);
	}
	_dispatch_queue_push(tq, dq, pp);
	if (override) {
		_dispatch_queue_push_override(dq, tq, pp);
	}
	return tq; // libdispatch does not need this, but the Instrument DTrace
			// probe does
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_queue_wakeup_with_qos2(dispatch_queue_t dq, pthread_priority_t pp,
		bool retained)
{
	if (_dispatch_object_suspended(dq)) {
		_dispatch_queue_override_priority(dq, pp);
		if (retained) _dispatch_release(dq);
		return NULL;
	}
	return _dispatch_queue_wakeup_with_qos_slow(dq, pp, retained);
}

DISPATCH_NOINLINE
void
_dispatch_queue_wakeup_with_qos_and_release(dispatch_queue_t dq,
		pthread_priority_t pp)
{
	(void)_dispatch_queue_wakeup_with_qos2(dq, pp, true);
}

DISPATCH_NOINLINE
void
_dispatch_queue_wakeup_with_qos(dispatch_queue_t dq, pthread_priority_t pp)
{
	(void)_dispatch_queue_wakeup_with_qos2(dq, pp, false);
}

DISPATCH_NOINLINE
dispatch_queue_t
_dispatch_queue_wakeup(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_with_qos2(dq,
			_dispatch_queue_get_override_priority(dq), false);
}

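// An "override continuation" temporarily steals a queue onto a higher-QoS
// root queue: if the queue can be acquired here, it is drained at the
// boosted priority; if it is already running, the thread currently draining
// it is overridden instead. The continuation consumes the reference taken
// in _dispatch_queue_prepare_override.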
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_queue_override_invoke(void *ctxt)
{
	dispatch_continuation_t dc = (dispatch_continuation_t)ctxt;
	dispatch_queue_t dq = dc->dc_data;
	pthread_priority_t p = 0;

	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
			fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
		_dispatch_queue_set_thread(dq);

		_dispatch_object_debug(dq, "stolen onto thread 0x%x, 0x%lx",
				dq->dq_thread, _dispatch_get_defaultpriority());

		pthread_priority_t old_dp = _dispatch_get_defaultpriority();
		_dispatch_reset_defaultpriority(dc->dc_priority);

		dispatch_queue_t tq = NULL;
		_dispatch_thread_semaphore_t sema = 0;
		tq = dispatch_queue_invoke2(dq, &sema);

		_dispatch_queue_clear_thread(dq);
		_dispatch_reset_defaultpriority(old_dp);

		uint32_t running = dispatch_atomic_dec2o(dq, dq_running, release);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema);
		} else if (!tq && running == 0) {
			p = _dispatch_queue_reset_override_priority(dq);
			if (p > (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
				_dispatch_wqthread_override_reset();
			}
		}
		_dispatch_introspection_queue_item_complete(dq);
		if (running == 0) {
			return _dispatch_queue_wakeup_with_qos_and_release(dq, p);
		}
	} else {
		mach_port_t th = dq->dq_thread;
		if (th) {
			p = _dispatch_queue_get_override_priority(dq);
			_dispatch_object_debug(dq, "overriding thr 0x%x to priority 0x%lx",
					th, p);
			_dispatch_wqthread_override_start(th, p);
		}
	}
	_dispatch_release(dq); // added when we pushed the override block
}
#endif

static inline bool
_dispatch_queue_prepare_override(dispatch_queue_t dq, dispatch_queue_t tq,
		pthread_priority_t p)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (dx_type(tq) != DISPATCH_QUEUE_ROOT_TYPE || !tq->dq_priority) {
		return false;
	}
	if (p <= (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
		return false;
	}
	if (p <= (tq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
		return false;
	}
	_dispatch_retain(dq);
	return true;
#else
	(void)dq; (void)tq; (void)p;
	return false;
#endif
}

static inline void
_dispatch_queue_push_override(dispatch_queue_t dq, dispatch_queue_t tq,
		pthread_priority_t p)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
	unsigned int qosbit, idx, overcommit;
	overcommit = (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
	qosbit = (p & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
			_PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
	idx = (unsigned int)__builtin_ffs((int)qosbit);
	if (!idx || idx > DISPATCH_QUEUE_QOS_COUNT) {
		DISPATCH_CRASH("Corrupted override priority");
	}
	dispatch_queue_t rq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_queue_override_invoke;
	dc->dc_ctxt = dc;
	dc->dc_priority = tq->dq_priority;
	dc->dc_voucher = NULL;
	dc->dc_data = dq;
	// dq retained by _dispatch_queue_prepare_override

	_dispatch_queue_push(rq, dc, 0);
#else
	(void)dq; (void)tq; (void)p;
#endif
}

#pragma mark -
#pragma mark dispatch_root_queue_drain

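// Contention handler for the lock-free dequeue below: another thread holds
// the "mediator" lock on dq_items_head. Spin briefly in case the contention
// is transient, then back off with exponentially increasing sleeps; if the
// contention persists, give up this thread (marking a pending request so
// the kernel can hand out a fresh one once load subsides).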
DISPATCH_NOINLINE
static bool
_dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
		if (_dispatch_contention_wait_until(dq->dq_items_head != mediator)) {
			goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	available = false;
out:
	if (pending) {
		(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	if (!available) {
		_dispatch_queue_wakeup_global(dq);
	}
	return available;
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_concurrent_drain_one2(dispatch_queue_t dq)
{
	// Wait for queue head and tail to be both non-empty or both empty
	bool available; // <rdar://problem/15917893>
	_dispatch_wait_until((dq->dq_items_head != NULL) ==
			(available = (dq->dq_items_tail != NULL)));
	return available;
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator,
				NULL, relaxed))) {
			goto start;
		}
		if (slowpath(dq->dq_items_tail) && // <rdar://problem/14416349>
				_dispatch_queue_concurrent_drain_one2(dq)) {
			goto start;
		}
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
			// both head and tail are NULL now
			goto out;
		}
		// There must be a next item now.
		_dispatch_wait_until(next = head->do_next);
	}

	dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}

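// Main work loop of a root (global) queue worker thread: repeatedly pop one
// item with the lock-free dequeue above and invoke it, resetting any QoS
// override that the previous item may have left on the thread.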
static void
_dispatch_root_queue_drain(dispatch_queue_t dq)
{
#if DISPATCH_DEBUG
	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}
#endif
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	pthread_priority_t old_pri = _dispatch_get_priority();
	pthread_priority_t pri = dq->dq_priority ? dq->dq_priority : old_pri;
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(pri);

#if DISPATCH_COCOA_COMPAT
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	bool reset = false;
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		if (reset) _dispatch_wqthread_override_reset();
		_dispatch_continuation_pop(item);
		reset = _dispatch_reset_defaultpriority_override();
	}
	_dispatch_voucher_debug("root queue clear", NULL);
	_dispatch_set_priority_and_replace_voucher(old_pri, NULL);
	_dispatch_reset_defaultpriority(old_dp);
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_autorelease_pool_pop(pool);
	if (dispatch_end_thread_4GC) {
		dispatch_end_thread_4GC();
	}
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);
}

#pragma mark -
#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread4(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();
	int pending = (int)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	dispatch_assert(pending >= 0);
	_dispatch_root_queue_drain(dq);
	__asm__(""); // prevent tailcall (for Instrument DTrace probe)
}

#if HAVE_PTHREAD_WORKQUEUE_QOS
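// Kernel workqueue entry point on QoS-aware systems: decode the QoS class
// bit out of the pthread priority handed to us and map it, together with
// the overcommit flag, to the index of the matching root queue.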
static void
_dispatch_worker_thread3(pthread_priority_t priority)
{
	// Reset priority TSD to workaround <rdar://problem/17825261>
	_dispatch_thread_setspecific(dispatch_priority_key,
			(void*)(uintptr_t)(priority & ~_PTHREAD_PRIORITY_FLAGS_MASK));
	unsigned int overcommit, qosbit, idx;
	overcommit = (priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) ? 1 : 0;
	qosbit = (priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >>
			_PTHREAD_PRIORITY_QOS_CLASS_SHIFT;
	if (!_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].
			dq_priority) {
		// If kernel doesn't support maintenance, bottom bit is background.
		// Shift to our idea of where background bit is.
		qosbit <<= 1;
	}
	idx = (unsigned int)__builtin_ffs((int)qosbit);
	dispatch_assert(idx > 0 && idx < DISPATCH_QUEUE_QOS_COUNT+1);
	dispatch_queue_t dq = &_dispatch_root_queues[((idx-1) << 1) | overcommit];
	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS

#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread4(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES

#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// workaround tweaks the kernel workqueue does for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	const int64_t timeout = 5ull * NSEC_PER_SEC;
	do {
		_dispatch_root_queue_drain(dq);
	} while (dispatch_semaphore_wait(&pqc->dpq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, release);
	_dispatch_queue_wakeup_global(dq);
	_dispatch_release(dq);

	return NULL;
}

int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_USE_PTHREAD_POOL

#pragma mark -
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

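// Creates a runloop ("thread-bound") root queue for CoreFoundation. The
// queue starts out locked (DISPATCH_OBJECT_SUSPEND_LOCK) and bound to the
// calling thread; it is drained one item at a time from that thread via
// _dispatch_runloop_root_queue_perform_4CF and woken through a Mach port.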
dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq);
	dq->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->dq_running = 1;
	dq->dq_is_thread_bound = 1;
	_dispatch_runloop_queue_port_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	_dispatch_queue_clear_bound_thread(dq);
	if (suspend_cnt == 0) {
		_dispatch_queue_wakeup(dq);
	}
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_port_dispose(dq);
	_dispatch_queue_destroy(dq);
}

bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	_dispatch_runloop_queue_probe(dq);
}

mach_port_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	return (mach_port_t)dq->do_ctxt;
}

static void
_dispatch_runloop_queue_port_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	mach_port_t mp;
	kern_return_t kr;

	_dispatch_safe_fork = false;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				sizeof(limits));
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	dq->do_ctxt = (void*)(uintptr_t)mp;

	_dispatch_program_is_probably_callback_driven = true;
}

static void
_dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	dq->do_ctxt = NULL;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}

#pragma mark -
#pragma mark dispatch_main_queue

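// CoreFoundation integration: the runloop fetches this port, installs a
// source on it, and calls _dispatch_main_queue_callback_4CF() whenever the
// port receives a wakeup message. A hypothetical sketch of the consumer
// side (illustrative only, not CF's actual code):
//
//	mach_port_t port = _dispatch_get_main_queue_port_4CF();
//	// ... install a runloop source that listens on 'port' ...
//	// on wakeup:
//	_dispatch_main_queue_callback_4CF(msg);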
mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	return (mach_port_t)dq->do_ctxt;
}

static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

#endif

void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}

DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

DISPATCH_NORETURN
static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}

DISPATCH_NOINLINE
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	dq->dq_is_thread_bound = 0;
	if (suspend_cnt == 0) {
		_dispatch_queue_wakeup(dq);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		_dispatch_barrier_async_detached_f(_dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, true), NULL, _dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_port_dispose(dq);
#endif
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}