1/* 2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved. 3 * 4 * @APPLE_APACHE_LICENSE_HEADER_START@ 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * @APPLE_APACHE_LICENSE_HEADER_END@ 19 */ 20 21#include "internal.h" 22#if HAVE_MACH 23#include "protocol.h" 24#include "protocolServer.h" 25#endif 26#include <sys/mount.h> 27 28static void _dispatch_source_merge_kevent(dispatch_source_t ds, 29 const struct kevent64_s *ke); 30static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp); 31static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg); 32static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, 33 uint32_t del_flags); 34static void _dispatch_kevent_drain(struct kevent64_s *ke); 35static void _dispatch_kevent_merge(struct kevent64_s *ke); 36static void _dispatch_timers_kevent(struct kevent64_s *ke); 37static void _dispatch_timers_unregister(dispatch_source_t ds, 38 dispatch_kevent_t dk); 39static void _dispatch_timers_update(dispatch_source_t ds); 40static void _dispatch_timer_aggregates_check(void); 41static void _dispatch_timer_aggregates_register(dispatch_source_t ds); 42static void _dispatch_timer_aggregates_update(dispatch_source_t ds, 43 unsigned int tidx); 44static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds, 45 unsigned int tidx); 46static inline unsigned long _dispatch_source_timer_data( 47 dispatch_source_refs_t dr, unsigned long prev); 48static 
long _dispatch_kq_update(const struct kevent64_s *); 49static void _dispatch_memorystatus_init(void); 50#if HAVE_MACH 51static void _dispatch_mach_host_calendar_change_register(void); 52static void _dispatch_mach_recv_msg_buf_init(void); 53static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk, 54 uint32_t new_flags, uint32_t del_flags); 55static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, 56 uint32_t new_flags, uint32_t del_flags); 57static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke); 58#else 59static inline void _dispatch_mach_host_calendar_change_register(void) {} 60static inline void _dispatch_mach_recv_msg_buf_init(void) {} 61#endif 62static const char * _evfiltstr(short filt); 63#if DISPATCH_DEBUG 64static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str); 65static void _dispatch_kevent_debugger(void *context); 66#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \ 67 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q) 68#else 69static inline void 70_dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED, 71 const char* str DISPATCH_UNUSED) {} 72#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() 73#endif 74 75#pragma mark - 76#pragma mark dispatch_source_t 77 78dispatch_source_t 79dispatch_source_create(dispatch_source_type_t type, 80 uintptr_t handle, 81 unsigned long mask, 82 dispatch_queue_t q) 83{ 84 const struct kevent64_s *proto_kev = &type->ke; 85 dispatch_source_t ds; 86 dispatch_kevent_t dk; 87 88 // input validation 89 if (type == NULL || (mask & ~type->mask)) { 90 return NULL; 91 } 92 93 switch (type->ke.filter) { 94 case EVFILT_SIGNAL: 95 if (handle >= NSIG) { 96 return NULL; 97 } 98 break; 99 case EVFILT_FS: 100#if DISPATCH_USE_VM_PRESSURE 101 case EVFILT_VM: 102#endif 103#if DISPATCH_USE_MEMORYSTATUS 104 case EVFILT_MEMORYSTATUS: 105#endif 106 case DISPATCH_EVFILT_CUSTOM_ADD: 107 case DISPATCH_EVFILT_CUSTOM_OR: 108 if (handle) { 109 return NULL; 110 } 
111 break; 112 case DISPATCH_EVFILT_TIMER: 113 if (!!handle ^ !!type->ke.ident) { 114 return NULL; 115 } 116 break; 117 default: 118 break; 119 } 120 121 ds = _dispatch_alloc(DISPATCH_VTABLE(source), 122 sizeof(struct dispatch_source_s)); 123 // Initialize as a queue first, then override some settings below. 124 _dispatch_queue_init((dispatch_queue_t)ds); 125 ds->dq_label = "source"; 126 127 ds->do_ref_cnt++; // the reference the manager queue holds 128 ds->do_ref_cnt++; // since source is created suspended 129 ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL; 130 // The initial target queue is the manager queue, in order to get 131 // the source installed. <rdar://problem/8928171> 132 ds->do_targetq = &_dispatch_mgr_q; 133 134 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); 135 dk->dk_kevent = *proto_kev; 136 dk->dk_kevent.ident = handle; 137 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; 138 dk->dk_kevent.fflags |= (uint32_t)mask; 139 dk->dk_kevent.udata = (uintptr_t)dk; 140 TAILQ_INIT(&dk->dk_sources); 141 142 ds->ds_dkev = dk; 143 ds->ds_pending_data_mask = dk->dk_kevent.fflags; 144 ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident; 145 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) { 146 ds->ds_is_level = true; 147 ds->ds_needs_rearm = true; 148 } else if (!(EV_CLEAR & proto_kev->flags)) { 149 // we cheat and use EV_CLEAR to mean a "flag thingy" 150 ds->ds_is_adder = true; 151 } 152 // Some sources require special processing 153 if (type->init != NULL) { 154 type->init(ds, type, handle, mask, q); 155 } 156 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder)); 157 158 if (fastpath(!ds->ds_refs)) { 159 ds->ds_refs = _dispatch_calloc(1ul, 160 sizeof(struct dispatch_source_refs_s)); 161 } 162 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds); 163 164 // First item on the queue sets the user-specified target queue 165 dispatch_set_target_queue(ds, q); 166 _dispatch_object_debug(ds, "%s", __func__); 167 return ds; 168} 169 170void 
171_dispatch_source_dispose(dispatch_source_t ds) 172{ 173 _dispatch_object_debug(ds, "%s", __func__); 174 free(ds->ds_refs); 175 _dispatch_queue_destroy(ds); 176} 177 178void 179_dispatch_source_xref_dispose(dispatch_source_t ds) 180{ 181 _dispatch_wakeup(ds); 182} 183 184void 185dispatch_source_cancel(dispatch_source_t ds) 186{ 187 _dispatch_object_debug(ds, "%s", __func__); 188 // Right after we set the cancel flag, someone else 189 // could potentially invoke the source, do the cancelation, 190 // unregister the source, and deallocate it. We would 191 // need to therefore retain/release before setting the bit 192 193 _dispatch_retain(ds); 194 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed); 195 _dispatch_wakeup(ds); 196 _dispatch_release(ds); 197} 198 199long 200dispatch_source_testcancel(dispatch_source_t ds) 201{ 202 return (bool)(ds->ds_atomic_flags & DSF_CANCELED); 203} 204 205unsigned long 206dispatch_source_get_mask(dispatch_source_t ds) 207{ 208 unsigned long mask = ds->ds_pending_data_mask; 209 if (ds->ds_vmpressure_override) { 210 mask = NOTE_VM_PRESSURE; 211 } 212#if TARGET_IPHONE_SIMULATOR 213 else if (ds->ds_memorystatus_override) { 214 mask = NOTE_MEMORYSTATUS_PRESSURE_WARN; 215 } 216#endif 217 return mask; 218} 219 220uintptr_t 221dispatch_source_get_handle(dispatch_source_t ds) 222{ 223 unsigned int handle = (unsigned int)ds->ds_ident_hack; 224#if TARGET_IPHONE_SIMULATOR 225 if (ds->ds_memorystatus_override) { 226 handle = 0; 227 } 228#endif 229 return handle; 230} 231 232unsigned long 233dispatch_source_get_data(dispatch_source_t ds) 234{ 235 unsigned long data = ds->ds_data; 236 if (ds->ds_vmpressure_override) { 237 data = NOTE_VM_PRESSURE; 238 } 239#if TARGET_IPHONE_SIMULATOR 240 else if (ds->ds_memorystatus_override) { 241 data = NOTE_MEMORYSTATUS_PRESSURE_WARN; 242 } 243#endif 244 return data; 245} 246 247void 248dispatch_source_merge_data(dispatch_source_t ds, unsigned long val) 249{ 250 struct kevent64_s kev = { 251 
.fflags = (typeof(kev.fflags))val, 252 .data = (typeof(kev.data))val, 253 }; 254 255 dispatch_assert( 256 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD || 257 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR); 258 259 _dispatch_source_merge_kevent(ds, &kev); 260} 261 262#pragma mark - 263#pragma mark dispatch_source_handler 264 265DISPATCH_ALWAYS_INLINE 266static inline dispatch_continuation_t 267_dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind, 268 bool block) 269{ 270 dispatch_continuation_t dc = _dispatch_continuation_alloc(); 271 if (handler) { 272 dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT : 273 DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ? 274 DISPATCH_OBJ_ASYNC_BIT : 0l)); 275 dc->dc_priority = 0; 276 dc->dc_voucher = NULL; 277 if (block) { 278#ifdef __BLOCKS__ 279 if (slowpath(_dispatch_block_has_private_data(handler))) { 280 // sources don't propagate priority by default 281 dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS; 282 flags |= _dispatch_block_get_flags(handler); 283 _dispatch_continuation_priority_set(dc, 284 _dispatch_block_get_priority(handler), flags); 285 } 286 if (kind != DS_EVENT_HANDLER) { 287 dc->dc_func = _dispatch_call_block_and_release; 288 } else { 289 dc->dc_func = _dispatch_Block_invoke(handler); 290 } 291 dc->dc_ctxt = _dispatch_Block_copy(handler); 292#endif /* __BLOCKS__ */ 293 } else { 294 dc->dc_func = handler; 295 dc->dc_ctxt = ds->do_ctxt; 296 } 297 _dispatch_trace_continuation_push((dispatch_queue_t)ds, dc); 298 } else { 299 dc->dc_func = NULL; 300 } 301 dc->dc_data = (void*)kind; 302 return dc; 303} 304 305static inline void 306_dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind, 307 dispatch_continuation_t dc_new) 308{ 309 dispatch_continuation_t dc = dr->ds_handler[kind]; 310 if (dc) { 311#ifdef __BLOCKS__ 312 if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) { 313 Block_release(dc->dc_ctxt); 314 } 
315#endif /* __BLOCKS__ */ 316 if (dc->dc_voucher) { 317 _voucher_release(dc->dc_voucher); 318 dc->dc_voucher = NULL; 319 } 320 _dispatch_continuation_free(dc); 321 } 322 dr->ds_handler[kind] = dc_new; 323} 324 325static inline void 326_dispatch_source_handler_free(dispatch_source_refs_t dr, long kind) 327{ 328 _dispatch_source_handler_replace(dr, kind, NULL); 329} 330 331static void 332_dispatch_source_set_handler(void *context) 333{ 334 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current(); 335 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 336 dispatch_continuation_t dc = context; 337 long kind = (long)dc->dc_data; 338 dc->dc_data = 0; 339 if (!dc->dc_func) { 340 _dispatch_continuation_free(dc); 341 dc = NULL; 342 } else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) { 343 dc->dc_ctxt = ds->do_ctxt; 344 } 345 _dispatch_source_handler_replace(ds->ds_refs, kind, dc); 346 if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) { 347#if HAVE_PTHREAD_WORKQUEUE_QOS 348 ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK; 349 _dispatch_queue_set_override_priority((dispatch_queue_t)ds); 350#endif 351 } 352} 353 354#ifdef __BLOCKS__ 355void 356dispatch_source_set_event_handler(dispatch_source_t ds, 357 dispatch_block_t handler) 358{ 359 dispatch_continuation_t dc; 360 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true); 361 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 362 _dispatch_source_set_handler); 363} 364#endif /* __BLOCKS__ */ 365 366void 367dispatch_source_set_event_handler_f(dispatch_source_t ds, 368 dispatch_function_t handler) 369{ 370 dispatch_continuation_t dc; 371 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false); 372 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 373 _dispatch_source_set_handler); 374} 375 376void 377_dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds, 378 void *ctxt, dispatch_function_t handler) 379{ 380 
dispatch_continuation_t dc; 381 dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false); 382 dc->do_vtable = (void *)((long)dc->do_vtable &~DISPATCH_OBJ_CTXT_FETCH_BIT); 383 dc->dc_other = dc->dc_ctxt; 384 dc->dc_ctxt = ctxt; 385 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 386 _dispatch_source_set_handler); 387} 388 389#ifdef __BLOCKS__ 390void 391dispatch_source_set_cancel_handler(dispatch_source_t ds, 392 dispatch_block_t handler) 393{ 394 dispatch_continuation_t dc; 395 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true); 396 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 397 _dispatch_source_set_handler); 398} 399#endif /* __BLOCKS__ */ 400 401void 402dispatch_source_set_cancel_handler_f(dispatch_source_t ds, 403 dispatch_function_t handler) 404{ 405 dispatch_continuation_t dc; 406 dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false); 407 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 408 _dispatch_source_set_handler); 409} 410 411#ifdef __BLOCKS__ 412void 413dispatch_source_set_registration_handler(dispatch_source_t ds, 414 dispatch_block_t handler) 415{ 416 dispatch_continuation_t dc; 417 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true); 418 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 419 _dispatch_source_set_handler); 420} 421#endif /* __BLOCKS__ */ 422 423void 424dispatch_source_set_registration_handler_f(dispatch_source_t ds, 425 dispatch_function_t handler) 426{ 427 dispatch_continuation_t dc; 428 dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false); 429 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc, 430 _dispatch_source_set_handler); 431} 432 433#pragma mark - 434#pragma mark dispatch_source_invoke 435 436static void 437_dispatch_source_registration_callout(dispatch_source_t ds) 438{ 439 dispatch_source_refs_t dr = ds->ds_refs; 440 dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER]; 441 if 
((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		// no registration callout if source is canceled rdar://problem/8955246
		return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
	}
	// Run the handler with the source's priority as the default priority;
	// the previous default is restored once the callout returns.
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		// use the context currently set on the source
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc);
	// The slot is cleared directly rather than through
	// _dispatch_source_handler_free().
	// NOTE(review): presumably _dispatch_continuation_pop() consumes dc --
	// confirm against its definition.
	dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
	_dispatch_reset_defaultpriority(old_dp);
}

// Final handler teardown for a source that is canceled or has lost its last
// external reference: zero the data fields, free the event and registration
// handlers, and invoke the cancel handler -- but only if DSF_CANCELED is
// actually set; otherwise the cancel handler is freed uncalled.
static void
_dispatch_source_cancel_callout(dispatch_source_t ds)
{
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
	ds->ds_pending_data_mask = 0;
	ds->ds_pending_data = 0;
	ds->ds_data = 0;
	_dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
	_dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
	if (!dc) {
		// no cancel handler was set
		return;
	}
	if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
		// not explicitly canceled: drop the cancel handler without calling it
		return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
		dc->dc_ctxt = ds->do_ctxt;
	}
	_dispatch_continuation_pop(dc);
	dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
	_dispatch_reset_defaultpriority(old_dp);
}

// Atomically take the pending data, publish it as ds_data, and call the
// event handler. Does nothing if the source is canceled or has lost its
// last external reference.
static void
_dispatch_source_latch_and_call(dispatch_source_t ds)
{
	unsigned long prev;

	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		return;
	}
	dispatch_source_refs_t dr = ds->ds_refs;
	dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
	// latch: swap the accumulated pending data for zero
	prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
	if (ds->ds_is_level) {
		// level sources have their pending data stored complemented by
		// _dispatch_source_merge_kevent(); undo that here
		ds->ds_data = ~prev;
	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
		ds->ds_data = _dispatch_source_timer_data(dr, prev);
	} else {
		ds->ds_data = prev;
	}
	// nothing to deliver without pending data or without a handler
	if (!dispatch_assume(prev) || !dc) {
		return;
	}
	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dc);
	// The event handler continuation is reused for every delivery, so take
	// an extra voucher reference before adoption consumes one and put the
	// voucher back on the continuation afterwards.
	voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
	_dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	_dispatch_introspection_queue_item_complete(dc);
	if (voucher) dc->dc_voucher = voucher;
	_dispatch_reset_defaultpriority(old_dp);
}

// Detach the source from its dispatch_kevent (timers go through the timer
// machinery instead), clear DSF_ARMED and the rearm request, and drop the
// reference taken at creation time.
static void
_dispatch_source_kevent_unregister(dispatch_source_t ds)
{
	_dispatch_object_debug(ds, "%s", __func__);
	dispatch_kevent_t dk = ds->ds_dkev;
	ds->ds_dkev = NULL;
	switch (dk->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		_dispatch_timers_unregister(ds, dk);
		break;
	default:
		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
		_dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
		break;
	}

	(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
	_dispatch_release(ds); // the retain is done at creation time
}

// (Re)arm the source's kevent with the given new flags. Timers are handed
// to _dispatch_timers_update(). If _dispatch_kevent_resume() reports
// failure (returns true) the source is unregistered.
static void
_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
{
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		return _dispatch_timers_update(ds);
	case EVFILT_MACHPORT:
		if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
		}
		break;
	}
	if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
		_dispatch_source_kevent_unregister(ds);
	}
}

// First-time installation of a source that is not yet installed: timers go
// to the timer machinery; every other source is attached to a (possibly
// shared) dispatch_kevent, resumed if needed, and marked DSF_ARMED.
static void
_dispatch_source_kevent_register(dispatch_source_t ds)
{
	dispatch_assert_zero(ds->ds_is_installed);
	switch (ds->ds_dkev->dk_kevent.filter) {
	case DISPATCH_EVFILT_TIMER:
		return
_dispatch_timers_update(ds);
	}
	uint32_t flags;
	// _dispatch_kevent_register() may replace ds->ds_dkev with an existing
	// kevent for the same ident/filter; flags receives the fflags that are
	// new relative to that kevent.
	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
	if (do_resume || ds->ds_needs_rearm) {
		_dispatch_source_kevent_resume(ds, flags);
	}
	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
	_dispatch_object_debug(ds, "%s", __func__);
}

// Perform the next pending action for a source. Returns the queue on which
// the invoke must be re-driven when the current queue is not the right one
// for that action, or NULL when there is nothing further to redirect.
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_source_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_source_t ds = dou._ds;
	if (slowpath(_dispatch_queue_drain(ds))) {
		DISPATCH_CLIENT_CRASH("Sync onto source");
	}

	// This function performs all source actions. Each action is responsible
	// for verifying that it takes place on the appropriate queue. If the
	// current queue is not the correct queue for this action, the correct queue
	// will be returned and the invoke will be re-driven on that queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_source_refs_t dr = ds->ds_refs;

	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_source_kevent_register(ds);
		ds->ds_is_installed = true;
		if (dr->ds_handler[DS_REGISTN_HANDLER]) {
			// deliver the registration handler on the target queue next
			return ds->do_targetq;
		}
		if (slowpath(ds->do_xref_cnt == -1)) {
			return &_dispatch_mgr_q; // rdar://problem/9558246
		}
	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
		// Source suspended by an item drained from the source queue.
		return NULL;
	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
		// The source has been registered and the registration handler needs
		// to be delivered on the target queue.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		// clears ds_registration_handler
		_dispatch_source_registration_callout(ds);
		if (slowpath(ds->do_xref_cnt == -1)) {
			return &_dispatch_mgr_q; // rdar://problem/9558246
		}
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
		// The source has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (ds->ds_dkev) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			_dispatch_source_kevent_unregister(ds);
		}
		// Only hop to the target queue when there is a handler left to
		// free or call.
		if (dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			if (dq != ds->do_targetq) {
				return ds->do_targetq;
			}
		}
		_dispatch_source_cancel_callout(ds);
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver via the event handler callback
		// on the target queue. Some sources need to be rearmed on the manager
		// queue after event delivery.
		if (dq != ds->do_targetq) {
			return ds->do_targetq;
		}
		_dispatch_source_latch_and_call(ds);
		if (ds->ds_needs_rearm) {
			return &_dispatch_mgr_q;
		}
	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
		// The source needs to be rearmed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		_dispatch_source_kevent_resume(ds, 0);
		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
	}

	return NULL;
}

// Out-of-line invoke entry point: runs _dispatch_source_invoke2() through
// the generic queue-class invoke.
DISPATCH_NOINLINE
void
_dispatch_source_invoke(dispatch_source_t ds)
{
	_dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
}

// Returns non-zero when the source has work that _dispatch_source_invoke2()
// would act on; otherwise defers to the generic queue-class probe.
unsigned long
_dispatch_source_probe(dispatch_source_t ds)
{
	// This function determines whether the source needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_source_refs_t dr = ds->ds_refs;
	if (!ds->ds_is_installed) {
		// The source needs to be installed on the manager queue.
		return true;
	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
		// The registration handler needs to be delivered to the target queue.
		return true;
	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
		// The source needs to be uninstalled from the manager queue, or the
		// cancellation handler needs to be delivered to the target queue.
		// Note: cancellation assumes installation.
		if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
				dr->ds_handler[DS_CANCEL_HANDLER] ||
				dr->ds_handler[DS_REGISTN_HANDLER]) {
			return true;
		}
	} else if (ds->ds_pending_data) {
		// The source has pending data to deliver to the target queue.
		return true;
	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
		// The source needs to be rearmed on the manager queue.
		return true;
	}
	return _dispatch_queue_class_probe(ds);
}

// Fold one kevent into the source's pending data and wake the source.
// Events are ignored once the source is canceled or has lost its last
// external reference. How the data is combined depends on the source:
//  - level sources store the complement of ke->data (overwriting);
//  - adder sources add ke->data;
//  - all others OR in the fflags selected by ds_pending_data_mask.
static void
_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
{
	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
		return;
	}
	if (ds->ds_is_level) {
		// ke->data is signed and "negative available data" makes no sense
		// zero bytes happens when EV_EOF is set
		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
		dispatch_assert(ke->data >= 0l);
		// stored complemented so that zero available bytes still yields a
		// non-zero pending value; _dispatch_source_latch_and_call() undoes it
		dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
				relaxed);
	} else if (ds->ds_is_adder) {
		(void)dispatch_atomic_add2o(ds, ds_pending_data,
				(unsigned long)ke->data, relaxed);
	} else if (ke->fflags & ds->ds_pending_data_mask) {
		(void)dispatch_atomic_or2o(ds, ds_pending_data,
				ke->fflags & ds->ds_pending_data_mask, relaxed);
	}
	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
	if (ds->ds_needs_rearm) {
		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
	}

	_dispatch_wakeup(ds);
}

#pragma mark -
#pragma mark dispatch_kevent_t

// The fd guard helpers are real functions only when fd guarding is
// available; otherwise they are no-op stubs.
#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void _dispatch_kevent_guard(dispatch_kevent_t dk);
static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
#else
static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
#endif

// Statically allocated kevents shared by all DISPATCH_EVFILT_CUSTOM_OR and
// DISPATCH_EVFILT_CUSTOM_ADD sources respectively.
static struct dispatch_kevent_s _dispatch_kevent_data_or = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_OR,
		.flags = EV_CLEAR,
	},
	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
};
static struct dispatch_kevent_s _dispatch_kevent_data_add = {
	.dk_kevent = {
		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
	},
	.dk_sources =
TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources), 744}; 745 746#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1)) 747 748DISPATCH_CACHELINE_ALIGN 749static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE]; 750 751static void 752_dispatch_kevent_init() 753{ 754 unsigned int i; 755 for (i = 0; i < DSL_HASH_SIZE; i++) { 756 TAILQ_INIT(&_dispatch_sources[i]); 757 } 758 759 TAILQ_INSERT_TAIL(&_dispatch_sources[0], 760 &_dispatch_kevent_data_or, dk_list); 761 TAILQ_INSERT_TAIL(&_dispatch_sources[0], 762 &_dispatch_kevent_data_add, dk_list); 763 _dispatch_kevent_data_or.dk_kevent.udata = 764 (uintptr_t)&_dispatch_kevent_data_or; 765 _dispatch_kevent_data_add.dk_kevent.udata = 766 (uintptr_t)&_dispatch_kevent_data_add; 767} 768 769static inline uintptr_t 770_dispatch_kevent_hash(uint64_t ident, short filter) 771{ 772 uint64_t value; 773#if HAVE_MACH 774 value = (filter == EVFILT_MACHPORT || 775 filter == DISPATCH_EVFILT_MACH_NOTIFICATION ? 776 MACH_PORT_INDEX(ident) : ident); 777#else 778 value = ident; 779#endif 780 return DSL_HASH((uintptr_t)value); 781} 782 783static dispatch_kevent_t 784_dispatch_kevent_find(uint64_t ident, short filter) 785{ 786 uintptr_t hash = _dispatch_kevent_hash(ident, filter); 787 dispatch_kevent_t dki; 788 789 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) { 790 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) { 791 break; 792 } 793 } 794 return dki; 795} 796 797static void 798_dispatch_kevent_insert(dispatch_kevent_t dk) 799{ 800 _dispatch_kevent_guard(dk); 801 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident, 802 dk->dk_kevent.filter); 803 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list); 804} 805 806// Find existing kevents, and merge any new flags if necessary 807static bool 808_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp) 809{ 810 dispatch_kevent_t dk, ds_dkev = *dkp; 811 uint32_t new_flags; 812 bool do_resume = false; 813 814 dk = 
_dispatch_kevent_find(ds_dkev->dk_kevent.ident, 815 ds_dkev->dk_kevent.filter); 816 if (dk) { 817 // If an existing dispatch kevent is found, check to see if new flags 818 // need to be added to the existing kevent 819 new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags; 820 dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags; 821 free(ds_dkev); 822 *dkp = dk; 823 do_resume = new_flags; 824 } else { 825 dk = ds_dkev; 826 _dispatch_kevent_insert(dk); 827 new_flags = dk->dk_kevent.fflags; 828 do_resume = true; 829 } 830 // Re-register the kevent with the kernel if new flags were added 831 // by the dispatch kevent 832 if (do_resume) { 833 dk->dk_kevent.flags |= EV_ADD; 834 } 835 *flgp = new_flags; 836 return do_resume; 837} 838 839static bool 840_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, 841 uint32_t del_flags) 842{ 843 long r; 844 switch (dk->dk_kevent.filter) { 845 case DISPATCH_EVFILT_TIMER: 846 case DISPATCH_EVFILT_CUSTOM_ADD: 847 case DISPATCH_EVFILT_CUSTOM_OR: 848 // these types not registered with kevent 849 return 0; 850#if HAVE_MACH 851 case EVFILT_MACHPORT: 852 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags); 853 case DISPATCH_EVFILT_MACH_NOTIFICATION: 854 return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags); 855#endif 856 case EVFILT_PROC: 857 if (dk->dk_kevent.flags & EV_ONESHOT) { 858 return 0; 859 } 860 // fall through 861 default: 862 r = _dispatch_kq_update(&dk->dk_kevent); 863 if (dk->dk_kevent.flags & EV_DISPATCH) { 864 dk->dk_kevent.flags &= ~EV_ADD; 865 } 866 return r; 867 } 868} 869 870static void 871_dispatch_kevent_dispose(dispatch_kevent_t dk) 872{ 873 uintptr_t hash; 874 875 switch (dk->dk_kevent.filter) { 876 case DISPATCH_EVFILT_TIMER: 877 case DISPATCH_EVFILT_CUSTOM_ADD: 878 case DISPATCH_EVFILT_CUSTOM_OR: 879 // these sources live on statically allocated lists 880 return; 881#if HAVE_MACH 882 case EVFILT_MACHPORT: 883 _dispatch_kevent_machport_resume(dk, 0, 
dk->dk_kevent.fflags);
		break;
	case DISPATCH_EVFILT_MACH_NOTIFICATION:
		_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
		break;
#endif
	case EVFILT_PROC:
		if (dk->dk_kevent.flags & EV_ONESHOT) {
			break; // implicitly deleted
		}
		// fall through
	default:
		// issue an EV_DELETE unless one has already been requested
		if (~dk->dk_kevent.flags & EV_DELETE) {
			dk->dk_kevent.flags |= EV_DELETE;
			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
			_dispatch_kq_update(&dk->dk_kevent);
		}
		break;
	}

	hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
			dk->dk_kevent.filter);
	TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
	_dispatch_kevent_unguard(dk);
	free(dk);
}

// Called after a source has been removed from dk. If no sources remain the
// kevent is disposed of; otherwise the fflags still wanted by the remaining
// sources are recomputed and, if any of the departing source's flags (flg)
// are no longer needed, the kevent is re-registered without them.
static void
_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
{
	dispatch_source_refs_t dri;
	uint32_t del_flags, fflags = 0;

	if (TAILQ_EMPTY(&dk->dk_sources)) {
		_dispatch_kevent_dispose(dk);
	} else {
		// union of the masks of all sources still attached
		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
			fflags |= mask;
		}
		del_flags = flg & ~fflags;
		if (del_flags) {
			dk->dk_kevent.flags |= EV_ADD;
			dk->dk_kevent.fflags = fflags;
			_dispatch_kevent_resume(dk, 0, del_flags);
		}
	}
}

// Turn a failed EVFILT_PROC kevent into a synthetic NOTE_EXIT event and
// run it through the normal drain path.
DISPATCH_NOINLINE
static void
_dispatch_kevent_proc_exit(struct kevent64_s *ke)
{
	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
	struct kevent64_s fake;
	fake = *ke;
	fake.flags &= ~EV_ERROR;
	fake.fflags = NOTE_EXIT;
	fake.data = 0;
	_dispatch_kevent_drain(&fake);
}

// Report a kevent that came back with EV_ERROR. ke->data carries the error
// code; a zero value is not logged.
DISPATCH_NOINLINE
static void
_dispatch_kevent_error(struct kevent64_s *ke)
{
	_dispatch_kevent_debug(ke, __func__);
	if (ke->data) {
		// log the unexpected error
		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
				ke->flags & EV_DELETE ? "delete" :
				ke->flags & EV_ADD ? "add" :
				ke->flags & EV_ENABLE ? "enable" : "monitor",
				(int)ke->data);
	}
}

// Route one kevent to its consumer: EVFILT_USER events are dropped, errors
// go to the error / proc-exit handling, timers and Mach ports to their
// dedicated handlers, and everything else to _dispatch_kevent_merge().
static void
_dispatch_kevent_drain(struct kevent64_s *ke)
{
#if DISPATCH_DEBUG
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
#endif
	if (ke->filter == EVFILT_USER) {
		return;
	}
	if (slowpath(ke->flags & EV_ERROR)) {
		if (ke->filter == EVFILT_PROC) {
			if (ke->flags & EV_DELETE) {
				// Process exited while monitored
				return;
			} else if (ke->data == ESRCH) {
				return _dispatch_kevent_proc_exit(ke);
			}
		}
		return _dispatch_kevent_error(ke);
	}
	_dispatch_kevent_debug(ke, __func__);
	if (ke->filter == EVFILT_TIMER) {
		return _dispatch_timers_kevent(ke);
	}
#if HAVE_MACH
	if (ke->filter == EVFILT_MACHPORT) {
		return _dispatch_kevent_mach_portset(ke);
	}
#endif
	return _dispatch_kevent_merge(ke);
}

// Deliver a kevent to every source attached to the dispatch_kevent that
// ke->udata points at.
DISPATCH_NOINLINE
static void
_dispatch_kevent_merge(struct kevent64_s *ke)
{
	dispatch_kevent_t dk;
	dispatch_source_refs_t dri;

	dk = (void*)ke->udata;
	dispatch_assert(dk);

	if (ke->flags & EV_ONESHOT) {
		// remember that the kevent fired as oneshot
		dk->dk_kevent.flags |= EV_ONESHOT;
	}
	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
	}
}

#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
// For read / write / vnode kevents, install a GUARD_CLOSE guard on the file
// descriptor, using the address of the kevent as the guard value. On
// success the guard flags and the previous fd flags are stashed in
// ext[0] / ext[1] for _dispatch_kevent_unguard(). An EPERM failure is
// ignored silently; other failures are reported via dispatch_assume_zero().
static void
_dispatch_kevent_guard(dispatch_kevent_t dk)
{
	guardid_t guard;
	const unsigned int guard_flags = GUARD_CLOSE;
	int r, fd_flags = 0;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard = &dk->dk_kevent;
		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
				&guard, guard_flags, &fd_flags);
		if (slowpath(r == -1)) {
			int err = errno;
			if (err != EPERM) {
				(void)dispatch_assume_zero(err);
			}
			return;
		}
		dk->dk_kevent.ext[0] = guard_flags;
		dk->dk_kevent.ext[1] = fd_flags;
		break;
	}
}

// Remove the guard installed by _dispatch_kevent_guard(), if any (ext[0]
// is zero when no guard was installed), restoring the saved fd flags.
static void
_dispatch_kevent_unguard(dispatch_kevent_t dk)
{
	guardid_t guard;
	unsigned int guard_flags;
	int r, fd_flags;
	switch (dk->dk_kevent.filter) {
	case EVFILT_READ:
	case EVFILT_WRITE:
	case EVFILT_VNODE:
		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
		if (!guard_flags) {
			return;
		}
		guard = &dk->dk_kevent;
		fd_flags = (int)dk->dk_kevent.ext[1];
		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
				guard_flags, NULL, 0, &fd_flags);
		if (slowpath(r == -1)) {
			(void)dispatch_assume_zero(errno);
			return;
		}
		dk->dk_kevent.ext[0] = 0;
		break;
	}
}
#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD

#pragma mark -
#pragma mark dispatch_source_timer

// Per-QoS record of the next timer, kept only for DTrace probes; the
// macros expand to nothing when DTrace support is disabled.
#if DISPATCH_USE_DTRACE
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#else
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#endif

#define
_dispatch_source_timer_telemetry_enabled() false 1088 1089DISPATCH_NOINLINE 1090static void 1091_dispatch_source_timer_telemetry_slow(dispatch_source_t ds, 1092 uintptr_t ident, struct dispatch_timer_source_s *values) 1093{ 1094 if (_dispatch_trace_timer_configure_enabled()) { 1095 _dispatch_trace_timer_configure(ds, ident, values); 1096 } 1097} 1098 1099DISPATCH_ALWAYS_INLINE 1100static inline void 1101_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident, 1102 struct dispatch_timer_source_s *values) 1103{ 1104 if (_dispatch_trace_timer_configure_enabled() || 1105 _dispatch_source_timer_telemetry_enabled()) { 1106 _dispatch_source_timer_telemetry_slow(ds, ident, values); 1107 asm(""); // prevent tailcall 1108 } 1109} 1110 1111// approx 1 year (60s * 60m * 24h * 365d) 1112#define FOREVER_NSEC 31536000000000000ull 1113 1114DISPATCH_ALWAYS_INLINE 1115static inline uint64_t 1116_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx) 1117{ 1118 unsigned int tk = DISPATCH_TIMER_KIND(tidx); 1119 if (nows && fastpath(nows[tk])) { 1120 return nows[tk]; 1121 } 1122 uint64_t now; 1123 switch (tk) { 1124 case DISPATCH_TIMER_KIND_MACH: 1125 now = _dispatch_absolute_time(); 1126 break; 1127 case DISPATCH_TIMER_KIND_WALL: 1128 now = _dispatch_get_nanoseconds(); 1129 break; 1130 } 1131 if (nows) { 1132 nows[tk] = now; 1133 } 1134 return now; 1135} 1136 1137static inline unsigned long 1138_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev) 1139{ 1140 // calculate the number of intervals since last fire 1141 unsigned long data, missed; 1142 uint64_t now; 1143 now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr)); 1144 missed = (unsigned long)((now - ds_timer(dr).last_fire) / 1145 ds_timer(dr).interval); 1146 // correct for missed intervals already delivered last time 1147 data = prev - ds_timer(dr).missed + missed; 1148 ds_timer(dr).missed = missed; 1149 return data; 1150} 1151 1152struct dispatch_set_timer_params { 
1153 dispatch_source_t ds; 1154 uintptr_t ident; 1155 struct dispatch_timer_source_s values; 1156}; 1157 1158static void 1159_dispatch_source_set_timer3(void *context) 1160{ 1161 // Called on the _dispatch_mgr_q 1162 struct dispatch_set_timer_params *params = context; 1163 dispatch_source_t ds = params->ds; 1164 ds->ds_ident_hack = params->ident; 1165 ds_timer(ds->ds_refs) = params->values; 1166 // Clear any pending data that might have accumulated on 1167 // older timer params <rdar://problem/8574886> 1168 ds->ds_pending_data = 0; 1169 // Re-arm in case we got disarmed because of pending set_timer suspension 1170 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release); 1171 dispatch_resume(ds); 1172 // Must happen after resume to avoid getting disarmed due to suspension 1173 _dispatch_timers_update(ds); 1174 dispatch_release(ds); 1175 if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) { 1176 _dispatch_mach_host_calendar_change_register(); 1177 } 1178 free(params); 1179} 1180 1181static void 1182_dispatch_source_set_timer2(void *context) 1183{ 1184 // Called on the source queue 1185 struct dispatch_set_timer_params *params = context; 1186 dispatch_suspend(params->ds); 1187 _dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params, 1188 _dispatch_source_set_timer3); 1189} 1190 1191DISPATCH_NOINLINE 1192static struct dispatch_set_timer_params * 1193_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start, 1194 uint64_t interval, uint64_t leeway) 1195{ 1196 struct dispatch_set_timer_params *params; 1197 params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params)); 1198 params->ds = ds; 1199 params->values.flags = ds_timer(ds->ds_refs).flags; 1200 1201 if (interval == 0) { 1202 // we use zero internally to mean disabled 1203 interval = 1; 1204 } else if ((int64_t)interval < 0) { 1205 // 6866347 - make sure nanoseconds won't overflow 1206 interval = INT64_MAX; 1207 } 1208 if ((int64_t)leeway < 0) { 1209 leeway = INT64_MAX; 
1210 } 1211 if (start == DISPATCH_TIME_NOW) { 1212 start = _dispatch_absolute_time(); 1213 } else if (start == DISPATCH_TIME_FOREVER) { 1214 start = INT64_MAX; 1215 } 1216 1217 if ((int64_t)start < 0) { 1218 // wall clock 1219 start = (dispatch_time_t)-((int64_t)start); 1220 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK; 1221 } else { 1222 // absolute clock 1223 interval = _dispatch_time_nano2mach(interval); 1224 if (interval < 1) { 1225 // rdar://problem/7287561 interval must be at least one in 1226 // in order to avoid later division by zero when calculating 1227 // the missed interval count. (NOTE: the wall clock's 1228 // interval is already "fixed" to be 1 or more) 1229 interval = 1; 1230 } 1231 leeway = _dispatch_time_nano2mach(leeway); 1232 params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK; 1233 } 1234 params->ident = DISPATCH_TIMER_IDENT(params->values.flags); 1235 params->values.target = start; 1236 params->values.deadline = (start < UINT64_MAX - leeway) ? 1237 start + leeway : UINT64_MAX; 1238 params->values.interval = interval; 1239 params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ? 
1240 leeway : interval / 2; 1241 return params; 1242} 1243 1244DISPATCH_ALWAYS_INLINE 1245static inline void 1246_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start, 1247 uint64_t interval, uint64_t leeway, bool source_sync) 1248{ 1249 if (slowpath(!ds->ds_is_timer) || 1250 slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) { 1251 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source"); 1252 } 1253 1254 struct dispatch_set_timer_params *params; 1255 params = _dispatch_source_timer_params(ds, start, interval, leeway); 1256 1257 _dispatch_source_timer_telemetry(ds, params->ident, ¶ms->values); 1258 // Suspend the source so that it doesn't fire with pending changes 1259 // The use of suspend/resume requires the external retain/release 1260 dispatch_retain(ds); 1261 if (source_sync) { 1262 return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params, 1263 _dispatch_source_set_timer2); 1264 } else { 1265 return _dispatch_source_set_timer2(params); 1266 } 1267} 1268 1269void 1270dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start, 1271 uint64_t interval, uint64_t leeway) 1272{ 1273 _dispatch_source_set_timer(ds, start, interval, leeway, true); 1274} 1275 1276void 1277_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds, 1278 dispatch_time_t start, uint64_t interval, uint64_t leeway) 1279{ 1280 // Don't serialize through the source queue for CF timers <rdar://13833190> 1281 _dispatch_source_set_timer(ds, start, interval, leeway, false); 1282} 1283 1284void 1285_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval) 1286{ 1287 dispatch_source_refs_t dr = ds->ds_refs; 1288 #define NSEC_PER_FRAME (NSEC_PER_SEC/60) 1289 const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION; 1290 if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME : 1291 FOREVER_NSEC/NSEC_PER_MSEC))) { 1292 interval *= animation ? 
NSEC_PER_FRAME : NSEC_PER_MSEC; 1293 } else { 1294 interval = FOREVER_NSEC; 1295 } 1296 interval = _dispatch_time_nano2mach(interval); 1297 uint64_t target = _dispatch_absolute_time() + interval; 1298 target = (target / interval) * interval; 1299 const uint64_t leeway = animation ? 1300 _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2; 1301 ds_timer(dr).target = target; 1302 ds_timer(dr).deadline = target + leeway; 1303 ds_timer(dr).interval = interval; 1304 ds_timer(dr).leeway = leeway; 1305 _dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr)); 1306} 1307 1308#pragma mark - 1309#pragma mark dispatch_timers 1310 1311#define DISPATCH_TIMER_STRUCT(refs) \ 1312 uint64_t target, deadline; \ 1313 TAILQ_HEAD(, refs) dt_sources 1314 1315typedef struct dispatch_timer_s { 1316 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s); 1317} *dispatch_timer_t; 1318 1319#define DISPATCH_TIMER_INITIALIZER(tidx) \ 1320 [tidx] = { \ 1321 .target = UINT64_MAX, \ 1322 .deadline = UINT64_MAX, \ 1323 .dt_sources = TAILQ_HEAD_INITIALIZER( \ 1324 _dispatch_timer[tidx].dt_sources), \ 1325 } 1326#define DISPATCH_TIMER_INIT(kind, qos) \ 1327 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \ 1328 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos)) 1329 1330struct dispatch_timer_s _dispatch_timer[] = { 1331 DISPATCH_TIMER_INIT(WALL, NORMAL), 1332 DISPATCH_TIMER_INIT(WALL, CRITICAL), 1333 DISPATCH_TIMER_INIT(WALL, BACKGROUND), 1334 DISPATCH_TIMER_INIT(MACH, NORMAL), 1335 DISPATCH_TIMER_INIT(MACH, CRITICAL), 1336 DISPATCH_TIMER_INIT(MACH, BACKGROUND), 1337}; 1338#define DISPATCH_TIMER_COUNT \ 1339 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0]))) 1340 1341#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \ 1342 (uintptr_t)&_dispatch_kevent_timer[tidx] 1343#ifdef __LP64__ 1344#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \ 1345 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx) 1346#else // __LP64__ 1347// dynamic initialization in _dispatch_timers_init() 1348#define 
DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \ 1349 .udata = 0 1350#endif // __LP64__ 1351#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \ 1352 [tidx] = { \ 1353 .dk_kevent = { \ 1354 .ident = tidx, \ 1355 .filter = DISPATCH_EVFILT_TIMER, \ 1356 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \ 1357 }, \ 1358 .dk_sources = TAILQ_HEAD_INITIALIZER( \ 1359 _dispatch_kevent_timer[tidx].dk_sources), \ 1360 } 1361#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \ 1362 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \ 1363 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos)) 1364 1365struct dispatch_kevent_s _dispatch_kevent_timer[] = { 1366 DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL), 1367 DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL), 1368 DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND), 1369 DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL), 1370 DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL), 1371 DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND), 1372 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM), 1373}; 1374#define DISPATCH_KEVENT_TIMER_COUNT \ 1375 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0]))) 1376 1377#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8) 1378#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \ 1379 [qos] = { \ 1380 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \ 1381 .filter = EVFILT_TIMER, \ 1382 .flags = EV_ONESHOT, \ 1383 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \ 1384 } 1385#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \ 1386 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note) 1387 1388struct kevent64_s _dispatch_kevent_timeout[] = { 1389 DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0), 1390 DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL), 1391 DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND), 1392}; 1393 1394#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \ 1395 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC 1396 1397static const uint64_t 
_dispatch_kevent_coalescing_window[] = { 1398 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75), 1399 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1), 1400 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100), 1401}; 1402 1403#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \ 1404 typeof(dr) dri = NULL; typeof(dt) dti; \ 1405 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \ 1406 TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \ 1407 if (ds_timer(dr).target < ds_timer(dri).target) { \ 1408 break; \ 1409 } \ 1410 } \ 1411 TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \ 1412 if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \ 1413 break; \ 1414 } \ 1415 } \ 1416 if (dti) { \ 1417 TAILQ_INSERT_BEFORE(dti, dt, dt_list); \ 1418 } else { \ 1419 TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \ 1420 } \ 1421 } \ 1422 if (dri) { \ 1423 TAILQ_INSERT_BEFORE(dri, dr, dr_list); \ 1424 } else { \ 1425 TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \ 1426 } \ 1427 }) 1428 1429#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \ 1430 ({ \ 1431 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \ 1432 TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \ 1433 } \ 1434 TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \ 1435 dr_list); }) 1436 1437#define _dispatch_timers_check(dra, dta) ({ \ 1438 unsigned int qosm = _dispatch_timers_qos_mask; \ 1439 bool update = false; \ 1440 unsigned int tidx; \ 1441 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \ 1442 if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \ 1443 continue; \ 1444 } \ 1445 dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \ 1446 TAILQ_FIRST(&dra[tidx].dk_sources); \ 1447 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \ 1448 TAILQ_FIRST(&dta[tidx].dt_sources); \ 1449 uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \ 1450 uint64_t deadline = dr ? 
ds_timer(dt).deadline : UINT64_MAX; \ 1451 if (target != dta[tidx].target) { \ 1452 dta[tidx].target = target; \ 1453 update = true; \ 1454 } \ 1455 if (deadline != dta[tidx].deadline) { \ 1456 dta[tidx].deadline = deadline; \ 1457 update = true; \ 1458 } \ 1459 } \ 1460 update; }) 1461 1462static bool _dispatch_timers_reconfigure, _dispatch_timer_expired; 1463static unsigned int _dispatch_timers_qos_mask; 1464static bool _dispatch_timers_force_max_leeway; 1465 1466static void 1467_dispatch_timers_init(void) 1468{ 1469#ifndef __LP64__ 1470 unsigned int tidx; 1471 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1472 _dispatch_kevent_timer[tidx].dk_kevent.udata = \ 1473 DISPATCH_KEVENT_TIMER_UDATA(tidx); 1474 } 1475#endif // __LP64__ 1476 if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) { 1477 _dispatch_timers_force_max_leeway = true; 1478 } 1479} 1480 1481static inline void 1482_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk) 1483{ 1484 dispatch_source_refs_t dr = ds->ds_refs; 1485 unsigned int tidx = (unsigned int)dk->dk_kevent.ident; 1486 1487 if (slowpath(ds_timer_aggregate(ds))) { 1488 _dispatch_timer_aggregates_unregister(ds, tidx); 1489 } 1490 _dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list, 1491 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list); 1492 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { 1493 _dispatch_timers_reconfigure = true; 1494 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx); 1495 } 1496} 1497 1498// Updates the ordered list of timers based on next fire date for changes to ds. 1499// Should only be called from the context of _dispatch_mgr_q. 
1500static void 1501_dispatch_timers_update(dispatch_source_t ds) 1502{ 1503 dispatch_kevent_t dk = ds->ds_dkev; 1504 dispatch_source_refs_t dr = ds->ds_refs; 1505 unsigned int tidx; 1506 1507 DISPATCH_ASSERT_ON_MANAGER_QUEUE(); 1508 1509 // Do not reschedule timers unregistered with _dispatch_kevent_unregister() 1510 if (slowpath(!dk)) { 1511 return; 1512 } 1513 // Move timers that are disabled, suspended or have missed intervals to the 1514 // disarmed list, rearm after resume resp. source invoke will reenable them 1515 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) || 1516 ds->ds_pending_data) { 1517 tidx = DISPATCH_TIMER_INDEX_DISARM; 1518 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed); 1519 } else { 1520 tidx = _dispatch_source_timer_idx(dr); 1521 } 1522 if (slowpath(ds_timer_aggregate(ds))) { 1523 _dispatch_timer_aggregates_register(ds); 1524 } 1525 if (slowpath(!ds->ds_is_installed)) { 1526 ds->ds_is_installed = true; 1527 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { 1528 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed); 1529 } 1530 _dispatch_object_debug(ds, "%s", __func__); 1531 ds->ds_dkev = NULL; 1532 free(dk); 1533 } else { 1534 _dispatch_timers_unregister(ds, dk); 1535 } 1536 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { 1537 _dispatch_timers_reconfigure = true; 1538 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx); 1539 } 1540 if (dk != &_dispatch_kevent_timer[tidx]){ 1541 ds->ds_dkev = &_dispatch_kevent_timer[tidx]; 1542 } 1543 _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list, 1544 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list); 1545 if (slowpath(ds_timer_aggregate(ds))) { 1546 _dispatch_timer_aggregates_update(ds, tidx); 1547 } 1548} 1549 1550static inline void 1551_dispatch_timers_run2(uint64_t nows[], unsigned int tidx) 1552{ 1553 dispatch_source_refs_t dr; 1554 dispatch_source_t ds; 1555 uint64_t now, missed; 1556 1557 now = _dispatch_source_timer_now(nows, 
tidx); 1558 while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) { 1559 ds = _dispatch_source_from_refs(dr); 1560 // We may find timers on the wrong list due to a pending update from 1561 // dispatch_source_set_timer. Force an update of the list in that case. 1562 if (tidx != ds->ds_ident_hack) { 1563 _dispatch_timers_update(ds); 1564 continue; 1565 } 1566 if (!ds_timer(dr).target) { 1567 // No configured timers on the list 1568 break; 1569 } 1570 if (ds_timer(dr).target > now) { 1571 // Done running timers for now. 1572 break; 1573 } 1574 // Remove timers that are suspended or have missed intervals from the 1575 // list, rearm after resume resp. source invoke will reenable them 1576 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) { 1577 _dispatch_timers_update(ds); 1578 continue; 1579 } 1580 // Calculate number of missed intervals. 1581 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval; 1582 if (++missed > INT_MAX) { 1583 missed = INT_MAX; 1584 } 1585 if (ds_timer(dr).interval < INT64_MAX) { 1586 ds_timer(dr).target += missed * ds_timer(dr).interval; 1587 ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway; 1588 } else { 1589 ds_timer(dr).target = UINT64_MAX; 1590 ds_timer(dr).deadline = UINT64_MAX; 1591 } 1592 _dispatch_timers_update(ds); 1593 ds_timer(dr).last_fire = now; 1594 1595 unsigned long data; 1596 data = dispatch_atomic_add2o(ds, ds_pending_data, 1597 (unsigned long)missed, relaxed); 1598 _dispatch_trace_timer_fire(dr, data, (unsigned long)missed); 1599 _dispatch_wakeup(ds); 1600 } 1601} 1602 1603DISPATCH_NOINLINE 1604static void 1605_dispatch_timers_run(uint64_t nows[]) 1606{ 1607 unsigned int tidx; 1608 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1609 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) { 1610 _dispatch_timers_run2(nows, tidx); 1611 } 1612 } 1613} 1614 1615static inline unsigned int 1616_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[], 1617 
uint64_t *delay, uint64_t *leeway, int qos) 1618{ 1619 unsigned int tidx, ridx = DISPATCH_TIMER_COUNT; 1620 uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX; 1621 1622 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1623 if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){ 1624 continue; 1625 } 1626 uint64_t target = timer[tidx].target; 1627 if (target == UINT64_MAX) { 1628 continue; 1629 } 1630 uint64_t deadline = timer[tidx].deadline; 1631 if (qos >= 0) { 1632 // Timer pre-coalescing <rdar://problem/13222034> 1633 uint64_t window = _dispatch_kevent_coalescing_window[qos]; 1634 uint64_t latest = deadline > window ? deadline - window : 0; 1635 dispatch_source_refs_t dri; 1636 TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources, 1637 dr_list) { 1638 tmp = ds_timer(dri).target; 1639 if (tmp > latest) break; 1640 target = tmp; 1641 } 1642 } 1643 uint64_t now = _dispatch_source_timer_now(nows, tidx); 1644 if (target <= now) { 1645 delta = 0; 1646 break; 1647 } 1648 tmp = target - now; 1649 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) { 1650 tmp = _dispatch_time_mach2nano(tmp); 1651 } 1652 if (tmp < INT64_MAX && tmp < delta) { 1653 ridx = tidx; 1654 delta = tmp; 1655 } 1656 dispatch_assert(target <= deadline); 1657 tmp = deadline - now; 1658 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) { 1659 tmp = _dispatch_time_mach2nano(tmp); 1660 } 1661 if (tmp < INT64_MAX && tmp < dldelta) { 1662 dldelta = tmp; 1663 } 1664 } 1665 *delay = delta; 1666 *leeway = delta && delta < UINT64_MAX ? 
dldelta - delta : UINT64_MAX; 1667 return ridx; 1668} 1669 1670static bool 1671_dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke, 1672 unsigned int qos) 1673{ 1674 unsigned int tidx; 1675 bool poll; 1676 uint64_t delay, leeway; 1677 1678 tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway, 1679 (int)qos); 1680 poll = (delay == 0); 1681 if (poll || delay == UINT64_MAX) { 1682 _dispatch_trace_next_timer_set(NULL, qos); 1683 if (!ke->data) { 1684 return poll; 1685 } 1686 ke->data = 0; 1687 ke->flags |= EV_DELETE; 1688 ke->flags &= ~(EV_ADD|EV_ENABLE); 1689 } else { 1690 _dispatch_trace_next_timer_set( 1691 TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos); 1692 _dispatch_trace_next_timer_program(delay, qos); 1693 delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL); 1694 if (slowpath(_dispatch_timers_force_max_leeway)) { 1695 ke->data = (int64_t)(delay + leeway); 1696 ke->ext[1] = 0; 1697 } else { 1698 ke->data = (int64_t)delay; 1699 ke->ext[1] = leeway; 1700 } 1701 ke->flags |= EV_ADD|EV_ENABLE; 1702 ke->flags &= ~EV_DELETE; 1703 } 1704 _dispatch_kq_update(ke); 1705 return poll; 1706} 1707 1708DISPATCH_NOINLINE 1709static bool 1710_dispatch_timers_program(uint64_t nows[]) 1711{ 1712 bool poll = false; 1713 unsigned int qos, qosm = _dispatch_timers_qos_mask; 1714 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) { 1715 if (!(qosm & 1 << qos)){ 1716 continue; 1717 } 1718 poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos], 1719 qos); 1720 } 1721 return poll; 1722} 1723 1724DISPATCH_NOINLINE 1725static bool 1726_dispatch_timers_configure(void) 1727{ 1728 _dispatch_timer_aggregates_check(); 1729 // Find out if there is a new target/deadline on the timer lists 1730 return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer); 1731} 1732 1733static void 1734_dispatch_timers_calendar_change(void) 1735{ 1736 // calendar change may have gone past the wallclock deadline 1737 
_dispatch_timer_expired = true; 1738 _dispatch_timers_qos_mask = ~0u; 1739} 1740 1741static void 1742_dispatch_timers_kevent(struct kevent64_s *ke) 1743{ 1744 dispatch_assert(ke->data > 0); 1745 dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) == 1746 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK); 1747 unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK; 1748 dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT); 1749 dispatch_assert(_dispatch_kevent_timeout[qos].data); 1750 _dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT 1751 _dispatch_timer_expired = true; 1752 _dispatch_timers_qos_mask |= 1 << qos; 1753 _dispatch_trace_next_timer_wake(qos); 1754} 1755 1756static inline bool 1757_dispatch_mgr_timers(void) 1758{ 1759 uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {}; 1760 bool expired = slowpath(_dispatch_timer_expired); 1761 if (expired) { 1762 _dispatch_timers_run(nows); 1763 } 1764 bool reconfigure = slowpath(_dispatch_timers_reconfigure); 1765 if (reconfigure || expired) { 1766 if (reconfigure) { 1767 reconfigure = _dispatch_timers_configure(); 1768 _dispatch_timers_reconfigure = false; 1769 } 1770 if (reconfigure || expired) { 1771 expired = _dispatch_timer_expired = _dispatch_timers_program(nows); 1772 expired = expired || _dispatch_mgr_q.dq_items_tail; 1773 } 1774 _dispatch_timers_qos_mask = 0; 1775 } 1776 return expired; 1777} 1778 1779#pragma mark - 1780#pragma mark dispatch_timer_aggregate 1781 1782typedef struct { 1783 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources; 1784} dispatch_timer_aggregate_refs_s; 1785 1786typedef struct dispatch_timer_aggregate_s { 1787 DISPATCH_STRUCT_HEADER(queue); 1788 DISPATCH_QUEUE_HEADER; 1789 TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list; 1790 dispatch_timer_aggregate_refs_s 1791 dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT]; 1792 struct { 1793 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s); 1794 } dta_timer[DISPATCH_TIMER_COUNT]; 1795 struct 
dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT]; 1796 unsigned int dta_refcount; 1797} dispatch_timer_aggregate_s; 1798 1799typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s; 1800static dispatch_timer_aggregates_s _dispatch_timer_aggregates = 1801 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates); 1802 1803dispatch_timer_aggregate_t 1804dispatch_timer_aggregate_create(void) 1805{ 1806 unsigned int tidx; 1807 dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue), 1808 sizeof(struct dispatch_timer_aggregate_s)); 1809 _dispatch_queue_init((dispatch_queue_t)dta); 1810 dta->do_targetq = _dispatch_get_root_queue( 1811 _DISPATCH_QOS_CLASS_USER_INITIATED, true); 1812 dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX; 1813 //FIXME: aggregates need custom vtable 1814 //dta->dq_label = "timer-aggregate"; 1815 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) { 1816 TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources); 1817 } 1818 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1819 TAILQ_INIT(&dta->dta_timer[tidx].dt_sources); 1820 dta->dta_timer[tidx].target = UINT64_MAX; 1821 dta->dta_timer[tidx].deadline = UINT64_MAX; 1822 dta->dta_timer_data[tidx].target = UINT64_MAX; 1823 dta->dta_timer_data[tidx].deadline = UINT64_MAX; 1824 } 1825 return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create( 1826 (dispatch_queue_t)dta); 1827} 1828 1829typedef struct dispatch_timer_delay_s { 1830 dispatch_timer_t timer; 1831 uint64_t delay, leeway; 1832} *dispatch_timer_delay_t; 1833 1834static void 1835_dispatch_timer_aggregate_get_delay(void *ctxt) 1836{ 1837 dispatch_timer_delay_t dtd = ctxt; 1838 struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {}; 1839 _dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway, 1840 -1); 1841} 1842 1843uint64_t 1844dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta, 1845 uint64_t *leeway_ptr) 1846{ 1847 struct dispatch_timer_delay_s dtd = { 
1848 .timer = dta->dta_timer_data, 1849 }; 1850 dispatch_sync_f((dispatch_queue_t)dta, &dtd, 1851 _dispatch_timer_aggregate_get_delay); 1852 if (leeway_ptr) { 1853 *leeway_ptr = dtd.leeway; 1854 } 1855 return dtd.delay; 1856} 1857 1858static void 1859_dispatch_timer_aggregate_update(void *ctxt) 1860{ 1861 dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current(); 1862 dispatch_timer_t dtau = ctxt; 1863 unsigned int tidx; 1864 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1865 dta->dta_timer_data[tidx].target = dtau[tidx].target; 1866 dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline; 1867 } 1868 free(dtau); 1869} 1870 1871DISPATCH_NOINLINE 1872static void 1873_dispatch_timer_aggregates_configure(void) 1874{ 1875 dispatch_timer_aggregate_t dta; 1876 dispatch_timer_t dtau; 1877 TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) { 1878 if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) { 1879 continue; 1880 } 1881 dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau)); 1882 memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer)); 1883 _dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau, 1884 _dispatch_timer_aggregate_update); 1885 } 1886} 1887 1888static inline void 1889_dispatch_timer_aggregates_check(void) 1890{ 1891 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) { 1892 return; 1893 } 1894 _dispatch_timer_aggregates_configure(); 1895} 1896 1897static void 1898_dispatch_timer_aggregates_register(dispatch_source_t ds) 1899{ 1900 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); 1901 if (!dta->dta_refcount++) { 1902 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list); 1903 } 1904} 1905 1906DISPATCH_NOINLINE 1907static void 1908_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx) 1909{ 1910 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); 1911 dispatch_timer_source_aggregate_refs_t dr; 1912 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs; 
1913 _dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list, 1914 dta->dta_timer, dr, dta_list); 1915} 1916 1917DISPATCH_NOINLINE 1918static void 1919_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx) 1920{ 1921 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); 1922 dispatch_timer_source_aggregate_refs_t dr; 1923 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs; 1924 _dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL, 1925 dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list); 1926 if (!--dta->dta_refcount) { 1927 TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list); 1928 } 1929} 1930 1931#pragma mark - 1932#pragma mark dispatch_select 1933 1934static int _dispatch_kq; 1935 1936static unsigned int _dispatch_select_workaround; 1937static fd_set _dispatch_rfds; 1938static fd_set _dispatch_wfds; 1939static uint64_t*_dispatch_rfd_ptrs; 1940static uint64_t*_dispatch_wfd_ptrs; 1941 1942DISPATCH_NOINLINE 1943static bool 1944_dispatch_select_register(struct kevent64_s *kev) 1945{ 1946 1947 // Must execute on manager queue 1948 DISPATCH_ASSERT_ON_MANAGER_QUEUE(); 1949 1950 // If an EINVAL or ENOENT error occurred while adding/enabling a read or 1951 // write kevent, assume it was due to a type of filedescriptor not 1952 // supported by kqueue and fall back to select 1953 switch (kev->filter) { 1954 case EVFILT_READ: 1955 if ((kev->data == EINVAL || kev->data == ENOENT) && 1956 dispatch_assume(kev->ident < FD_SETSIZE)) { 1957 FD_SET((int)kev->ident, &_dispatch_rfds); 1958 if (slowpath(!_dispatch_rfd_ptrs)) { 1959 _dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE, 1960 sizeof(*_dispatch_rfd_ptrs)); 1961 } 1962 if (!_dispatch_rfd_ptrs[kev->ident]) { 1963 _dispatch_rfd_ptrs[kev->ident] = kev->udata; 1964 _dispatch_select_workaround++; 1965 _dispatch_debug("select workaround used to read fd %d: 0x%lx", 1966 (int)kev->ident, (long)kev->data); 1967 } 1968 } 1969 return true; 1970 case 
EVFILT_WRITE: 1971 if ((kev->data == EINVAL || kev->data == ENOENT) && 1972 dispatch_assume(kev->ident < FD_SETSIZE)) { 1973 FD_SET((int)kev->ident, &_dispatch_wfds); 1974 if (slowpath(!_dispatch_wfd_ptrs)) { 1975 _dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE, 1976 sizeof(*_dispatch_wfd_ptrs)); 1977 } 1978 if (!_dispatch_wfd_ptrs[kev->ident]) { 1979 _dispatch_wfd_ptrs[kev->ident] = kev->udata; 1980 _dispatch_select_workaround++; 1981 _dispatch_debug("select workaround used to write fd %d: 0x%lx", 1982 (int)kev->ident, (long)kev->data); 1983 } 1984 } 1985 return true; 1986 } 1987 return false; 1988} 1989 1990DISPATCH_NOINLINE 1991static bool 1992_dispatch_select_unregister(const struct kevent64_s *kev) 1993{ 1994 // Must execute on manager queue 1995 DISPATCH_ASSERT_ON_MANAGER_QUEUE(); 1996 1997 switch (kev->filter) { 1998 case EVFILT_READ: 1999 if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE && 2000 _dispatch_rfd_ptrs[kev->ident]) { 2001 FD_CLR((int)kev->ident, &_dispatch_rfds); 2002 _dispatch_rfd_ptrs[kev->ident] = 0; 2003 _dispatch_select_workaround--; 2004 return true; 2005 } 2006 break; 2007 case EVFILT_WRITE: 2008 if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE && 2009 _dispatch_wfd_ptrs[kev->ident]) { 2010 FD_CLR((int)kev->ident, &_dispatch_wfds); 2011 _dispatch_wfd_ptrs[kev->ident] = 0; 2012 _dispatch_select_workaround--; 2013 return true; 2014 } 2015 break; 2016 } 2017 return false; 2018} 2019 2020DISPATCH_NOINLINE 2021static bool 2022_dispatch_mgr_select(bool poll) 2023{ 2024 static const struct timeval timeout_immediately = { 0, 0 }; 2025 fd_set tmp_rfds, tmp_wfds; 2026 struct kevent64_s kev; 2027 int err, i, r; 2028 bool kevent_avail = false; 2029 2030 FD_COPY(&_dispatch_rfds, &tmp_rfds); 2031 FD_COPY(&_dispatch_wfds, &tmp_wfds); 2032 2033 r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL, 2034 poll ? 
(struct timeval*)&timeout_immediately : NULL); 2035 if (slowpath(r == -1)) { 2036 err = errno; 2037 if (err != EBADF) { 2038 if (err != EINTR) { 2039 (void)dispatch_assume_zero(err); 2040 } 2041 return false; 2042 } 2043 for (i = 0; i < FD_SETSIZE; i++) { 2044 if (i == _dispatch_kq) { 2045 continue; 2046 } 2047 if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){ 2048 continue; 2049 } 2050 r = dup(i); 2051 if (dispatch_assume(r != -1)) { 2052 close(r); 2053 } else { 2054 if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) { 2055 FD_CLR(i, &_dispatch_rfds); 2056 _dispatch_rfd_ptrs[i] = 0; 2057 _dispatch_select_workaround--; 2058 } 2059 if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) { 2060 FD_CLR(i, &_dispatch_wfds); 2061 _dispatch_wfd_ptrs[i] = 0; 2062 _dispatch_select_workaround--; 2063 } 2064 } 2065 } 2066 return false; 2067 } 2068 if (r > 0) { 2069 for (i = 0; i < FD_SETSIZE; i++) { 2070 if (FD_ISSET(i, &tmp_rfds)) { 2071 if (i == _dispatch_kq) { 2072 kevent_avail = true; 2073 continue; 2074 } 2075 FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH 2076 EV_SET64(&kev, i, EVFILT_READ, 2077 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, 2078 _dispatch_rfd_ptrs[i], 0, 0); 2079 _dispatch_kevent_drain(&kev); 2080 } 2081 if (FD_ISSET(i, &tmp_wfds)) { 2082 FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH 2083 EV_SET64(&kev, i, EVFILT_WRITE, 2084 EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1, 2085 _dispatch_wfd_ptrs[i], 0, 0); 2086 _dispatch_kevent_drain(&kev); 2087 } 2088 } 2089 } 2090 return kevent_avail; 2091} 2092 2093#pragma mark - 2094#pragma mark dispatch_kqueue 2095 2096static void 2097_dispatch_kq_init(void *context DISPATCH_UNUSED) 2098{ 2099 static const struct kevent64_s kev = { 2100 .ident = 1, 2101 .filter = EVFILT_USER, 2102 .flags = EV_ADD|EV_CLEAR, 2103 }; 2104 2105 _dispatch_safe_fork = false; 2106#if DISPATCH_USE_GUARDED_FD 2107 guardid_t guard = (uintptr_t)&kev; 2108 _dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP); 2109#else 2110 
_dispatch_kq = kqueue(); 2111#endif 2112 if (_dispatch_kq == -1) { 2113 int err = errno; 2114 switch (err) { 2115 case EMFILE: 2116 DISPATCH_CLIENT_CRASH("kqueue() failure: " 2117 "process is out of file descriptors"); 2118 break; 2119 case ENFILE: 2120 DISPATCH_CLIENT_CRASH("kqueue() failure: " 2121 "system is out of file descriptors"); 2122 break; 2123 case ENOMEM: 2124 DISPATCH_CLIENT_CRASH("kqueue() failure: " 2125 "kernel is out of memory"); 2126 break; 2127 default: 2128 (void)dispatch_assume_zero(err); 2129 DISPATCH_CRASH("kqueue() failure"); 2130 break; 2131 } 2132 } else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) { 2133 // in case we fall back to select() 2134 FD_SET(_dispatch_kq, &_dispatch_rfds); 2135 } 2136 2137 (void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0, 2138 NULL)); 2139 _dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0); 2140} 2141 2142static int 2143_dispatch_get_kq(void) 2144{ 2145 static dispatch_once_t pred; 2146 2147 dispatch_once_f(&pred, NULL, _dispatch_kq_init); 2148 2149 return _dispatch_kq; 2150} 2151 2152DISPATCH_NOINLINE 2153static long 2154_dispatch_kq_update(const struct kevent64_s *kev) 2155{ 2156 int r; 2157 struct kevent64_s kev_copy; 2158 2159 if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) { 2160 if (_dispatch_select_unregister(kev)) { 2161 return 0; 2162 } 2163 } 2164 kev_copy = *kev; 2165 // This ensures we don't get a pending kevent back while registering 2166 // a new kevent 2167 kev_copy.flags |= EV_RECEIPT; 2168retry: 2169 r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1, 2170 &kev_copy, 1, 0, NULL)); 2171 if (slowpath(r == -1)) { 2172 int err = errno; 2173 switch (err) { 2174 case EINTR: 2175 goto retry; 2176 case EBADF: 2177 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); 2178 break; 2179 default: 2180 (void)dispatch_assume_zero(err); 2181 break; 2182 } 2183 return err; 2184 } 2185 switch (kev_copy.data) { 2186 case 0: 2187 
return 0; 2188 case EBADF: 2189 case EPERM: 2190 case EINVAL: 2191 case ENOENT: 2192 if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) { 2193 if (_dispatch_select_register(&kev_copy)) { 2194 return 0; 2195 } 2196 } 2197 // fall through 2198 default: 2199 kev_copy.flags |= kev->flags; 2200 _dispatch_kevent_drain(&kev_copy); 2201 break; 2202 } 2203 return (long)kev_copy.data; 2204} 2205 2206#pragma mark - 2207#pragma mark dispatch_mgr 2208 2209static struct kevent64_s *_dispatch_kevent_enable; 2210 2211static void inline 2212_dispatch_mgr_kevent_reenable(struct kevent64_s *ke) 2213{ 2214 dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke); 2215 _dispatch_kevent_enable = ke; 2216} 2217 2218unsigned long 2219_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED) 2220{ 2221 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) { 2222 return false; 2223 } 2224 2225 static const struct kevent64_s kev = { 2226 .ident = 1, 2227 .filter = EVFILT_USER, 2228 .fflags = NOTE_TRIGGER, 2229 }; 2230 2231#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG 2232 _dispatch_debug("waking up the dispatch manager queue: %p", dq); 2233#endif 2234 2235 _dispatch_kq_update(&kev); 2236 2237 return false; 2238} 2239 2240DISPATCH_NOINLINE 2241static void 2242_dispatch_mgr_init(void) 2243{ 2244 (void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed); 2245 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q); 2246 _dispatch_queue_set_bound_thread(&_dispatch_mgr_q); 2247 _dispatch_mgr_priority_init(); 2248 _dispatch_kevent_init(); 2249 _dispatch_timers_init(); 2250 _dispatch_mach_recv_msg_buf_init(); 2251 _dispatch_memorystatus_init(); 2252} 2253 2254DISPATCH_NOINLINE DISPATCH_NORETURN 2255static void 2256_dispatch_mgr_invoke(void) 2257{ 2258 static const struct timespec timeout_immediately = { 0, 0 }; 2259 struct kevent64_s kev; 2260 bool poll; 2261 int r; 2262 2263 for (;;) { 2264 _dispatch_mgr_queue_drain(); 2265 poll = 
_dispatch_mgr_timers(); 2266 if (slowpath(_dispatch_select_workaround)) { 2267 poll = _dispatch_mgr_select(poll); 2268 if (!poll) continue; 2269 } 2270 poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q); 2271 r = kevent64(_dispatch_kq, _dispatch_kevent_enable, 2272 _dispatch_kevent_enable ? 1 : 0, &kev, 1, 0, 2273 poll ? &timeout_immediately : NULL); 2274 _dispatch_kevent_enable = NULL; 2275 if (slowpath(r == -1)) { 2276 int err = errno; 2277 switch (err) { 2278 case EINTR: 2279 break; 2280 case EBADF: 2281 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); 2282 break; 2283 default: 2284 (void)dispatch_assume_zero(err); 2285 break; 2286 } 2287 } else if (r) { 2288 _dispatch_kevent_drain(&kev); 2289 } 2290 } 2291} 2292 2293DISPATCH_NORETURN 2294void 2295_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED) 2296{ 2297 _dispatch_mgr_init(); 2298 // never returns, so burn bridges behind us & clear stack 2k ahead 2299 _dispatch_clear_stack(2048); 2300 _dispatch_mgr_invoke(); 2301} 2302 2303#pragma mark - 2304#pragma mark dispatch_memorystatus 2305 2306#if DISPATCH_USE_MEMORYSTATUS_SOURCE 2307#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS 2308#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \ 2309 DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \ 2310 DISPATCH_MEMORYSTATUS_PRESSURE_WARN) 2311#elif DISPATCH_USE_VM_PRESSURE_SOURCE 2312#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM 2313#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE 2314#endif 2315 2316#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE 2317static dispatch_source_t _dispatch_memorystatus_source; 2318 2319static void 2320_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED) 2321{ 2322#if DISPATCH_USE_MEMORYSTATUS_SOURCE 2323 unsigned long memorystatus; 2324 memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source); 2325 if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) { 2326 
_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT; 2327 _voucher_activity_heap_pressure_normal(); 2328 return; 2329 } 2330 _dispatch_continuation_cache_limit = 2331 DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN; 2332 _voucher_activity_heap_pressure_warn(); 2333#endif 2334 malloc_zone_pressure_relief(0,0); 2335} 2336 2337static void 2338_dispatch_memorystatus_init(void) 2339{ 2340 _dispatch_memorystatus_source = dispatch_source_create( 2341 DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0, 2342 DISPATCH_MEMORYSTATUS_SOURCE_MASK, 2343 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true)); 2344 dispatch_source_set_event_handler_f(_dispatch_memorystatus_source, 2345 _dispatch_memorystatus_handler); 2346 dispatch_resume(_dispatch_memorystatus_source); 2347} 2348#else 2349static inline void _dispatch_memorystatus_init(void) {} 2350#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE 2351 2352#pragma mark - 2353#pragma mark dispatch_mach 2354 2355#if HAVE_MACH 2356 2357#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG 2358#define _dispatch_debug_machport(name) \ 2359 dispatch_debug_machport((name), __func__) 2360#else 2361#define _dispatch_debug_machport(name) ((void)(name)) 2362#endif 2363 2364// Flags for all notifications that are registered/unregistered when a 2365// send-possible notification is requested/delivered 2366#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \ 2367 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED) 2368#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \ 2369 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \ 2370 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) 2371#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \ 2372 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \ 2373 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) 2374 2375#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v) 2376#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? 
\ 2377 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y))) 2378 2379#define _DISPATCH_MACHPORT_HASH_SIZE 32 2380#define _DISPATCH_MACHPORT_HASH(x) \ 2381 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE) 2382 2383#ifndef MACH_RCV_LARGE_IDENTITY 2384#define MACH_RCV_LARGE_IDENTITY 0x00000008 2385#endif 2386#ifndef MACH_RCV_VOUCHER 2387#define MACH_RCV_VOUCHER 0x00000800 2388#endif 2389#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX 2390#define DISPATCH_MACH_RCV_OPTIONS ( \ 2391 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \ 2392 MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \ 2393 MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \ 2394 MACH_RCV_VOUCHER 2395 2396#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0]) 2397 2398static void _dispatch_kevent_machport_drain(struct kevent64_s *ke); 2399static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke); 2400static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr); 2401static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr); 2402static void _dispatch_source_merge_mach_msg(dispatch_source_t ds, 2403 dispatch_source_refs_t dr, dispatch_kevent_t dk, 2404 mach_msg_header_t *hdr, mach_msg_size_t siz); 2405static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk, 2406 uint32_t new_flags, uint32_t del_flags, uint32_t mask, 2407 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync); 2408static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr); 2409static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm, 2410 dispatch_mach_reply_refs_t dmr, bool disconnected); 2411static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm); 2412static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou, 2413 mach_msg_option_t options); 2414static void _dispatch_mach_msg_recv(dispatch_mach_t dm, 2415 dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr, 2416 mach_msg_size_t siz); 2417static 
void _dispatch_mach_merge_kevent(dispatch_mach_t dm, 2418 const struct kevent64_s *ke); 2419static inline mach_msg_option_t _dispatch_mach_checkin_options(void); 2420 2421static const size_t _dispatch_mach_recv_msg_size = 2422 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE; 2423static const size_t dispatch_mach_trailer_size = 2424 sizeof(dispatch_mach_trailer_t); 2425static mach_msg_size_t _dispatch_mach_recv_msg_buf_size; 2426static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset; 2427static mach_port_t _dispatch_mach_notify_port; 2428static struct kevent64_s _dispatch_mach_recv_kevent = { 2429 .filter = EVFILT_MACHPORT, 2430 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH, 2431 .fflags = DISPATCH_MACH_RCV_OPTIONS, 2432}; 2433static dispatch_source_t _dispatch_mach_notify_source; 2434static const 2435struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = { 2436 .ke = { 2437 .filter = EVFILT_MACHPORT, 2438 .flags = EV_CLEAR, 2439 .fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT, 2440 }, 2441}; 2442 2443static void 2444_dispatch_mach_recv_msg_buf_init(void) 2445{ 2446 mach_vm_size_t vm_size = mach_vm_round_page( 2447 _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size); 2448 _dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size; 2449 mach_vm_address_t vm_addr = vm_page_size; 2450 kern_return_t kr; 2451 2452 while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size, 2453 VM_FLAGS_ANYWHERE))) { 2454 if (kr != KERN_NO_SPACE) { 2455 (void)dispatch_assume_zero(kr); 2456 DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer"); 2457 } 2458 _dispatch_temporary_resource_shortage(); 2459 vm_addr = vm_page_size; 2460 } 2461 _dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr; 2462 _dispatch_mach_recv_kevent.ext[1] = vm_size; 2463} 2464 2465static inline void* 2466_dispatch_get_mach_recv_msg_buf(void) 2467{ 2468 return (void*)_dispatch_mach_recv_kevent.ext[0]; 2469} 2470 2471static void 
2472_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED) 2473{ 2474 kern_return_t kr; 2475 2476 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, 2477 &_dispatch_mach_recv_portset); 2478 DISPATCH_VERIFY_MIG(kr); 2479 if (dispatch_assume_zero(kr)) { 2480 DISPATCH_CLIENT_CRASH( 2481 "mach_port_allocate() failed: cannot create port set"); 2482 } 2483 dispatch_assert(_dispatch_get_mach_recv_msg_buf()); 2484 dispatch_assert(dispatch_mach_trailer_size == 2485 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS( 2486 DISPATCH_MACH_RCV_TRAILER))); 2487 _dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset; 2488 _dispatch_kq_update(&_dispatch_mach_recv_kevent); 2489 2490 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, 2491 &_dispatch_mach_notify_port); 2492 DISPATCH_VERIFY_MIG(kr); 2493 if (dispatch_assume_zero(kr)) { 2494 DISPATCH_CLIENT_CRASH( 2495 "mach_port_allocate() failed: cannot create receive right"); 2496 } 2497 _dispatch_mach_notify_source = dispatch_source_create( 2498 &_dispatch_source_type_mach_recv_direct, 2499 _dispatch_mach_notify_port, 0, &_dispatch_mgr_q); 2500 static const struct dispatch_continuation_s dc = { 2501 .dc_func = (void*)_dispatch_mach_notify_source_invoke, 2502 }; 2503 _dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] = 2504 (dispatch_continuation_t)&dc; 2505 dispatch_assert(_dispatch_mach_notify_source); 2506 dispatch_resume(_dispatch_mach_notify_source); 2507} 2508 2509static mach_port_t 2510_dispatch_get_mach_recv_portset(void) 2511{ 2512 static dispatch_once_t pred; 2513 dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init); 2514 return _dispatch_mach_recv_portset; 2515} 2516 2517static void 2518_dispatch_mach_portset_init(void *context DISPATCH_UNUSED) 2519{ 2520 struct kevent64_s kev = { 2521 .filter = EVFILT_MACHPORT, 2522 .flags = EV_ADD, 2523 }; 2524 kern_return_t kr; 2525 2526 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, 
2527 &_dispatch_mach_portset); 2528 DISPATCH_VERIFY_MIG(kr); 2529 if (dispatch_assume_zero(kr)) { 2530 DISPATCH_CLIENT_CRASH( 2531 "mach_port_allocate() failed: cannot create port set"); 2532 } 2533 kev.ident = _dispatch_mach_portset; 2534 _dispatch_kq_update(&kev); 2535} 2536 2537static mach_port_t 2538_dispatch_get_mach_portset(void) 2539{ 2540 static dispatch_once_t pred; 2541 dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init); 2542 return _dispatch_mach_portset; 2543} 2544 2545static kern_return_t 2546_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps) 2547{ 2548 mach_port_t mp = (mach_port_t)dk->dk_kevent.ident; 2549 kern_return_t kr; 2550 2551 _dispatch_debug_machport(mp); 2552 kr = mach_port_move_member(mach_task_self(), mp, mps); 2553 if (slowpath(kr)) { 2554 DISPATCH_VERIFY_MIG(kr); 2555 switch (kr) { 2556 case KERN_INVALID_RIGHT: 2557 if (mps) { 2558 _dispatch_bug_mach_client("_dispatch_kevent_machport_enable: " 2559 "mach_port_move_member() failed ", kr); 2560 break; 2561 } 2562 //fall through 2563 case KERN_INVALID_NAME: 2564#if DISPATCH_DEBUG 2565 _dispatch_log("Corruption: Mach receive right 0x%x destroyed " 2566 "prematurely", mp); 2567#endif 2568 break; 2569 default: 2570 (void)dispatch_assume_zero(kr); 2571 break; 2572 } 2573 } 2574 return mps ? 
kr : 0; 2575} 2576 2577static void 2578_dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED) 2579{ 2580#if (TARGET_IPHONE_SIMULATOR && \ 2581 IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \ 2582 (!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090) 2583 // delete and re-add kevent to workaround <rdar://problem/13924256> 2584 if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) { 2585 struct kevent64_s kev = _dispatch_mach_recv_kevent; 2586 kev.flags = EV_DELETE; 2587 _dispatch_kq_update(&kev); 2588 } 2589#endif 2590 _dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent); 2591} 2592 2593static kern_return_t 2594_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags, 2595 uint32_t del_flags) 2596{ 2597 kern_return_t kr = 0; 2598 dispatch_assert_zero(new_flags & del_flags); 2599 if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) || 2600 (del_flags & _DISPATCH_MACH_RECV_FLAGS)) { 2601 mach_port_t mps; 2602 if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) { 2603 mps = _dispatch_get_mach_recv_portset(); 2604 } else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) || 2605 ((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) && 2606 (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) { 2607 mps = _dispatch_get_mach_portset(); 2608 } else { 2609 mps = MACH_PORT_NULL; 2610 } 2611 kr = _dispatch_mach_portset_update(dk, mps); 2612 } 2613 return kr; 2614} 2615 2616static kern_return_t 2617_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags, 2618 uint32_t del_flags) 2619{ 2620 kern_return_t kr = 0; 2621 dispatch_assert_zero(new_flags & del_flags); 2622 if ((new_flags & _DISPATCH_MACH_SP_FLAGS) || 2623 (del_flags & _DISPATCH_MACH_SP_FLAGS)) { 2624 // Requesting a (delayed) non-sync send-possible notification 2625 // registers for both immediate dead-name notification and delayed-arm 2626 // send-possible notification for the port. 
2627 // The send-possible notification is armed when a mach_msg() with the 2628 // the MACH_SEND_NOTIFY to the port times out. 2629 // If send-possible is unavailable, fall back to immediate dead-name 2630 // registration rdar://problem/2527840&9008724 2631 kr = _dispatch_mach_notify_update(dk, new_flags, del_flags, 2632 _DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE, 2633 MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0); 2634 } 2635 return kr; 2636} 2637 2638static inline void 2639_dispatch_kevent_mach_portset(struct kevent64_s *ke) 2640{ 2641 if (ke->ident == _dispatch_mach_recv_portset) { 2642 return _dispatch_kevent_mach_msg_drain(ke); 2643 } else if (ke->ident == _dispatch_mach_portset) { 2644 return _dispatch_kevent_machport_drain(ke); 2645 } else { 2646 return _dispatch_kevent_error(ke); 2647 } 2648} 2649 2650DISPATCH_NOINLINE 2651static void 2652_dispatch_kevent_machport_drain(struct kevent64_s *ke) 2653{ 2654 mach_port_t name = (mach_port_name_t)ke->data; 2655 dispatch_kevent_t dk; 2656 struct kevent64_s kev; 2657 2658 _dispatch_debug_machport(name); 2659 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT); 2660 if (!dispatch_assume(dk)) { 2661 return; 2662 } 2663 _dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH 2664 2665 EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH, 2666 DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0); 2667 _dispatch_kevent_debug(&kev, __func__); 2668 _dispatch_kevent_merge(&kev); 2669} 2670 2671DISPATCH_NOINLINE 2672static void 2673_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke) 2674{ 2675 mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0]; 2676 mach_msg_size_t siz, msgsiz; 2677 mach_msg_return_t kr = (mach_msg_return_t)ke->fflags; 2678 2679 _dispatch_kevent_mach_recv_reenable(ke); 2680 if (!dispatch_assume(hdr)) { 2681 DISPATCH_CRASH("EVFILT_MACHPORT with no message"); 2682 } 2683 if (fastpath(!kr)) { 2684 return _dispatch_kevent_mach_msg_recv(hdr); 2685 } else 
if (kr != MACH_RCV_TOO_LARGE) { 2686 goto out; 2687 } 2688 if (!dispatch_assume(ke->ext[1] <= UINT_MAX - 2689 dispatch_mach_trailer_size)) { 2690 DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message"); 2691 } 2692 siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size; 2693 hdr = malloc(siz); 2694 if (ke->data) { 2695 if (!dispatch_assume(hdr)) { 2696 // Kernel will discard message too large to fit 2697 hdr = _dispatch_get_mach_recv_msg_buf(); 2698 siz = _dispatch_mach_recv_msg_buf_size; 2699 } 2700 mach_port_t name = (mach_port_name_t)ke->data; 2701 const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS | 2702 MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE); 2703 kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE, 2704 MACH_PORT_NULL); 2705 if (fastpath(!kr)) { 2706 return _dispatch_kevent_mach_msg_recv(hdr); 2707 } else if (kr == MACH_RCV_TOO_LARGE) { 2708 _dispatch_log("BUG in libdispatch client: " 2709 "_dispatch_kevent_mach_msg_drain: dropped message too " 2710 "large to fit in memory: id = 0x%x, size = %lld", 2711 hdr->msgh_id, ke->ext[1]); 2712 kr = MACH_MSG_SUCCESS; 2713 } 2714 } else { 2715 // We don't know which port in the portset contains the large message, 2716 // so need to receive all messages pending on the portset to ensure the 2717 // large message is drained. 
<rdar://problem/13950432> 2718 bool received = false; 2719 for (;;) { 2720 if (!dispatch_assume(hdr)) { 2721 DISPATCH_CLIENT_CRASH("Message too large to fit in memory"); 2722 } 2723 const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS | 2724 MACH_RCV_TIMEOUT); 2725 kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset, 2726 MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL); 2727 if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume( 2728 hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) { 2729 DISPATCH_CRASH("Overlarge message"); 2730 } 2731 if (fastpath(!kr)) { 2732 msgsiz = hdr->msgh_size + dispatch_mach_trailer_size; 2733 if (msgsiz < siz) { 2734 void *shrink = realloc(hdr, msgsiz); 2735 if (shrink) hdr = shrink; 2736 } 2737 _dispatch_kevent_mach_msg_recv(hdr); 2738 hdr = NULL; 2739 received = true; 2740 } else if (kr == MACH_RCV_TOO_LARGE) { 2741 siz = hdr->msgh_size + dispatch_mach_trailer_size; 2742 } else { 2743 if (kr == MACH_RCV_TIMED_OUT && received) { 2744 kr = MACH_MSG_SUCCESS; 2745 } 2746 break; 2747 } 2748 hdr = reallocf(hdr, siz); 2749 } 2750 } 2751 if (hdr != _dispatch_get_mach_recv_msg_buf()) { 2752 free(hdr); 2753 } 2754out: 2755 if (slowpath(kr)) { 2756 _dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: " 2757 "message reception failed", kr); 2758 } 2759} 2760 2761static void 2762_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr) 2763{ 2764 dispatch_source_refs_t dri; 2765 dispatch_kevent_t dk; 2766 mach_port_t name = hdr->msgh_local_port; 2767 mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size; 2768 2769 if (!dispatch_assume(hdr->msgh_size <= UINT_MAX - 2770 dispatch_mach_trailer_size)) { 2771 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " 2772 "received overlarge message"); 2773 return _dispatch_kevent_mach_msg_destroy(hdr); 2774 } 2775 if (!dispatch_assume(name)) { 2776 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " 2777 "received message with MACH_PORT_NULL port"); 2778 return 
_dispatch_kevent_mach_msg_destroy(hdr); 2779 } 2780 _dispatch_debug_machport(name); 2781 dk = _dispatch_kevent_find(name, EVFILT_MACHPORT); 2782 if (!dispatch_assume(dk)) { 2783 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " 2784 "received message with unknown kevent"); 2785 return _dispatch_kevent_mach_msg_destroy(hdr); 2786 } 2787 _dispatch_kevent_debug(&dk->dk_kevent, __func__); 2788 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) { 2789 dispatch_source_t dsi = _dispatch_source_from_refs(dri); 2790 if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) { 2791 return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz); 2792 } 2793 } 2794 _dispatch_bug_client("_dispatch_kevent_mach_msg_recv: " 2795 "received message with no listeners"); 2796 return _dispatch_kevent_mach_msg_destroy(hdr); 2797} 2798 2799static void 2800_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr) 2801{ 2802 if (hdr) { 2803 mach_msg_destroy(hdr); 2804 if (hdr != _dispatch_get_mach_recv_msg_buf()) { 2805 free(hdr); 2806 } 2807 } 2808} 2809 2810static void 2811_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr, 2812 dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz) 2813{ 2814 if (ds == _dispatch_mach_notify_source) { 2815 _dispatch_mach_notify_source_invoke(hdr); 2816 return _dispatch_kevent_mach_msg_destroy(hdr); 2817 } 2818 dispatch_mach_reply_refs_t dmr = NULL; 2819 if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) { 2820 dmr = (dispatch_mach_reply_refs_t)dr; 2821 } 2822 return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz); 2823} 2824 2825DISPATCH_ALWAYS_INLINE 2826static inline void 2827_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final) 2828{ 2829 dispatch_source_refs_t dri, dr_next; 2830 dispatch_kevent_t dk; 2831 struct kevent64_s kev; 2832 bool unreg; 2833 2834 dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION); 2835 if (!dk) { 2836 return; 2837 } 
2838 2839 // Update notification registration state. 2840 dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS; 2841 EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE, 2842 flag, 0, (uintptr_t)dk, 0, 0); 2843 if (final) { 2844 // This can never happen again 2845 unreg = true; 2846 } else { 2847 // Re-register for notification before delivery 2848 unreg = _dispatch_kevent_resume(dk, flag, 0); 2849 } 2850 DISPATCH_MACH_KEVENT_ARMED(dk) = 0; 2851 TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) { 2852 dispatch_source_t dsi = _dispatch_source_from_refs(dri); 2853 if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) { 2854 dispatch_mach_t dm = (dispatch_mach_t)dsi; 2855 _dispatch_mach_merge_kevent(dm, &kev); 2856 if (unreg && dm->dm_dkev) { 2857 _dispatch_mach_kevent_unregister(dm); 2858 } 2859 } else { 2860 _dispatch_source_merge_kevent(dsi, &kev); 2861 if (unreg) { 2862 _dispatch_source_kevent_unregister(dsi); 2863 } 2864 } 2865 if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) { 2866 // current merge is last in list (dk might have been freed) 2867 // or it re-armed the notification 2868 return; 2869 } 2870 } 2871} 2872 2873static kern_return_t 2874_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags, 2875 uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid, 2876 mach_port_mscount_t notify_sync) 2877{ 2878 mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident; 2879 typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data; 2880 kern_return_t kr, krr = 0; 2881 2882 // Update notification registration state. 
2883 dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask; 2884 dk->dk_kevent.data &= ~(del_flags & mask); 2885 2886 _dispatch_debug_machport(port); 2887 if ((dk->dk_kevent.data & mask) && !(prev & mask)) { 2888 // initialize _dispatch_mach_notify_port: 2889 (void)_dispatch_get_mach_recv_portset(); 2890 _dispatch_debug("machport[0x%08x]: registering for send-possible " 2891 "notification", port); 2892 previous = MACH_PORT_NULL; 2893 krr = mach_port_request_notification(mach_task_self(), port, 2894 notify_msgid, notify_sync, _dispatch_mach_notify_port, 2895 MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous); 2896 DISPATCH_VERIFY_MIG(krr); 2897 2898 switch(krr) { 2899 case KERN_INVALID_NAME: 2900 case KERN_INVALID_RIGHT: 2901 // Supress errors & clear registration state 2902 dk->dk_kevent.data &= ~mask; 2903 break; 2904 default: 2905 // Else, we dont expect any errors from mach. Log any errors 2906 if (dispatch_assume_zero(krr)) { 2907 // log the error & clear registration state 2908 dk->dk_kevent.data &= ~mask; 2909 } else if (dispatch_assume_zero(previous)) { 2910 // Another subsystem has beat libdispatch to requesting the 2911 // specified Mach notification on this port. We should 2912 // technically cache the previous port and message it when the 2913 // kernel messages our port. Or we can just say screw those 2914 // subsystems and deallocate the previous port. 
2915 // They should adopt libdispatch :-P 2916 kr = mach_port_deallocate(mach_task_self(), previous); 2917 DISPATCH_VERIFY_MIG(kr); 2918 (void)dispatch_assume_zero(kr); 2919 previous = MACH_PORT_NULL; 2920 } 2921 } 2922 } else if (!(dk->dk_kevent.data & mask) && (prev & mask)) { 2923 _dispatch_debug("machport[0x%08x]: unregistering for send-possible " 2924 "notification", port); 2925 previous = MACH_PORT_NULL; 2926 kr = mach_port_request_notification(mach_task_self(), port, 2927 notify_msgid, notify_sync, MACH_PORT_NULL, 2928 MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous); 2929 DISPATCH_VERIFY_MIG(kr); 2930 2931 switch (kr) { 2932 case KERN_INVALID_NAME: 2933 case KERN_INVALID_RIGHT: 2934 case KERN_INVALID_ARGUMENT: 2935 break; 2936 default: 2937 if (dispatch_assume_zero(kr)) { 2938 // log the error 2939 } 2940 } 2941 } else { 2942 return 0; 2943 } 2944 if (slowpath(previous)) { 2945 // the kernel has not consumed the send-once right yet 2946 (void)dispatch_assume_zero( 2947 _dispatch_send_consume_send_once_right(previous)); 2948 } 2949 return krr; 2950} 2951 2952static void 2953_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED) 2954{ 2955 (void)_dispatch_get_mach_recv_portset(); 2956 _dispatch_debug("registering for calendar-change notification"); 2957 kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(), 2958 HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port); 2959 DISPATCH_VERIFY_MIG(kr); 2960 (void)dispatch_assume_zero(kr); 2961} 2962 2963static void 2964_dispatch_mach_host_calendar_change_register(void) 2965{ 2966 static dispatch_once_t pred; 2967 dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update); 2968} 2969 2970static void 2971_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr) 2972{ 2973 mig_reply_error_t reply; 2974 dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union 2975 __ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem)); 2976 dispatch_assert(sizeof(mig_reply_error_t) < 
_dispatch_mach_recv_msg_size); 2977 boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head); 2978 if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) { 2979 // host_notify_reply.defs: host_calendar_changed 2980 _dispatch_debug("calendar-change notification"); 2981 _dispatch_timers_calendar_change(); 2982 _dispatch_mach_host_notify_update(NULL); 2983 success = TRUE; 2984 reply.RetCode = KERN_SUCCESS; 2985 } 2986 if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) { 2987 (void)dispatch_assume_zero(reply.RetCode); 2988 } 2989} 2990 2991kern_return_t 2992_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED, 2993 mach_port_name_t name) 2994{ 2995#if DISPATCH_DEBUG 2996 _dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x " 2997 "deleted prematurely", name); 2998#endif 2999 3000 _dispatch_debug_machport(name); 3001 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true); 3002 3003 return KERN_SUCCESS; 3004} 3005 3006kern_return_t 3007_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED, 3008 mach_port_name_t name) 3009{ 3010 kern_return_t kr; 3011 3012 _dispatch_debug("machport[0x%08x]: dead-name notification", name); 3013 _dispatch_debug_machport(name); 3014 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true); 3015 3016 // the act of receiving a dead name notification allocates a dead-name 3017 // right that must be deallocated 3018 kr = mach_port_deallocate(mach_task_self(), name); 3019 DISPATCH_VERIFY_MIG(kr); 3020 //(void)dispatch_assume_zero(kr); 3021 3022 return KERN_SUCCESS; 3023} 3024 3025kern_return_t 3026_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED, 3027 mach_port_name_t name) 3028{ 3029 _dispatch_debug("machport[0x%08x]: send-possible notification", name); 3030 _dispatch_debug_machport(name); 3031 _dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false); 3032 3033 return KERN_SUCCESS; 3034} 3035 3036#pragma 
mark -
#pragma mark dispatch_mach_t

// Initial value of dm_disconnect_cnt for a freshly created channel.
// dispatch_mach_connect() atomically swaps it to 0 and crashes the client if
// the swap fails (i.e. the channel was already connected).
#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
// Internal per-message option bit. _dispatch_mach_msg_send() sets it on a
// message that was sent successfully but whose reply kevent still has to be
// registered on the manager queue; when it is set on entry, the mach_msg()
// call is skipped and only the reply registration is performed.
#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
// Low option bits reserved for the internal flags above. dispatch_mach_send()
// asserts that client-supplied options do not use them, and they are masked
// out before the options are handed to mach_msg().
#define DISPATCH_MACH_OPTIONS_MASK 0xffff

// Forward declarations for helpers that are used before their definitions.
static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
		mach_port_t local_port, mach_port_t remote_port);
static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
		dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
		dispatch_object_t dou);
static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
		dispatch_mach_msg_t dmsg);
static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
		pthread_priority_t pp);

// Common constructor behind dispatch_mach_create() and
// dispatch_mach_create_f().
//
// Allocates the channel object, initializes it as a queue, and creates it
// suspended and targeted at the manager queue. Two auxiliary structures are
// allocated: the handler refs (dm->ds_refs) and the send-side refs
// (dm->dm_refs), the latter starting in the "never connected" state with an
// empty reply list. The label pointer is stored as-is (not copied).
//
// label            - stored in dq_label
// q                - user target queue, applied via dispatch_set_target_queue()
// context/handler  - handler context and function stored in the handler refs
// handler_is_block - recorded so that _dispatch_mach_dispose() knows whether
//                    context is a block that must be released
//
// Returns the new channel.
static dispatch_mach_t
_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler, bool handler_is_block)
{
	dispatch_mach_t dm;
	dispatch_mach_refs_t dr;

	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
			sizeof(struct dispatch_mach_s));
	_dispatch_queue_init((dispatch_queue_t)dm);
	dm->dq_label = label;

	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
	dm->do_ref_cnt++; // since channel is created suspended
	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	dm->do_targetq = &_dispatch_mgr_q;

	// Handler (receive-side) refs.
	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
	dr->dr_source_wref = _dispatch_ptr2wref(dm);
	dr->dm_handler_func = handler;
	dr->dm_handler_ctxt = context;
	dm->ds_refs = dr;
	dm->dm_handler_is_block = handler_is_block;

	// Send-side refs.
	dm->dm_refs = _dispatch_calloc(1ul,
			sizeof(struct dispatch_mach_send_refs_s));
	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
	TAILQ_INIT(&dm->dm_refs->dm_replies);

	// First item on the channel sets the user-specified target queue
	dispatch_set_target_queue(dm, q);
	_dispatch_object_debug(dm, "%s", __func__);
	return dm;
}

// Creates a channel whose handler is a block. The block is copied; the copy
// is passed as the handler context and its invoke function as the handler
// function. The copy is released in _dispatch_mach_dispose().
dispatch_mach_t
dispatch_mach_create(const char *label, dispatch_queue_t q,
		dispatch_mach_handler_t handler)
{
	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
	return _dispatch_mach_create(label, q, bb,
			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
}

// Creates a channel whose handler is a plain function with a caller-owned
// context pointer.
dispatch_mach_t
dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler)
{
	return _dispatch_mach_create(label, q, context, handler, false);
}

// Final teardown of a channel: releases the handler block (only when the
// channel was created with a block handler and the context is non-NULL),
// frees both refs structures and destroys the underlying queue.
void
_dispatch_mach_dispose(dispatch_mach_t dm)
{
	_dispatch_object_debug(dm, "%s", __func__);
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
		Block_release(dr->dm_handler_ctxt);
	}
	free(dr);
	free(dm->dm_refs);
	_dispatch_queue_destroy(dm);
}

// Connects a channel to its ports and resumes it.
//
// receive - if valid, a receive kevent is prepared for it and stored in
//           dm->ds_dkev (it is only prepared here, not registered), and the
//           channel is retained on behalf of the manager queue
// send    - recorded as the channel's send port (valid or not)
// checkin - optional checkin message; only used when send is valid, in which
//           case it is retained, tagged with the checkin options, and its
//           remote port is recorded as the checkin port. When send is not
//           valid the checkin argument is ignored.
//
// Crashes the client if the channel has been connected before.
void
dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
		mach_port_t send, dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_kevent_t dk;

	if (MACH_PORT_VALID(receive)) {
		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
		dk->dk_kevent.ident = receive;
		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
		dk->dk_kevent.udata = (uintptr_t)dk;
		TAILQ_INIT(&dk->dk_sources);
		dm->ds_dkev = dk;
		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
		_dispatch_retain(dm); // the reference the manager queue holds
	}
	dr->dm_send = send;
	if (MACH_PORT_VALID(send)) {
		if (checkin) {
			dispatch_retain(checkin);
			mach_msg_option_t options = _dispatch_mach_checkin_options();
			_dispatch_mach_msg_set_options(checkin, options);
			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
		}
		dr->dm_checkin = checkin;
	}
	// monitor message reply ports
	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	// NEVER_CONNECTED -> 0 marks the channel as connected; any other current
	// value means connect was already called.
	if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
			DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
		DISPATCH_CLIENT_CRASH("Channel already connected");
	}
	_dispatch_object_debug(dm, "%s", __func__);
	return dispatch_resume(dm);
}

// Tears down the reply registration dmr: unlinks it from its kevent's source
// list, unregisters the kevent, unlinks it from the channel's reply list,
// releases any voucher it still holds and frees it.
//
// disconnected - when true, a DISPATCH_MACH_DISCONNECTED reply message is
//                built from dmr first (that step takes over dmr's voucher
//                reference and reads dmr's fields, so it must precede the
//                teardown) and pushed to the channel once dmr is gone.
DISPATCH_NOINLINE
static void
_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, bool disconnected)
{
	dispatch_mach_msg_t dmsgr = NULL;
	if (disconnected) {
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
	}
	dispatch_kevent_t dk = dmr->dmr_dkev;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
	_dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
	TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
	if (dmr->dmr_voucher) _voucher_release(dmr->dmr_voucher);
	free(dmr);
	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
}

// Registers a one-shot receive kevent for the reply port of a sent message.
//
// A reply refs structure is allocated that remembers the sent message's
// voucher (retained), priority and context, and is linked both on the
// kevent's source list and on the channel's reply list. If the kevent needs
// resuming and _dispatch_kevent_resume() returns true, the registration is
// torn down again immediately and reported to the client as disconnected.
DISPATCH_NOINLINE
static void
_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
		dispatch_mach_msg_t dmsg)
{
	dispatch_kevent_t dk;
	dispatch_mach_reply_refs_t dmr;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
	dk->dk_kevent.ident = reply;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
	dmr->dmr_dkev = dk;
	if (dmsg->dmsg_voucher) {
		dmr->dmr_voucher =_voucher_retain(dmsg->dmsg_voucher);
	}
	dmr->dmr_priority = dmsg->dmsg_priority;
	// make reply context visible to leaks rdar://11777199
	dmr->dmr_ctxt = dmsg->do_ctxt;

	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
			dmsg->do_ctxt);
	uint32_t flags;
	// The kevent pointer is passed by address, so dmr->dmr_dkev may be updated
	// by the call; it is re-read from dmr below rather than using dk.
	bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags);
	TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
			dr_list);
	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
	if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
		_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
	}
}

// Removes the channel's send kevent (dm->dm_dkev): clears the channel's
// pointer to it, unlinks the send refs from its source list, drops the
// send-possible/send-dead bits from the pending data mask and unregisters
// the kevent. Callers check dm->dm_dkev for non-NULL before calling.
DISPATCH_NOINLINE
static void
_dispatch_mach_kevent_unregister(dispatch_mach_t dm)
{
	dispatch_kevent_t dk = dm->dm_dkev;
	dm->dm_dkev = NULL;
	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
			dr_list);
	dm->ds_pending_data_mask &= ~(unsigned long)
			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
	_dispatch_kevent_unregister(dk,
			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
}

// Installs a send kevent for the port `send`, watching for send-possible and
// send-dead events, and records it in dm->dm_dkev. If the kevent needs
// resuming and _dispatch_kevent_resume() returns true, the kevent is removed
// again, leaving dm->dm_dkev NULL.
DISPATCH_NOINLINE
static void
_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
{
	dispatch_kevent_t dk;

	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
	dk->dk_kevent.ident = send;
	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
	dk->dk_kevent.udata = (uintptr_t)dk;
	TAILQ_INIT(&dk->dk_sources);

	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;

	uint32_t flags;
	// dk is passed by address and may be updated by the call.
	bool do_resume = _dispatch_kevent_register(&dk, &flags);
	TAILQ_INSERT_TAIL(&dk->dk_sources,
			(dispatch_source_refs_t)dm->dm_refs, dr_list);
	dm->dm_dkev = dk;
	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
		_dispatch_mach_kevent_unregister(dm);
	}
}

// Pushes an item onto the channel's own queue with priority pp.
static inline void
_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
		pthread_priority_t pp)
{
	return _dispatch_queue_push(dm._dq, dou, pp);
}

// Stores the send options of a message in the object's do_suspend_cnt field.
// The same field is also used by _dispatch_mach_msg_set_reason(), so the
// options are only meaningful until a reason is recorded.
static inline void
_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
{
	dou._do->do_suspend_cnt = (unsigned int)options;
}

// Reads back the options stored by _dispatch_mach_msg_set_options().
static inline mach_msg_option_t
_dispatch_mach_msg_get_options(dispatch_object_t dou)
{
	mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
	return options;
}

// Records the delivery outcome of a message in its do_suspend_cnt field.
//
// If err is non-zero, or reason is zero, err itself is stored. Otherwise the
// reason is encoded as an error value in the err_local system with subsystem
// 0x3e0 and the reason as the code, which lets
// _dispatch_mach_msg_get_reason() tell the two cases apart. reason must fit
// in the error code bits (asserted).
static inline void
_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
		unsigned long reason)
{
	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
	dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err :
			 err_local|err_sub(0x3e0)|(mach_error_t)reason);
}

// Decodes and clears the outcome stored by _dispatch_mach_msg_set_reason().
//
// Returns the encoded reason with *err_ptr set to 0 when a reason was stored.
// Otherwise *err_ptr receives the stored error and the result is
// DISPATCH_MACH_MESSAGE_SEND_FAILED for a non-zero error or
// DISPATCH_MACH_MESSAGE_SENT for zero.
static inline unsigned long
_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
{
	mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
	dou._do->do_suspend_cnt = 0;
	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
		*err_ptr = 0;
		return err_get_code(err);
	}
	*err_ptr = err;
	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
}

// Wraps a received Mach message in a dispatch_mach_msg_t and pushes it onto
// the channel for delivery with reason DISPATCH_MACH_MESSAGE_RECEIVED.
//
// dmr - non-NULL when the message arrived on a registered reply port. In that
//       case the voucher, priority and context recorded at send time are
//       used, the voucher carried by the reply itself is cleared, and the
//       reply registration is torn down. When NULL, the voucher is created
//       from the message and the priority is derived from that voucher.
// hdr - the message; if it is the shared receive buffer the default
//       destructor is selected, otherwise the FREE destructor
// siz - message size handed to dispatch_mach_msg_create()
//
// If the channel has been canceled the message is destroyed instead.
static void
_dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
		mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	_dispatch_debug_machport(hdr->msgh_remote_port);
	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	dispatch_mach_msg_t dmsg;
	voucher_t voucher;
	pthread_priority_t priority;
	void *ctxt = NULL;
	if (dmr) {
		_voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
		voucher = dmr->dmr_voucher;
		dmr->dmr_voucher = NULL; // transfer reference
		priority = dmr->dmr_priority;
		ctxt = dmr->dmr_ctxt;
		_dispatch_mach_reply_kevent_unregister(dm, dmr, false);
	} else {
		voucher = voucher_create_with_mach_msg(hdr);
		priority = _voucher_get_priority(voucher);
	}
	dispatch_mach_msg_destructor_t destructor;
	destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
	dmsg->dmsg_voucher = voucher;
	dmsg->dmsg_priority = priority;
	dmsg->do_ctxt = ctxt;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
	_dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
	_dispatch_voucher_ktrace_dmsg_push(dmsg);
	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
}

// Returns the remote port named in the header of a message object.
static inline mach_port_t
_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
{
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
	mach_port_t remote = hdr->msgh_remote_port;
	return remote;
}

// Pushes a header-only DISPATCH_MACH_DISCONNECTED message onto the channel.
// Only non-zero port arguments are written into the new header. The message
// priority is whatever dispatch_mach_msg_create() initialized it to; it is
// not set here.
static inline void
_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
		mach_port_t remote_port)
{
	mach_msg_header_t *hdr;
	dispatch_mach_msg_t dmsg;
	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	if (local_port) hdr->msgh_local_port = local_port;
	if (remote_port) hdr->msgh_remote_port = remote_port;
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
}

// Builds (but does not push) a DISPATCH_MACH_DISCONNECTED message standing in
// for a reply that will never arrive. Exactly one of the two inputs is used:
//
// dou - when non-NULL, the sent message. Returns NULL if that message did not
//       expect a reply; otherwise the new message names the reply port, and
//       copies the priority and context and retains the voucher of the sent
//       message.
// dmr - used when dou is NULL: the reply registration. The new message names
//       the registered port, copies priority and context, and takes over the
//       registration's voucher reference.
static inline dispatch_mach_msg_t
_dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
		dispatch_mach_reply_refs_t dmr)
{
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
	if (dmsg && !dmsg->dmsg_reply) return NULL;
	mach_msg_header_t *hdr;
	dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
	if (dmsg) {
		hdr->msgh_local_port = dmsg->dmsg_reply;
		if (dmsg->dmsg_voucher) {
			dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher);
		}
		dmsgr->dmsg_priority = dmsg->dmsg_priority;
		dmsgr->do_ctxt = dmsg->do_ctxt;
	} else {
		hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident;
		dmsgr->dmsg_voucher = dmr->dmr_voucher;
		dmr->dmr_voucher = NULL; // transfer reference
		dmsgr->dmsg_priority = dmr->dmr_priority;
		dmsgr->do_ctxt = dmr->dmr_ctxt;
	}
	_dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
	return dmsgr;
}

// Hands an unsent message back to the client: the message is pushed onto the
// channel with reason DISPATCH_MACH_MESSAGE_NOT_SENT, followed by a
// DISPATCH_MACH_DISCONNECTED reply placeholder if the message expected a
// reply.
DISPATCH_NOINLINE
static void
_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
	dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
}

// Attempts to send one message on the channel.
//
// Returns a NULL object when the message has been fully processed: its send
// result was recorded and it was pushed onto the channel queue for delivery
// to the handler (together with a disconnected-reply placeholder if the send
// failed). Returns the message itself when it could not be completed yet and
// the caller must keep it queued for a retry. In the retry cases that need
// the manager queue, dr->dm_needs_mgr is set to 1; it is reset to 0 on entry.
//
// Retry cases (all reach the `out` label with dmsg non-NULL):
//  - a pending checkin message must go first and either needs the manager
//    queue or could not be completed itself
//  - the send kevent is already armed (a send-possible notification is
//    outstanding)
//  - mach_msg() timed out
//  - the message was sent but its reply kevent must be registered on the
//    manager queue (DISPATCH_MACH_REGISTER_FOR_REPLY is set on the message so
//    the retry skips the send)
DISPATCH_NOINLINE
static dispatch_object_t
_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
	voucher_t voucher = dmsg->dmsg_voucher;
	mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
	bool clear_voucher = false, kvoucher_move_send = false;
	dr->dm_needs_mgr = 0;
	if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
		// send initial checkin message
		if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
				&_dispatch_mgr_q)) {
			// send kevent must be uninstalled on the manager queue
			dr->dm_needs_mgr = 1;
			goto out;
		}
		// Recursive send of the checkin message; dm_checkin becomes NULL once
		// the checkin has been fully processed.
		dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
		if (slowpath(dr->dm_checkin)) {
			goto out;
		}
	}
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	mach_msg_return_t kr = 0;
	mach_port_t reply = dmsg->dmsg_reply;
	mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
	// Skipped on a retry whose only remaining work is reply registration.
	if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
		opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
		// Messages that move a send-once right in the remote port slot are
		// sent to the port already in the header and without a send timeout.
		if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
				MACH_MSG_TYPE_MOVE_SEND_ONCE) {
			if (dmsg != dr->dm_checkin) {
				msg->msgh_remote_port = dr->dm_send;
			}
			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
				if (slowpath(!dm->dm_dkev)) {
					_dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
				}
				if (fastpath(dm->dm_dkev)) {
					if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
						goto out;
					}
					opts |= MACH_SEND_NOTIFY;
				}
			}
			opts |= MACH_SEND_TIMEOUT;
			// A dedicated kernel voucher is only created when the message
			// priority differs from the priority carried by its voucher.
			if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
				ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
						voucher, dmsg->dmsg_priority);
			}
			_dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
			if (ipc_kvoucher) {
				kvoucher_move_send = true;
				clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
						ipc_kvoucher, kvoucher_move_send);
			} else {
				clear_voucher = _voucher_mach_msg_set(msg, voucher);
			}
		}
		_voucher_activity_trace_msg(voucher, msg, send);
		_dispatch_debug_machport(msg->msgh_remote_port);
		if (reply) _dispatch_debug_machport(reply);
		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
				MACH_PORT_NULL);
		_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
				"opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
				"%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
				opts, msg_opts, msg->msgh_voucher_port, reply,
				mach_error_string(kr), kr);
		if (clear_voucher) {
			if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
				DISPATCH_CRASH("Voucher port corruption");
			}
			mach_voucher_t kv;
			kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
			if (kvoucher_move_send) ipc_kvoucher = kv;
		}
	}
	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
		if (opts & MACH_SEND_NOTIFY) {
			_dispatch_debug("machport[0x%08x]: send-possible notification "
					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
			DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
		} else {
			// send kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
		}
		if (ipc_kvoucher) {
			// Keep the kernel voucher for the retry by wrapping it in a new
			// voucher that replaces the one on the message.
			_dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
			voucher_t ipc_voucher;
			ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
					voucher, dmsg->dmsg_priority, ipc_kvoucher);
			_dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
					ipc_voucher, dmsg, voucher);
			if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
			dmsg->dmsg_voucher = ipc_voucher;
		}
		goto out;
	} else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
		_voucher_dealloc_mach_voucher(ipc_kvoucher);
	}
	// A reply on the channel's own receive port needs no separate kevent.
	if (fastpath(!kr) && reply &&
			!(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) {
		if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
			// reply receive kevent must be installed on the manager queue
			dr->dm_needs_mgr = 1;
			_dispatch_mach_msg_set_options(dmsg, msg_opts |
					DISPATCH_MACH_REGISTER_FOR_REPLY);
			goto out;
		}
		_dispatch_mach_reply_kevent_register(dm, reply, dmsg);
	}
	if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	if (slowpath(kr)) {
		// Send failed, so reply was never connected <rdar://problem/14309159>
		dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
	}
	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
	dmsg = NULL;
out:
	return (dispatch_object_t)dmsg;
}

// Appends an item to the channel's send queue (dm_head/dm_tail in the send
// refs). The tail is swapped atomically; the previous tail, if any, is then
// linked to the new item, otherwise the new item becomes the head. The
// channel is woken up when `wakeup` is true or when the queue was empty.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou,
		bool wakeup)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	struct dispatch_object_s *prev, *dc = dou._do;
	dc->do_next = NULL;

	prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
	if (fastpath(prev)) {
		prev->do_next = dc;
	} else {
		dr->dm_head = dc;
	}
	if (wakeup || !prev) {
		_dispatch_wakeup(dm);
	}
}

// Appends an item to the send queue, waking the channel only if the queue
// was empty.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
{
	return _dispatch_mach_send_push_wakeup(dm, dou, false);
}

// Drains the channel's send queue.
//
// Every caller in this file sets dm_sending to 1 before calling. The counter
// is decremented before returning, except when a send barrier is dequeued:
// the barrier is pushed onto the channel queue and the function returns with
// dm_sending still held; _dispatch_mach_barrier_invoke() decrements it once
// the barrier has run.
//
// Queue items are either messages (vtable objects) or continuations: send
// barriers, sync-slow waiters (only with DISPATCH_MACH_SEND_SYNC) and
// reconnect requests. Draining stops early when a reconnect or a message send
// cannot complete here; the unprocessed item is then put back at the head of
// the queue.
DISPATCH_NOINLINE
static void
_dispatch_mach_send_drain(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
	while (dr->dm_tail) {
		// An enqueuer on an empty queue swaps the tail before it stores the
		// head, so the head may lag behind briefly.
		_dispatch_wait_until(dc = fastpath(dr->dm_head));
		do {
			next_dc = fastpath(dc->do_next);
			dr->dm_head = next_dc;
			// dc looks like the last item: try to empty the queue. If the
			// tail has already moved, an enqueuer is about to link its item
			// behind dc, so wait for that link.
			if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
					relaxed)) {
				_dispatch_wait_until(next_dc = fastpath(dc->do_next));
				dr->dm_head = next_dc;
			}
			if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
				if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					// send barrier
					// leave send queue locked until barrier has completed
					return _dispatch_mach_push(dm, dc,
							((dispatch_continuation_t)dc)->dc_priority);
				}
#if DISPATCH_MACH_SEND_SYNC
				if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
					_dispatch_thread_semaphore_signal(
							(_dispatch_thread_semaphore_t)dc->do_ctxt);
					continue;
				}
#endif // DISPATCH_MACH_SEND_SYNC
				// Remaining continuation kind: a reconnect request.
				if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
					goto out;
				}
				continue;
			}
			_dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc);
			if (slowpath(dr->dm_disconnect_cnt) ||
					slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
				_dispatch_mach_msg_not_sent(dm, dc);
				continue;
			}
			// A non-NULL result is the message that must be retried.
			if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
				goto out;
			}
		} while ((dc = next_dc));
	}
out:
	// if this is not a complete drain, we must undo some things
	if (slowpath(dc)) {
		if (!next_dc &&
				!dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
			// wait for enqueue slow path to finish
			_dispatch_wait_until(next_dc = fastpath(dr->dm_head));
			dc->do_next = next_dc;
		}
		dr->dm_head = dc;
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	_dispatch_wakeup(dm);
}

// Drains the send queue if it is non-empty and the dm_sending flag can be
// taken (0 -> 1); otherwise does nothing.
static inline void
_dispatch_mach_send(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_send_drain(dm);
}

// Kevent callback for the channel: if the event's fflags intersect the
// channel's pending data mask, a send drain is attempted.
DISPATCH_NOINLINE
static void
_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
{
	if (!(ke->fflags & dm->ds_pending_data_mask)) {
		return;
	}
	_dispatch_mach_send(dm);
}

// Options applied to checkin messages: MACH_SEND_NOIMPORTANCE when
// DISPATCH_USE_CHECKIN_NOIMPORTANCE is enabled, otherwise none.
static inline mach_msg_option_t
_dispatch_mach_checkin_options(void)
{
	mach_msg_option_t options = 0;
#if DISPATCH_USE_CHECKIN_NOIMPORTANCE
	options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
#endif
	return options;
}


// Options added to every message passed to dispatch_mach_send(); currently
// none.
static inline mach_msg_option_t
_dispatch_mach_send_options(void)
{
	mach_msg_option_t options = 0;
	return options;
}

// Sends a message on the channel, asynchronously with respect to delivery of
// the result to the handler.
//
// The message is retained, tagged with the caller's options (which must not
// use the bits in DISPATCH_MACH_OPTIONS_MASK), the current priority and a
// copy of the current voucher. Its reply port is recorded when the header's
// local port is valid and carries a make-send-once disposition. Crashes the
// client if the message is already enqueued.
//
// The message is sent inline when possible. It is queued on the send queue
// instead when other items are already queued (unless the message is itself
// a reply, i.e. moves a send-once right), when the channel is disconnected
// or canceled, when another thread holds dm_sending, or when the inline send
// could not complete.
DISPATCH_NOINLINE
void
dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
		mach_msg_option_t options)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
		DISPATCH_CLIENT_CRASH("Message already enqueued");
	}
	dispatch_retain(dmsg);
	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
	options |= _dispatch_mach_send_options();
	_dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
	dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) ==
			MACH_MSG_TYPE_MAKE_SEND_ONCE &&
			MACH_PORT_VALID(msg->msgh_local_port) ? msg->msgh_local_port :
			MACH_PORT_NULL);
	bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
			MACH_MSG_TYPE_MOVE_SEND_ONCE);
	dmsg->dmsg_priority = _dispatch_priority_propagate();
	dmsg->dmsg_voucher = _voucher_copy();
	_dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
	// The last clause takes dm_sending as a side effect when all earlier
	// clauses are false.
	if ((!is_reply && slowpath(dr->dm_tail)) ||
			slowpath(dr->dm_disconnect_cnt) ||
			slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
			slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
					acquire))) {
		_dispatch_voucher_ktrace_dmsg_push(dmsg);
		return _dispatch_mach_send_push(dm, dmsg);
	}
	if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
		// Inline send must be retried: release dm_sending and queue it.
		(void)dispatch_atomic_dec2o(dr, dm_sending, release);
		_dispatch_voucher_ktrace_dmsg_push(dmsg);
		return _dispatch_mach_send_push_wakeup(dm, dmsg, true);
	}
	if (!is_reply && slowpath(dr->dm_tail)) {
		// Items were queued meanwhile; the drain releases dm_sending.
		return _dispatch_mach_send_drain(dm);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	_dispatch_wakeup(dm);
}

// Disconnects the send side of the channel: removes the send kevent if one
// is installed, reports the send port as disconnected (if valid) and clears
// it, hands back an unsent checkin message, and tears down every outstanding
// reply registration, reporting each as disconnected.
static void
_dispatch_mach_disconnect(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	if (MACH_PORT_VALID(dr->dm_send)) {
		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
	}
	dr->dm_send = MACH_PORT_NULL;
	if (dr->dm_checkin) {
		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
		dr->dm_checkin = NULL;
	}
	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		dispatch_mach_reply_refs_t dmr, tmp;
		// _SAFE iteration: each registration is freed inside the loop.
		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp){
			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
		}
	}
}

// Performs the cancellation of a channel. Returns false without doing
// anything when dm_sending is currently held; otherwise disconnects the send
// side, unregisters the receive kevent (if any) and reports the receive port
// as disconnected, and returns true.
DISPATCH_NOINLINE
static bool
_dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
		return false;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_disconnect(dm);
	if (dm->ds_dkev) {
		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	return true;
}

// Executes a reconnect request dequeued from the send queue.
//
// dou is the continuation built by dispatch_mach_reconnect(): dc_data holds
// the new checkin message (or NULL) and dc_other the new send port.
//
// Returns false, leaving the continuation untouched, when kevents have to be
// removed but the current queue is not the manager queue. Otherwise the
// channel is disconnected, the new checkin message and send port are
// installed, the continuation is freed, dm_disconnect_cnt is decremented
// (balancing the increment in dispatch_mach_reconnect()) and true is
// returned.
DISPATCH_NOINLINE
static bool
_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
{
	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
			// send/reply kevents must be uninstalled on the manager queue
			return false;
		}
	}
	_dispatch_mach_disconnect(dm);
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dr->dm_checkin = dou._dc->dc_data;
	dr->dm_send = (mach_port_t)dou._dc->dc_other;
	_dispatch_continuation_free(dou._dc);
	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
	_dispatch_object_debug(dm, "%s", __func__);
	return true;
}

// Requests that the channel be reconnected to a new send port.
//
// dm_disconnect_cnt is incremented immediately; while it is non-zero,
// messages are reported as not sent rather than sent. The checkin message is
// only used when send is valid and checkin is non-NULL (it is then retained
// and tagged with the checkin options, and its remote port becomes the
// checkin port); otherwise the checkin port is reset to MACH_PORT_NULL. The
// actual switch happens when the queued continuation is processed by
// _dispatch_mach_reconnect_invoke().
DISPATCH_NOINLINE
void
dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
		dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
	if (MACH_PORT_VALID(send) && checkin) {
		dispatch_retain(checkin);
		mach_msg_option_t options = _dispatch_mach_checkin_options();
		_dispatch_mach_msg_set_options(checkin, options);
		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
	} else {
		checkin = NULL;
		dr->dm_checkin_port = MACH_PORT_NULL;
	}
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = checkin;
	dc->dc_other = (void*)(uintptr_t)send;
	return _dispatch_mach_send_push(dm, dc);
}

#if DISPATCH_MACH_SEND_SYNC
// Blocks the calling thread until the send queue has been drained up to this
// point: a stack-allocated marker carrying the thread's semaphore is queued
// on the send queue, and the drain signals the semaphore when it reaches the
// marker.
DISPATCH_NOINLINE
static void
_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_object_s dc = {
		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
		.do_ctxt = (void*)sema,
	};
	_dispatch_mach_send_push(dm, &dc);
	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}
#endif // DISPATCH_MACH_SEND_SYNC

// Returns the remote port of the most recently supplied checkin message, or
// MACH_PORT_DEAD if the channel has been canceled.
DISPATCH_NOINLINE
mach_port_t
dispatch_mach_get_checkin_port(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
		return MACH_PORT_DEAD;
	}
	return dr->dm_checkin_port;
}

// Calls the client handler with DISPATCH_MACH_CONNECTED and records that it
// has been called. Callers invoke this lazily, before the first other
// handler callout.
DISPATCH_NOINLINE
static void
_dispatch_mach_connect_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
	dm->dm_connect_handler_called = 1;
}

// Delivers one message object to the client handler.
//
// The channel is taken from the current queue. The reason and error recorded
// on the message are decoded, the message is marked as no longer enqueued,
// and its priority and voucher are adopted for the callout (the message's
// voucher field is cleared afterwards). While the handler runs, the thread's
// current queue is set to the channel's target queue; it is restored to the
// channel afterwards. The message is released once the handler returns.
DISPATCH_NOINLINE
void
_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	mach_error_t err;
	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);

	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	_dispatch_voucher_ktrace_dmsg_pop(dmsg);
	_dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
	_dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority,
			dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE);
	dmsg->dmsg_voucher = NULL;
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
			dr->dm_handler_func);
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	_dispatch_introspection_queue_item_complete(dmsg);
	dispatch_release(dmsg);
}

// Runs a send or receive barrier on the channel queue.
//
// ctxt is the continuation built by dispatch_mach_send_barrier_f() or
// dispatch_mach_receive_barrier_f(): dc_data is the client context and
// dc_other the client function. The client function is called first, then
// the channel handler with DISPATCH_MACH_BARRIER_COMPLETED. For a send
// barrier (DISPATCH_OBJ_BARRIER_BIT set) dm_sending, which was left held
// when the barrier was scheduled, is released afterwards.
DISPATCH_NOINLINE
void
_dispatch_mach_barrier_invoke(void *ctxt)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	struct dispatch_continuation_s *dc = ctxt;
	void *context = dc->dc_data;
	dispatch_function_t barrier = dc->dc_other;
	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);

	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout(context, barrier);
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	if (send_barrier) {
		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
	}
}

// Submits a send barrier. If the send queue is empty and dm_sending can be
// taken, the barrier is pushed straight onto the channel queue with
// dm_sending held; otherwise it is appended to the send queue and scheduled
// when the drain reaches it.
DISPATCH_NOINLINE
void
dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	_dispatch_continuation_voucher_set(dc, 0);
	_dispatch_continuation_priority_set(dc, 0, 0);

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return _dispatch_mach_send_push(dm, dc);
	}
	// leave send queue locked until barrier has completed
	return _dispatch_mach_push(dm, dc, dc->dc_priority);
}

// Submits a receive barrier: the continuation is pushed directly onto the
// channel queue and does not interact with the send queue or dm_sending.
DISPATCH_NOINLINE
void
dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	_dispatch_continuation_voucher_set(dc, 0);
	_dispatch_continuation_priority_set(dc, 0, 0);

	return _dispatch_mach_push(dm, dc, dc->dc_priority);
}

// Block variant of dispatch_mach_send_barrier_f(): the block is copied and
// run through _dispatch_call_block_and_release.
DISPATCH_NOINLINE
void
dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}

// Block variant of dispatch_mach_receive_barrier_f(): the block is copied
// and run through _dispatch_call_block_and_release.
DISPATCH_NOINLINE
void
dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}

// Notifies the client that the channel has been canceled: calls the handler
// with DISPATCH_MACH_CANCELED (preceded by the connect callout if that has
// not happened yet), records that the cancel handler ran and drops the
// reference taken for this purpose in _dispatch_mach_create().
DISPATCH_NOINLINE
static void
_dispatch_mach_cancel_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
	dm->dm_cancel_handler_called = 1;
	_dispatch_release(dm); // the retain is done at creation time
}

3937DISPATCH_NOINLINE 3938void 3939dispatch_mach_cancel(dispatch_mach_t dm) 3940{ 3941 dispatch_source_cancel((dispatch_source_t)dm); 3942} 3943 3944DISPATCH_ALWAYS_INLINE 3945static inline dispatch_queue_t 3946_dispatch_mach_invoke2(dispatch_object_t dou, 3947 _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED) 3948{ 3949 dispatch_mach_t dm = dou._dm; 3950 3951 // This function performs all mach channel actions. Each action is 3952 // responsible for verifying that it takes place on the appropriate queue. 3953 // If the current queue is not the correct queue for this action, the 3954 // correct queue will be returned and the invoke will be re-driven on that 3955 // queue. 3956 3957 // The order of tests here in invoke and in probe should be consistent. 3958 3959 dispatch_queue_t dq = _dispatch_queue_get_current(); 3960 dispatch_mach_send_refs_t dr = dm->dm_refs; 3961 3962 if (slowpath(!dm->ds_is_installed)) { 3963 // The channel needs to be installed on the manager queue. 3964 if (dq != &_dispatch_mgr_q) { 3965 return &_dispatch_mgr_q; 3966 } 3967 if (dm->ds_dkev) { 3968 _dispatch_source_kevent_register((dispatch_source_t)dm); 3969 } 3970 dm->ds_is_installed = true; 3971 _dispatch_mach_send(dm); 3972 // Apply initial target queue change 3973 _dispatch_queue_drain(dou); 3974 if (dm->dq_items_tail) { 3975 return dm->do_targetq; 3976 } 3977 } else if (dm->dq_items_tail) { 3978 // The channel has pending messages to deliver to the target queue. 
3979 if (dq != dm->do_targetq) { 3980 return dm->do_targetq; 3981 } 3982 dispatch_queue_t tq = dm->do_targetq; 3983 if (slowpath(_dispatch_queue_drain(dou))) { 3984 DISPATCH_CLIENT_CRASH("Sync onto mach channel"); 3985 } 3986 if (slowpath(tq != dm->do_targetq)) { 3987 // An item on the channel changed the target queue 3988 return dm->do_targetq; 3989 } 3990 } else if (dr->dm_sending) { 3991 // Sending and uninstallation below require the send lock, the channel 3992 // will be woken up when the lock is dropped <rdar://15132939&15203957> 3993 return NULL; 3994 } else if (dr->dm_tail) { 3995 if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) && 3996 (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) { 3997 // Send/reply kevents need to be installed or uninstalled 3998 if (dq != &_dispatch_mgr_q) { 3999 return &_dispatch_mgr_q; 4000 } 4001 } 4002 if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) || 4003 (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) { 4004 // The channel has pending messages to send. 4005 _dispatch_mach_send(dm); 4006 } 4007 } else if (dm->ds_atomic_flags & DSF_CANCELED){ 4008 // The channel has been cancelled and needs to be uninstalled from the 4009 // manager queue. After uninstallation, the cancellation handler needs 4010 // to be delivered to the target queue. 
4011 if (dm->ds_dkev || dm->dm_dkev || dr->dm_send || 4012 !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) { 4013 if (dq != &_dispatch_mgr_q) { 4014 return &_dispatch_mgr_q; 4015 } 4016 if (!_dispatch_mach_cancel(dm)) { 4017 return NULL; 4018 } 4019 } 4020 if (!dm->dm_cancel_handler_called) { 4021 if (dq != dm->do_targetq) { 4022 return dm->do_targetq; 4023 } 4024 _dispatch_mach_cancel_invoke(dm); 4025 } 4026 } 4027 return NULL; 4028} 4029 4030DISPATCH_NOINLINE 4031void 4032_dispatch_mach_invoke(dispatch_mach_t dm) 4033{ 4034 _dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2); 4035} 4036 4037unsigned long 4038_dispatch_mach_probe(dispatch_mach_t dm) 4039{ 4040 // This function determines whether the mach channel needs to be invoked. 4041 // The order of tests here in probe and in invoke should be consistent. 4042 4043 dispatch_mach_send_refs_t dr = dm->dm_refs; 4044 4045 if (slowpath(!dm->ds_is_installed)) { 4046 // The channel needs to be installed on the manager queue. 4047 return true; 4048 } else if (_dispatch_queue_class_probe(dm)) { 4049 // The source has pending messages to deliver to the target queue. 4050 return true; 4051 } else if (dr->dm_sending) { 4052 // Sending and uninstallation below require the send lock, the channel 4053 // will be woken up when the lock is dropped <rdar://15132939&15203957> 4054 return false; 4055 } else if (dr->dm_tail && 4056 (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) || 4057 (dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) { 4058 // The channel has pending messages to send. 4059 return true; 4060 } else if (dm->ds_atomic_flags & DSF_CANCELED) { 4061 if (dm->ds_dkev || dm->dm_dkev || dr->dm_send || 4062 !TAILQ_EMPTY(&dm->dm_refs->dm_replies) || 4063 !dm->dm_cancel_handler_called) { 4064 // The channel needs to be uninstalled from the manager queue, or 4065 // the cancellation handler needs to be delivered to the target 4066 // queue. 4067 return true; 4068 } 4069 } 4070 // Nothing to do. 
4071 return false; 4072} 4073 4074#pragma mark - 4075#pragma mark dispatch_mach_msg_t 4076 4077dispatch_mach_msg_t 4078dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size, 4079 dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr) 4080{ 4081 if (slowpath(size < sizeof(mach_msg_header_t)) || 4082 slowpath(destructor && !msg)) { 4083 DISPATCH_CLIENT_CRASH("Empty message"); 4084 } 4085 dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg), 4086 sizeof(struct dispatch_mach_msg_s) + 4087 (destructor ? 0 : size - sizeof(dmsg->dmsg_msg))); 4088 if (destructor) { 4089 dmsg->dmsg_msg = msg; 4090 } else if (msg) { 4091 memcpy(dmsg->dmsg_buf, msg, size); 4092 } 4093 dmsg->do_next = DISPATCH_OBJECT_LISTLESS; 4094 dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, 4095 false); 4096 dmsg->dmsg_destructor = destructor; 4097 dmsg->dmsg_size = size; 4098 if (msg_ptr) { 4099 *msg_ptr = _dispatch_mach_msg_get_msg(dmsg); 4100 } 4101 return dmsg; 4102} 4103 4104void 4105_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg) 4106{ 4107 if (dmsg->dmsg_voucher) { 4108 _voucher_release(dmsg->dmsg_voucher); 4109 dmsg->dmsg_voucher = NULL; 4110 } 4111 switch (dmsg->dmsg_destructor) { 4112 case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT: 4113 break; 4114 case DISPATCH_MACH_MSG_DESTRUCTOR_FREE: 4115 free(dmsg->dmsg_msg); 4116 break; 4117 case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: { 4118 mach_vm_size_t vm_size = dmsg->dmsg_size; 4119 mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg; 4120 (void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(), 4121 vm_addr, vm_size)); 4122 break; 4123 }} 4124} 4125 4126static inline mach_msg_header_t* 4127_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg) 4128{ 4129 return dmsg->dmsg_destructor ? 
dmsg->dmsg_msg : 4130 (mach_msg_header_t*)dmsg->dmsg_buf; 4131} 4132 4133mach_msg_header_t* 4134dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr) 4135{ 4136 if (size_ptr) { 4137 *size_ptr = dmsg->dmsg_size; 4138 } 4139 return _dispatch_mach_msg_get_msg(dmsg); 4140} 4141 4142size_t 4143_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz) 4144{ 4145 size_t offset = 0; 4146 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 4147 dx_kind(dmsg), dmsg); 4148 offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, " 4149 "refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1); 4150 offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, " 4151 "msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf); 4152 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg); 4153 if (hdr->msgh_id) { 4154 offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ", 4155 hdr->msgh_id); 4156 } 4157 if (hdr->msgh_size) { 4158 offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ", 4159 hdr->msgh_size); 4160 } 4161 if (hdr->msgh_bits) { 4162 offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u", 4163 MACH_MSGH_BITS_LOCAL(hdr->msgh_bits), 4164 MACH_MSGH_BITS_REMOTE(hdr->msgh_bits)); 4165 if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) { 4166 offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x", 4167 MACH_MSGH_BITS_OTHER(hdr->msgh_bits)); 4168 } 4169 offset += dsnprintf(&buf[offset], bufsiz - offset, ">, "); 4170 } 4171 if (hdr->msgh_local_port && hdr->msgh_remote_port) { 4172 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, " 4173 "remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port); 4174 } else if (hdr->msgh_local_port) { 4175 offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x", 4176 hdr->msgh_local_port); 4177 } else if (hdr->msgh_remote_port) { 4178 offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x", 4179 
hdr->msgh_remote_port); 4180 } else { 4181 offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports"); 4182 } 4183 offset += dsnprintf(&buf[offset], bufsiz - offset, " } }"); 4184 return offset; 4185} 4186 4187#pragma mark - 4188#pragma mark dispatch_mig_server 4189 4190mach_msg_return_t 4191dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, 4192 dispatch_mig_callback_t callback) 4193{ 4194 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT 4195 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX) 4196 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER; 4197 mach_msg_options_t tmp_options; 4198 mig_reply_error_t *bufTemp, *bufRequest, *bufReply; 4199 mach_msg_return_t kr = 0; 4200 uint64_t assertion_token = 0; 4201 unsigned int cnt = 1000; // do not stall out serial queues 4202 boolean_t demux_success; 4203 bool received = false; 4204 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE; 4205 4206 // XXX FIXME -- allocate these elsewhere 4207 bufRequest = alloca(rcv_size); 4208 bufReply = alloca(rcv_size); 4209 bufReply->Head.msgh_size = 0; 4210 bufRequest->RetCode = 0; 4211 4212#if DISPATCH_DEBUG 4213 options |= MACH_RCV_LARGE; // rdar://problem/8422992 4214#endif 4215 tmp_options = options; 4216 // XXX FIXME -- change this to not starve out the target queue 4217 for (;;) { 4218 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) { 4219 options &= ~MACH_RCV_MSG; 4220 tmp_options &= ~MACH_RCV_MSG; 4221 4222 if (!(tmp_options & MACH_SEND_MSG)) { 4223 goto out; 4224 } 4225 } 4226 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size, 4227 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0); 4228 4229 tmp_options = options; 4230 4231 if (slowpath(kr)) { 4232 switch (kr) { 4233 case MACH_SEND_INVALID_DEST: 4234 case MACH_SEND_TIMED_OUT: 4235 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) { 4236 mach_msg_destroy(&bufReply->Head); 4237 } 4238 break; 4239 case MACH_RCV_TIMED_OUT: 4240 // Don't return an 
error if a message was sent this time or 4241 // a message was successfully received previously 4242 // rdar://problems/7363620&7791738 4243 if(bufReply->Head.msgh_remote_port || received) { 4244 kr = MACH_MSG_SUCCESS; 4245 } 4246 break; 4247 case MACH_RCV_INVALID_NAME: 4248 break; 4249#if DISPATCH_DEBUG 4250 case MACH_RCV_TOO_LARGE: 4251 // receive messages that are too large and log their id and size 4252 // rdar://problem/8422992 4253 tmp_options &= ~MACH_RCV_LARGE; 4254 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE; 4255 void *large_buf = malloc(large_size); 4256 if (large_buf) { 4257 rcv_size = large_size; 4258 bufReply = large_buf; 4259 } 4260 if (!mach_msg(&bufReply->Head, tmp_options, 0, 4261 (mach_msg_size_t)rcv_size, 4262 (mach_port_t)ds->ds_ident_hack, 0, 0)) { 4263 _dispatch_log("BUG in libdispatch client: " 4264 "dispatch_mig_server received message larger than " 4265 "requested size %zd: id = 0x%x, size = %d", 4266 maxmsgsz, bufReply->Head.msgh_id, 4267 bufReply->Head.msgh_size); 4268 } 4269 if (large_buf) { 4270 free(large_buf); 4271 } 4272 // fall through 4273#endif 4274 default: 4275 _dispatch_bug_mach_client( 4276 "dispatch_mig_server: mach_msg() failed", kr); 4277 break; 4278 } 4279 goto out; 4280 } 4281 4282 if (!(tmp_options & MACH_RCV_MSG)) { 4283 goto out; 4284 } 4285 4286 if (assertion_token) { 4287#if DISPATCH_USE_IMPORTANCE_ASSERTION 4288 int r = proc_importance_assertion_complete(assertion_token); 4289 (void)dispatch_assume_zero(r); 4290#endif 4291 assertion_token = 0; 4292 } 4293 received = true; 4294 4295 bufTemp = bufRequest; 4296 bufRequest = bufReply; 4297 bufReply = bufTemp; 4298 4299#if DISPATCH_USE_IMPORTANCE_ASSERTION 4300 int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head, 4301 NULL, &assertion_token); 4302 if (r && slowpath(r != EIO)) { 4303 (void)dispatch_assume_zero(r); 4304 } 4305#endif 4306 _voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head)); 4307 demux_success = 
callback(&bufRequest->Head, &bufReply->Head); 4308 4309 if (!demux_success) { 4310 // destroy the request - but not the reply port 4311 bufRequest->Head.msgh_remote_port = 0; 4312 mach_msg_destroy(&bufRequest->Head); 4313 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) { 4314 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode 4315 // is present 4316 if (slowpath(bufReply->RetCode)) { 4317 if (bufReply->RetCode == MIG_NO_REPLY) { 4318 continue; 4319 } 4320 4321 // destroy the request - but not the reply port 4322 bufRequest->Head.msgh_remote_port = 0; 4323 mach_msg_destroy(&bufRequest->Head); 4324 } 4325 } 4326 4327 if (bufReply->Head.msgh_remote_port) { 4328 tmp_options |= MACH_SEND_MSG; 4329 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != 4330 MACH_MSG_TYPE_MOVE_SEND_ONCE) { 4331 tmp_options |= MACH_SEND_TIMEOUT; 4332 } 4333 } 4334 } 4335 4336out: 4337 if (assertion_token) { 4338#if DISPATCH_USE_IMPORTANCE_ASSERTION 4339 int r = proc_importance_assertion_complete(assertion_token); 4340 (void)dispatch_assume_zero(r); 4341#endif 4342 } 4343 4344 return kr; 4345} 4346 4347#endif /* HAVE_MACH */ 4348 4349#pragma mark - 4350#pragma mark dispatch_source_debug 4351 4352DISPATCH_NOINLINE 4353static const char * 4354_evfiltstr(short filt) 4355{ 4356 switch (filt) { 4357#define _evfilt2(f) case (f): return #f 4358 _evfilt2(EVFILT_READ); 4359 _evfilt2(EVFILT_WRITE); 4360 _evfilt2(EVFILT_AIO); 4361 _evfilt2(EVFILT_VNODE); 4362 _evfilt2(EVFILT_PROC); 4363 _evfilt2(EVFILT_SIGNAL); 4364 _evfilt2(EVFILT_TIMER); 4365#ifdef EVFILT_VM 4366 _evfilt2(EVFILT_VM); 4367#endif 4368#ifdef EVFILT_MEMORYSTATUS 4369 _evfilt2(EVFILT_MEMORYSTATUS); 4370#endif 4371#if HAVE_MACH 4372 _evfilt2(EVFILT_MACHPORT); 4373 _evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION); 4374#endif 4375 _evfilt2(EVFILT_FS); 4376 _evfilt2(EVFILT_USER); 4377 4378 _evfilt2(DISPATCH_EVFILT_TIMER); 4379 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD); 4380 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR); 4381 
default: 4382 return "EVFILT_missing"; 4383 } 4384} 4385 4386static size_t 4387_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) 4388{ 4389 dispatch_queue_t target = ds->do_targetq; 4390 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, " 4391 "pending_data = 0x%lx, pending_data_mask = 0x%lx, ", 4392 target && target->dq_label ? target->dq_label : "", target, 4393 ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask); 4394} 4395 4396static size_t 4397_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) 4398{ 4399 dispatch_source_refs_t dr = ds->ds_refs; 4400 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx," 4401 " last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ", 4402 ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire, 4403 ds_timer(dr).interval, ds_timer(dr).flags); 4404} 4405 4406size_t 4407_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz) 4408{ 4409 size_t offset = 0; 4410 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 4411 dx_kind(ds), ds); 4412 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset); 4413 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset); 4414 if (ds->ds_is_timer) { 4415 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset); 4416 } 4417 offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }", 4418 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????"); 4419 return offset; 4420} 4421 4422static size_t 4423_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz) 4424{ 4425 dispatch_queue_t target = dm->do_targetq; 4426 return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, " 4427 "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, " 4428 "sending = %d, disconnected = %d, canceled = %d ", 4429 target && target->dq_label ? 
target->dq_label : "", target, 4430 dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0, 4431 dm->dm_refs->dm_send, 4432 dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0, 4433 dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ? 4434 " (armed)" : "", dm->dm_refs->dm_checkin_port, 4435 dm->dm_refs->dm_checkin ? " (pending)" : "", 4436 dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt, 4437 (bool)(dm->ds_atomic_flags & DSF_CANCELED)); 4438} 4439size_t 4440_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz) 4441{ 4442 size_t offset = 0; 4443 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 4444 dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label : 4445 dx_kind(dm), dm); 4446 offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset); 4447 offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset); 4448 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 4449 return offset; 4450} 4451 4452#if DISPATCH_DEBUG 4453static void 4454_dispatch_kevent_debug(struct kevent64_s* kev, const char* str) 4455{ 4456 _dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, " 4457 "fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, " 4458 "ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter), 4459 kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0], 4460 kev->ext[1], str); 4461} 4462 4463static void 4464_dispatch_kevent_debugger2(void *context) 4465{ 4466 struct sockaddr sa; 4467 socklen_t sa_len = sizeof(sa); 4468 int c, fd = (int)(long)context; 4469 unsigned int i; 4470 dispatch_kevent_t dk; 4471 dispatch_source_t ds; 4472 dispatch_source_refs_t dr; 4473 FILE *debug_stream; 4474 4475 c = accept(fd, &sa, &sa_len); 4476 if (c == -1) { 4477 if (errno != EAGAIN) { 4478 (void)dispatch_assume_zero(errno); 4479 } 4480 return; 4481 } 4482#if 0 4483 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO 4484 if (r == -1) { 4485 
(void)dispatch_assume_zero(errno); 4486 } 4487#endif 4488 debug_stream = fdopen(c, "a"); 4489 if (!dispatch_assume(debug_stream)) { 4490 close(c); 4491 return; 4492 } 4493 4494 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n"); 4495 fprintf(debug_stream, "Content-type: text/html\r\n"); 4496 fprintf(debug_stream, "Pragma: nocache\r\n"); 4497 fprintf(debug_stream, "\r\n"); 4498 fprintf(debug_stream, "<html>\n"); 4499 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid()); 4500 fprintf(debug_stream, "<body>\n<ul>\n"); 4501 4502 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>" 4503 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n"); 4504 4505 for (i = 0; i < DSL_HASH_SIZE; i++) { 4506 if (TAILQ_EMPTY(&_dispatch_sources[i])) { 4507 continue; 4508 } 4509 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) { 4510 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags " 4511 "0x%hx fflags 0x%x data 0x%lx udata %p\n", 4512 dk, (unsigned long)dk->dk_kevent.ident, 4513 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags, 4514 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data, 4515 (void*)dk->dk_kevent.udata); 4516 fprintf(debug_stream, "\t\t<ul>\n"); 4517 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) { 4518 ds = _dispatch_source_from_refs(dr); 4519 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend " 4520 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n", 4521 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt, 4522 ds->ds_pending_data, ds->ds_pending_data_mask, 4523 ds->ds_atomic_flags); 4524 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) { 4525 dispatch_queue_t dq = ds->do_targetq; 4526 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend " 4527 "0x%x label: %s\n", dq, dq->do_ref_cnt + 1, 4528 dq->do_suspend_cnt, dq->dq_label ? 
dq->dq_label:""); 4529 } 4530 } 4531 fprintf(debug_stream, "\t\t</ul>\n"); 4532 fprintf(debug_stream, "\t</li>\n"); 4533 } 4534 } 4535 fprintf(debug_stream, "</ul>\n</body>\n</html>\n"); 4536 fflush(debug_stream); 4537 fclose(debug_stream); 4538} 4539 4540static void 4541_dispatch_kevent_debugger2_cancel(void *context) 4542{ 4543 int ret, fd = (int)(long)context; 4544 4545 ret = close(fd); 4546 if (ret != -1) { 4547 (void)dispatch_assume_zero(errno); 4548 } 4549} 4550 4551static void 4552_dispatch_kevent_debugger(void *context DISPATCH_UNUSED) 4553{ 4554 union { 4555 struct sockaddr_in sa_in; 4556 struct sockaddr sa; 4557 } sa_u = { 4558 .sa_in = { 4559 .sin_family = AF_INET, 4560 .sin_addr = { htonl(INADDR_LOOPBACK), }, 4561 }, 4562 }; 4563 dispatch_source_t ds; 4564 const char *valstr; 4565 int val, r, fd, sock_opt = 1; 4566 socklen_t slen = sizeof(sa_u); 4567 4568 if (issetugid()) { 4569 return; 4570 } 4571 valstr = getenv("LIBDISPATCH_DEBUGGER"); 4572 if (!valstr) { 4573 return; 4574 } 4575 val = atoi(valstr); 4576 if (val == 2) { 4577 sa_u.sa_in.sin_addr.s_addr = 0; 4578 } 4579 fd = socket(PF_INET, SOCK_STREAM, 0); 4580 if (fd == -1) { 4581 (void)dispatch_assume_zero(errno); 4582 return; 4583 } 4584 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, 4585 (socklen_t) sizeof sock_opt); 4586 if (r == -1) { 4587 (void)dispatch_assume_zero(errno); 4588 goto out_bad; 4589 } 4590#if 0 4591 r = fcntl(fd, F_SETFL, O_NONBLOCK); 4592 if (r == -1) { 4593 (void)dispatch_assume_zero(errno); 4594 goto out_bad; 4595 } 4596#endif 4597 r = bind(fd, &sa_u.sa, sizeof(sa_u)); 4598 if (r == -1) { 4599 (void)dispatch_assume_zero(errno); 4600 goto out_bad; 4601 } 4602 r = listen(fd, SOMAXCONN); 4603 if (r == -1) { 4604 (void)dispatch_assume_zero(errno); 4605 goto out_bad; 4606 } 4607 r = getsockname(fd, &sa_u.sa, &slen); 4608 if (r == -1) { 4609 (void)dispatch_assume_zero(errno); 4610 goto out_bad; 4611 } 4612 4613 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, 
(uintptr_t)fd, 0, 4614 &_dispatch_mgr_q); 4615 if (dispatch_assume(ds)) { 4616 _dispatch_log("LIBDISPATCH: debug port: %hu", 4617 (in_port_t)ntohs(sa_u.sa_in.sin_port)); 4618 4619 /* ownership of fd transfers to ds */ 4620 dispatch_set_context(ds, (void *)(long)fd); 4621 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2); 4622 dispatch_source_set_cancel_handler_f(ds, 4623 _dispatch_kevent_debugger2_cancel); 4624 dispatch_resume(ds); 4625 4626 return; 4627 } 4628out_bad: 4629 close(fd); 4630} 4631 4632#if HAVE_MACH 4633 4634#ifndef MACH_PORT_TYPE_SPREQUEST 4635#define MACH_PORT_TYPE_SPREQUEST 0x40000000 4636#endif 4637 4638DISPATCH_NOINLINE 4639void 4640dispatch_debug_machport(mach_port_t name, const char* str) 4641{ 4642 mach_port_type_t type; 4643 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0; 4644 unsigned int dnreqs = 0, dnrsiz; 4645 kern_return_t kr = mach_port_type(mach_task_self(), name, &type); 4646 if (kr) { 4647 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name, 4648 kr, mach_error_string(kr), str); 4649 return; 4650 } 4651 if (type & MACH_PORT_TYPE_SEND) { 4652 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4653 MACH_PORT_RIGHT_SEND, &ns)); 4654 } 4655 if (type & MACH_PORT_TYPE_SEND_ONCE) { 4656 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4657 MACH_PORT_RIGHT_SEND_ONCE, &nso)); 4658 } 4659 if (type & MACH_PORT_TYPE_DEAD_NAME) { 4660 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4661 MACH_PORT_RIGHT_DEAD_NAME, &nd)); 4662 } 4663 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) { 4664 kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs); 4665 if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr); 4666 } 4667 if (type & MACH_PORT_TYPE_RECEIVE) { 4668 mach_port_status_t status = { .mps_pset = 0, }; 4669 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT; 4670 
(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4671 MACH_PORT_RIGHT_RECEIVE, &nr)); 4672 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(), 4673 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt)); 4674 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) " 4675 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) " 4676 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) " 4677 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs, 4678 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", 4679 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N", 4680 status.mps_srights ? "Y":"N", status.mps_sorights, 4681 status.mps_qlimit, status.mps_msgcount, status.mps_mscount, 4682 status.mps_seqno, str); 4683 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE| 4684 MACH_PORT_TYPE_DEAD_NAME)) { 4685 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) " 4686 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs, 4687 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str); 4688 } else { 4689 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type, 4690 str); 4691 } 4692} 4693 4694#endif // HAVE_MACH 4695 4696#endif // DISPATCH_DEBUG 4697