1/* 2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved. 3 * 4 * @APPLE_APACHE_LICENSE_HEADER_START@ 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * @APPLE_APACHE_LICENSE_HEADER_END@ 19 */ 20 21#include "internal.h" 22#if HAVE_MACH 23#include "protocol.h" 24#include "protocolServer.h" 25#endif 26#include <sys/mount.h> 27 28static void _dispatch_source_merge_kevent(dispatch_source_t ds, 29 const struct kevent64_s *ke); 30static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp); 31static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg); 32static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, 33 uint32_t del_flags); 34static void _dispatch_kevent_drain(struct kevent64_s *ke); 35static void _dispatch_kevent_merge(struct kevent64_s *ke); 36static void _dispatch_timers_kevent(struct kevent64_s *ke); 37static void _dispatch_timers_unregister(dispatch_source_t ds, 38 dispatch_kevent_t dk); 39static void _dispatch_timers_update(dispatch_source_t ds); 40static void _dispatch_timer_aggregates_check(void); 41static void _dispatch_timer_aggregates_register(dispatch_source_t ds); 42static void _dispatch_timer_aggregates_update(dispatch_source_t ds, 43 unsigned int tidx); 44static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds, 45 unsigned int tidx); 46static inline unsigned long _dispatch_source_timer_data( 47 dispatch_source_refs_t dr, unsigned long prev); 48static 
long _dispatch_kq_update(const struct kevent64_s *); 49static void _dispatch_memorystatus_init(void); 50#if HAVE_MACH 51static void _dispatch_mach_host_calendar_change_register(void); 52static void _dispatch_mach_recv_msg_buf_init(void); 53static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk, 54 uint32_t new_flags, uint32_t del_flags); 55static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, 56 uint32_t new_flags, uint32_t del_flags); 57static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke); 58#else 59static inline void _dispatch_mach_host_calendar_change_register(void) {} 60static inline void _dispatch_mach_recv_msg_buf_init(void) {} 61#endif 62static const char * _evfiltstr(short filt); 63#if DISPATCH_DEBUG 64static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str); 65static void _dispatch_kevent_debugger(void *context); 66#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \ 67 dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q) 68#else 69static inline void 70_dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED, 71 const char* str DISPATCH_UNUSED) {} 72#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() 73#endif 74 75#pragma mark - 76#pragma mark dispatch_source_t 77 78dispatch_source_t 79dispatch_source_create(dispatch_source_type_t type, 80 uintptr_t handle, 81 unsigned long mask, 82 dispatch_queue_t q) 83{ 84 const struct kevent64_s *proto_kev = &type->ke; 85 dispatch_source_t ds; 86 dispatch_kevent_t dk; 87 88 // input validation 89 if (type == NULL || (mask & ~type->mask)) { 90 return NULL; 91 } 92 93 switch (type->ke.filter) { 94 case EVFILT_SIGNAL: 95 if (handle >= NSIG) { 96 return NULL; 97 } 98 break; 99 case EVFILT_FS: 100#if DISPATCH_USE_VM_PRESSURE 101 case EVFILT_VM: 102#endif 103#if DISPATCH_USE_MEMORYSTATUS 104 case EVFILT_MEMORYSTATUS: 105#endif 106 case DISPATCH_EVFILT_CUSTOM_ADD: 107 case DISPATCH_EVFILT_CUSTOM_OR: 108 if (handle) { 109 return NULL; 110 } 
111 break; 112 case DISPATCH_EVFILT_TIMER: 113 if (!!handle ^ !!type->ke.ident) { 114 return NULL; 115 } 116 break; 117 default: 118 break; 119 } 120 121 ds = _dispatch_alloc(DISPATCH_VTABLE(source), 122 sizeof(struct dispatch_source_s)); 123 // Initialize as a queue first, then override some settings below. 124 _dispatch_queue_init((dispatch_queue_t)ds); 125 ds->dq_label = "source"; 126 127 ds->do_ref_cnt++; // the reference the manager queue holds 128 ds->do_ref_cnt++; // since source is created suspended 129 ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL; 130 // The initial target queue is the manager queue, in order to get 131 // the source installed. <rdar://problem/8928171> 132 ds->do_targetq = &_dispatch_mgr_q; 133 134 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); 135 dk->dk_kevent = *proto_kev; 136 dk->dk_kevent.ident = handle; 137 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; 138 dk->dk_kevent.fflags |= (uint32_t)mask; 139 dk->dk_kevent.udata = (uintptr_t)dk; 140 TAILQ_INIT(&dk->dk_sources); 141 142 ds->ds_dkev = dk; 143 ds->ds_pending_data_mask = dk->dk_kevent.fflags; 144 ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident; 145 if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) { 146 ds->ds_is_level = true; 147 ds->ds_needs_rearm = true; 148 } else if (!(EV_CLEAR & proto_kev->flags)) { 149 // we cheat and use EV_CLEAR to mean a "flag thingy" 150 ds->ds_is_adder = true; 151 } 152 // Some sources require special processing 153 if (type->init != NULL) { 154 type->init(ds, type, handle, mask, q); 155 } 156 dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder)); 157 158 if (fastpath(!ds->ds_refs)) { 159 ds->ds_refs = _dispatch_calloc(1ul, 160 sizeof(struct dispatch_source_refs_s)); 161 } 162 ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds); 163 164 // First item on the queue sets the user-specified target queue 165 dispatch_set_target_queue(ds, q); 166 _dispatch_object_debug(ds, "%s", __func__); 167 return ds; 168} 169 170void 
171_dispatch_source_dispose(dispatch_source_t ds) 172{ 173 _dispatch_object_debug(ds, "%s", __func__); 174 free(ds->ds_refs); 175 _dispatch_queue_destroy(ds); 176} 177 178void 179_dispatch_source_xref_dispose(dispatch_source_t ds) 180{ 181 _dispatch_wakeup(ds); 182} 183 184void 185dispatch_source_cancel(dispatch_source_t ds) 186{ 187 _dispatch_object_debug(ds, "%s", __func__); 188 // Right after we set the cancel flag, someone else 189 // could potentially invoke the source, do the cancelation, 190 // unregister the source, and deallocate it. We would 191 // need to therefore retain/release before setting the bit 192 193 _dispatch_retain(ds); 194 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed); 195 _dispatch_wakeup(ds); 196 _dispatch_release(ds); 197} 198 199long 200dispatch_source_testcancel(dispatch_source_t ds) 201{ 202 return (bool)(ds->ds_atomic_flags & DSF_CANCELED); 203} 204 205 206unsigned long 207dispatch_source_get_mask(dispatch_source_t ds) 208{ 209 return ds->ds_pending_data_mask; 210} 211 212uintptr_t 213dispatch_source_get_handle(dispatch_source_t ds) 214{ 215 return (unsigned int)ds->ds_ident_hack; 216} 217 218unsigned long 219dispatch_source_get_data(dispatch_source_t ds) 220{ 221 return ds->ds_data; 222} 223 224void 225dispatch_source_merge_data(dispatch_source_t ds, unsigned long val) 226{ 227 struct kevent64_s kev = { 228 .fflags = (typeof(kev.fflags))val, 229 .data = (typeof(kev.data))val, 230 }; 231 232 dispatch_assert( 233 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD || 234 ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR); 235 236 _dispatch_source_merge_kevent(ds, &kev); 237} 238 239#pragma mark - 240#pragma mark dispatch_source_handler 241 242#ifdef __BLOCKS__ 243// 6618342 Contact the team that owns the Instrument DTrace probe before 244// renaming this symbol 245static void 246_dispatch_source_set_event_handler2(void *context) 247{ 248 dispatch_source_t ds = 
(dispatch_source_t)_dispatch_queue_get_current(); 249 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 250 dispatch_source_refs_t dr = ds->ds_refs; 251 252 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) { 253 Block_release(dr->ds_handler_ctxt); 254 } 255 dr->ds_handler_func = context ? _dispatch_Block_invoke(context) : NULL; 256 dr->ds_handler_ctxt = context; 257 ds->ds_handler_is_block = true; 258} 259 260void 261dispatch_source_set_event_handler(dispatch_source_t ds, 262 dispatch_block_t handler) 263{ 264 handler = _dispatch_Block_copy(handler); 265 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler, 266 _dispatch_source_set_event_handler2); 267} 268#endif /* __BLOCKS__ */ 269 270static void 271_dispatch_source_set_event_handler_f(void *context) 272{ 273 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current(); 274 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 275 dispatch_source_refs_t dr = ds->ds_refs; 276 277#ifdef __BLOCKS__ 278 if (ds->ds_handler_is_block && dr->ds_handler_ctxt) { 279 Block_release(dr->ds_handler_ctxt); 280 } 281#endif 282 dr->ds_handler_func = context; 283 dr->ds_handler_ctxt = ds->do_ctxt; 284 ds->ds_handler_is_block = false; 285} 286 287void 288dispatch_source_set_event_handler_f(dispatch_source_t ds, 289 dispatch_function_t handler) 290{ 291 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler, 292 _dispatch_source_set_event_handler_f); 293} 294 295#ifdef __BLOCKS__ 296// 6618342 Contact the team that owns the Instrument DTrace probe before 297// renaming this symbol 298static void 299_dispatch_source_set_cancel_handler2(void *context) 300{ 301 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current(); 302 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 303 dispatch_source_refs_t dr = ds->ds_refs; 304 305 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) { 306 Block_release(dr->ds_cancel_handler); 307 } 308 dr->ds_cancel_handler = context; 309 
ds->ds_cancel_is_block = true; 310} 311 312void 313dispatch_source_set_cancel_handler(dispatch_source_t ds, 314 dispatch_block_t handler) 315{ 316 handler = _dispatch_Block_copy(handler); 317 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler, 318 _dispatch_source_set_cancel_handler2); 319} 320#endif /* __BLOCKS__ */ 321 322static void 323_dispatch_source_set_cancel_handler_f(void *context) 324{ 325 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current(); 326 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 327 dispatch_source_refs_t dr = ds->ds_refs; 328 329#ifdef __BLOCKS__ 330 if (ds->ds_cancel_is_block && dr->ds_cancel_handler) { 331 Block_release(dr->ds_cancel_handler); 332 } 333#endif 334 dr->ds_cancel_handler = context; 335 ds->ds_cancel_is_block = false; 336} 337 338void 339dispatch_source_set_cancel_handler_f(dispatch_source_t ds, 340 dispatch_function_t handler) 341{ 342 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler, 343 _dispatch_source_set_cancel_handler_f); 344} 345 346#ifdef __BLOCKS__ 347static void 348_dispatch_source_set_registration_handler2(void *context) 349{ 350 dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current(); 351 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 352 dispatch_source_refs_t dr = ds->ds_refs; 353 354 if (ds->ds_registration_is_block && dr->ds_registration_handler) { 355 Block_release(dr->ds_registration_handler); 356 } 357 dr->ds_registration_handler = context; 358 ds->ds_registration_is_block = true; 359} 360 361void 362dispatch_source_set_registration_handler(dispatch_source_t ds, 363 dispatch_block_t handler) 364{ 365 handler = _dispatch_Block_copy(handler); 366 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler, 367 _dispatch_source_set_registration_handler2); 368} 369#endif /* __BLOCKS__ */ 370 371static void 372_dispatch_source_set_registration_handler_f(void *context) 373{ 374 dispatch_source_t ds = 
(dispatch_source_t)_dispatch_queue_get_current(); 375 dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE); 376 dispatch_source_refs_t dr = ds->ds_refs; 377 378#ifdef __BLOCKS__ 379 if (ds->ds_registration_is_block && dr->ds_registration_handler) { 380 Block_release(dr->ds_registration_handler); 381 } 382#endif 383 dr->ds_registration_handler = context; 384 ds->ds_registration_is_block = false; 385} 386 387void 388dispatch_source_set_registration_handler_f(dispatch_source_t ds, 389 dispatch_function_t handler) 390{ 391 _dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler, 392 _dispatch_source_set_registration_handler_f); 393} 394 395#pragma mark - 396#pragma mark dispatch_source_invoke 397 398static void 399_dispatch_source_registration_callout(dispatch_source_t ds) 400{ 401 dispatch_source_refs_t dr = ds->ds_refs; 402 403 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) { 404 // no registration callout if source is canceled rdar://problem/8955246 405#ifdef __BLOCKS__ 406 if (ds->ds_registration_is_block) { 407 Block_release(dr->ds_registration_handler); 408 } 409 } else if (ds->ds_registration_is_block) { 410 dispatch_block_t b = dr->ds_registration_handler; 411 _dispatch_client_callout_block(b); 412 Block_release(dr->ds_registration_handler); 413#endif 414 } else { 415 dispatch_function_t f = dr->ds_registration_handler; 416 _dispatch_client_callout(ds->do_ctxt, f); 417 } 418 ds->ds_registration_is_block = false; 419 dr->ds_registration_handler = NULL; 420} 421 422static void 423_dispatch_source_cancel_callout(dispatch_source_t ds) 424{ 425 dispatch_source_refs_t dr = ds->ds_refs; 426 427 ds->ds_pending_data_mask = 0; 428 ds->ds_pending_data = 0; 429 ds->ds_data = 0; 430 431#ifdef __BLOCKS__ 432 if (ds->ds_handler_is_block) { 433 Block_release(dr->ds_handler_ctxt); 434 ds->ds_handler_is_block = false; 435 dr->ds_handler_func = NULL; 436 dr->ds_handler_ctxt = NULL; 437 } 438 if (ds->ds_registration_is_block) { 439 
Block_release(dr->ds_registration_handler); 440 ds->ds_registration_is_block = false; 441 dr->ds_registration_handler = NULL; 442 } 443#endif 444 445 if (!dr->ds_cancel_handler) { 446 return; 447 } 448 if (ds->ds_cancel_is_block) { 449#ifdef __BLOCKS__ 450 dispatch_block_t b = dr->ds_cancel_handler; 451 if (ds->ds_atomic_flags & DSF_CANCELED) { 452 _dispatch_client_callout_block(b); 453 } 454 Block_release(dr->ds_cancel_handler); 455 ds->ds_cancel_is_block = false; 456#endif 457 } else { 458 dispatch_function_t f = dr->ds_cancel_handler; 459 if (ds->ds_atomic_flags & DSF_CANCELED) { 460 _dispatch_client_callout(ds->do_ctxt, f); 461 } 462 } 463 dr->ds_cancel_handler = NULL; 464} 465 466static void 467_dispatch_source_latch_and_call(dispatch_source_t ds) 468{ 469 unsigned long prev; 470 471 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) { 472 return; 473 } 474 dispatch_source_refs_t dr = ds->ds_refs; 475 prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed); 476 if (ds->ds_is_level) { 477 ds->ds_data = ~prev; 478 } else if (ds->ds_is_timer && ds_timer(dr).target && prev) { 479 ds->ds_data = _dispatch_source_timer_data(dr, prev); 480 } else { 481 ds->ds_data = prev; 482 } 483 if (dispatch_assume(prev) && dr->ds_handler_func) { 484 _dispatch_client_callout(dr->ds_handler_ctxt, dr->ds_handler_func); 485 } 486} 487 488static void 489_dispatch_source_kevent_unregister(dispatch_source_t ds) 490{ 491 _dispatch_object_debug(ds, "%s", __func__); 492 dispatch_kevent_t dk = ds->ds_dkev; 493 ds->ds_dkev = NULL; 494 switch (dk->dk_kevent.filter) { 495 case DISPATCH_EVFILT_TIMER: 496 _dispatch_timers_unregister(ds, dk); 497 break; 498 default: 499 TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list); 500 _dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask); 501 break; 502 } 503 504 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed); 505 ds->ds_needs_rearm = false; // re-arm is pointless and bad now 506 
_dispatch_release(ds); // the retain is done at creation time 507} 508 509static void 510_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags) 511{ 512 switch (ds->ds_dkev->dk_kevent.filter) { 513 case DISPATCH_EVFILT_TIMER: 514 return _dispatch_timers_update(ds); 515 case EVFILT_MACHPORT: 516 if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) { 517 new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH 518 } 519 break; 520 } 521 if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) { 522 _dispatch_source_kevent_unregister(ds); 523 } 524} 525 526static void 527_dispatch_source_kevent_register(dispatch_source_t ds) 528{ 529 dispatch_assert_zero(ds->ds_is_installed); 530 switch (ds->ds_dkev->dk_kevent.filter) { 531 case DISPATCH_EVFILT_TIMER: 532 return _dispatch_timers_update(ds); 533 } 534 uint32_t flags; 535 bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags); 536 TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list); 537 if (do_resume || ds->ds_needs_rearm) { 538 _dispatch_source_kevent_resume(ds, flags); 539 } 540 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed); 541 _dispatch_object_debug(ds, "%s", __func__); 542} 543 544DISPATCH_ALWAYS_INLINE 545static inline dispatch_queue_t 546_dispatch_source_invoke2(dispatch_object_t dou, 547 _dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED) 548{ 549 dispatch_source_t ds = dou._ds; 550 if (slowpath(_dispatch_queue_drain(ds))) { 551 DISPATCH_CLIENT_CRASH("Sync onto source"); 552 } 553 554 // This function performs all source actions. Each action is responsible 555 // for verifying that it takes place on the appropriate queue. If the 556 // current queue is not the correct queue for this action, the correct queue 557 // will be returned and the invoke will be re-driven on that queue. 558 559 // The order of tests here in invoke and in probe should be consistent. 
560 561 dispatch_queue_t dq = _dispatch_queue_get_current(); 562 dispatch_source_refs_t dr = ds->ds_refs; 563 564 if (!ds->ds_is_installed) { 565 // The source needs to be installed on the manager queue. 566 if (dq != &_dispatch_mgr_q) { 567 return &_dispatch_mgr_q; 568 } 569 _dispatch_source_kevent_register(ds); 570 ds->ds_is_installed = true; 571 if (dr->ds_registration_handler) { 572 return ds->do_targetq; 573 } 574 if (slowpath(ds->do_xref_cnt == -1)) { 575 return &_dispatch_mgr_q; // rdar://problem/9558246 576 } 577 } else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) { 578 // Source suspended by an item drained from the source queue. 579 return NULL; 580 } else if (dr->ds_registration_handler) { 581 // The source has been registered and the registration handler needs 582 // to be delivered on the target queue. 583 if (dq != ds->do_targetq) { 584 return ds->do_targetq; 585 } 586 // clears ds_registration_handler 587 _dispatch_source_registration_callout(ds); 588 if (slowpath(ds->do_xref_cnt == -1)) { 589 return &_dispatch_mgr_q; // rdar://problem/9558246 590 } 591 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){ 592 // The source has been cancelled and needs to be uninstalled from the 593 // manager queue. After uninstallation, the cancellation handler needs 594 // to be delivered to the target queue. 595 if (ds->ds_dkev) { 596 if (dq != &_dispatch_mgr_q) { 597 return &_dispatch_mgr_q; 598 } 599 _dispatch_source_kevent_unregister(ds); 600 } 601 if (dr->ds_cancel_handler || ds->ds_handler_is_block || 602 ds->ds_registration_is_block) { 603 if (dq != ds->do_targetq) { 604 return ds->do_targetq; 605 } 606 } 607 _dispatch_source_cancel_callout(ds); 608 } else if (ds->ds_pending_data) { 609 // The source has pending data to deliver via the event handler callback 610 // on the target queue. Some sources need to be rearmed on the manager 611 // queue after event delivery. 
612 if (dq != ds->do_targetq) { 613 return ds->do_targetq; 614 } 615 _dispatch_source_latch_and_call(ds); 616 if (ds->ds_needs_rearm) { 617 return &_dispatch_mgr_q; 618 } 619 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) { 620 // The source needs to be rearmed on the manager queue. 621 if (dq != &_dispatch_mgr_q) { 622 return &_dispatch_mgr_q; 623 } 624 _dispatch_source_kevent_resume(ds, 0); 625 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed); 626 } 627 628 return NULL; 629} 630 631DISPATCH_NOINLINE 632void 633_dispatch_source_invoke(dispatch_source_t ds) 634{ 635 _dispatch_queue_class_invoke(ds, _dispatch_source_invoke2); 636} 637 638unsigned long 639_dispatch_source_probe(dispatch_source_t ds) 640{ 641 // This function determines whether the source needs to be invoked. 642 // The order of tests here in probe and in invoke should be consistent. 643 644 dispatch_source_refs_t dr = ds->ds_refs; 645 if (!ds->ds_is_installed) { 646 // The source needs to be installed on the manager queue. 647 return true; 648 } else if (dr->ds_registration_handler) { 649 // The registration handler needs to be delivered to the target queue. 650 return true; 651 } else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){ 652 // The source needs to be uninstalled from the manager queue, or the 653 // cancellation handler needs to be delivered to the target queue. 654 // Note: cancellation assumes installation. 655 if (ds->ds_dkev || dr->ds_cancel_handler 656#ifdef __BLOCKS__ 657 || ds->ds_handler_is_block || ds->ds_registration_is_block 658#endif 659 ) { 660 return true; 661 } 662 } else if (ds->ds_pending_data) { 663 // The source has pending data to deliver to the target queue. 664 return true; 665 } else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) { 666 // The source needs to be rearmed on the manager queue. 
667 return true; 668 } 669 return (ds->dq_items_tail != NULL); 670} 671 672static void 673_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke) 674{ 675 if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) { 676 return; 677 } 678 if (ds->ds_is_level) { 679 // ke->data is signed and "negative available data" makes no sense 680 // zero bytes happens when EV_EOF is set 681 // 10A268 does not fail this assert with EVFILT_READ and a 10 GB file 682 dispatch_assert(ke->data >= 0l); 683 dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data, 684 relaxed); 685 } else if (ds->ds_is_adder) { 686 (void)dispatch_atomic_add2o(ds, ds_pending_data, 687 (unsigned long)ke->data, relaxed); 688 } else if (ke->fflags & ds->ds_pending_data_mask) { 689 (void)dispatch_atomic_or2o(ds, ds_pending_data, 690 ke->fflags & ds->ds_pending_data_mask, relaxed); 691 } 692 // EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery 693 if (ds->ds_needs_rearm) { 694 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed); 695 } 696 697 _dispatch_wakeup(ds); 698} 699 700#pragma mark - 701#pragma mark dispatch_kevent_t 702 703#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD 704static void _dispatch_kevent_guard(dispatch_kevent_t dk); 705static void _dispatch_kevent_unguard(dispatch_kevent_t dk); 706#else 707static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; } 708static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; } 709#endif 710 711static struct dispatch_kevent_s _dispatch_kevent_data_or = { 712 .dk_kevent = { 713 .filter = DISPATCH_EVFILT_CUSTOM_OR, 714 .flags = EV_CLEAR, 715 }, 716 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources), 717}; 718static struct dispatch_kevent_s _dispatch_kevent_data_add = { 719 .dk_kevent = { 720 .filter = DISPATCH_EVFILT_CUSTOM_ADD, 721 }, 722 .dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources), 
723}; 724 725#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1)) 726 727DISPATCH_CACHELINE_ALIGN 728static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE]; 729 730static void 731_dispatch_kevent_init() 732{ 733 unsigned int i; 734 for (i = 0; i < DSL_HASH_SIZE; i++) { 735 TAILQ_INIT(&_dispatch_sources[i]); 736 } 737 738 TAILQ_INSERT_TAIL(&_dispatch_sources[0], 739 &_dispatch_kevent_data_or, dk_list); 740 TAILQ_INSERT_TAIL(&_dispatch_sources[0], 741 &_dispatch_kevent_data_add, dk_list); 742 _dispatch_kevent_data_or.dk_kevent.udata = 743 (uintptr_t)&_dispatch_kevent_data_or; 744 _dispatch_kevent_data_add.dk_kevent.udata = 745 (uintptr_t)&_dispatch_kevent_data_add; 746} 747 748static inline uintptr_t 749_dispatch_kevent_hash(uint64_t ident, short filter) 750{ 751 uint64_t value; 752#if HAVE_MACH 753 value = (filter == EVFILT_MACHPORT || 754 filter == DISPATCH_EVFILT_MACH_NOTIFICATION ? 755 MACH_PORT_INDEX(ident) : ident); 756#else 757 value = ident; 758#endif 759 return DSL_HASH((uintptr_t)value); 760} 761 762static dispatch_kevent_t 763_dispatch_kevent_find(uint64_t ident, short filter) 764{ 765 uintptr_t hash = _dispatch_kevent_hash(ident, filter); 766 dispatch_kevent_t dki; 767 768 TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) { 769 if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) { 770 break; 771 } 772 } 773 return dki; 774} 775 776static void 777_dispatch_kevent_insert(dispatch_kevent_t dk) 778{ 779 _dispatch_kevent_guard(dk); 780 uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident, 781 dk->dk_kevent.filter); 782 TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list); 783} 784 785// Find existing kevents, and merge any new flags if necessary 786static bool 787_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp) 788{ 789 dispatch_kevent_t dk, ds_dkev = *dkp; 790 uint32_t new_flags; 791 bool do_resume = false; 792 793 dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident, 794 ds_dkev->dk_kevent.filter); 
795 if (dk) { 796 // If an existing dispatch kevent is found, check to see if new flags 797 // need to be added to the existing kevent 798 new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags; 799 dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags; 800 free(ds_dkev); 801 *dkp = dk; 802 do_resume = new_flags; 803 } else { 804 dk = ds_dkev; 805 _dispatch_kevent_insert(dk); 806 new_flags = dk->dk_kevent.fflags; 807 do_resume = true; 808 } 809 // Re-register the kevent with the kernel if new flags were added 810 // by the dispatch kevent 811 if (do_resume) { 812 dk->dk_kevent.flags |= EV_ADD; 813 } 814 *flgp = new_flags; 815 return do_resume; 816} 817 818static bool 819_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags, 820 uint32_t del_flags) 821{ 822 long r; 823 switch (dk->dk_kevent.filter) { 824 case DISPATCH_EVFILT_TIMER: 825 case DISPATCH_EVFILT_CUSTOM_ADD: 826 case DISPATCH_EVFILT_CUSTOM_OR: 827 // these types not registered with kevent 828 return 0; 829#if HAVE_MACH 830 case EVFILT_MACHPORT: 831 return _dispatch_kevent_machport_resume(dk, new_flags, del_flags); 832 case DISPATCH_EVFILT_MACH_NOTIFICATION: 833 return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags); 834#endif 835 case EVFILT_PROC: 836 if (dk->dk_kevent.flags & EV_ONESHOT) { 837 return 0; 838 } 839 // fall through 840 default: 841 r = _dispatch_kq_update(&dk->dk_kevent); 842 if (dk->dk_kevent.flags & EV_DISPATCH) { 843 dk->dk_kevent.flags &= ~EV_ADD; 844 } 845 return r; 846 } 847} 848 849static void 850_dispatch_kevent_dispose(dispatch_kevent_t dk) 851{ 852 uintptr_t hash; 853 854 switch (dk->dk_kevent.filter) { 855 case DISPATCH_EVFILT_TIMER: 856 case DISPATCH_EVFILT_CUSTOM_ADD: 857 case DISPATCH_EVFILT_CUSTOM_OR: 858 // these sources live on statically allocated lists 859 return; 860#if HAVE_MACH 861 case EVFILT_MACHPORT: 862 _dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags); 863 break; 864 case DISPATCH_EVFILT_MACH_NOTIFICATION: 865 
_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags); 866 break; 867#endif 868 case EVFILT_PROC: 869 if (dk->dk_kevent.flags & EV_ONESHOT) { 870 break; // implicitly deleted 871 } 872 // fall through 873 default: 874 if (~dk->dk_kevent.flags & EV_DELETE) { 875 dk->dk_kevent.flags |= EV_DELETE; 876 dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE); 877 _dispatch_kq_update(&dk->dk_kevent); 878 } 879 break; 880 } 881 882 hash = _dispatch_kevent_hash(dk->dk_kevent.ident, 883 dk->dk_kevent.filter); 884 TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list); 885 _dispatch_kevent_unguard(dk); 886 free(dk); 887} 888 889static void 890_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg) 891{ 892 dispatch_source_refs_t dri; 893 uint32_t del_flags, fflags = 0; 894 895 if (TAILQ_EMPTY(&dk->dk_sources)) { 896 _dispatch_kevent_dispose(dk); 897 } else { 898 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) { 899 dispatch_source_t dsi = _dispatch_source_from_refs(dri); 900 uint32_t mask = (uint32_t)dsi->ds_pending_data_mask; 901 fflags |= mask; 902 } 903 del_flags = flg & ~fflags; 904 if (del_flags) { 905 dk->dk_kevent.flags |= EV_ADD; 906 dk->dk_kevent.fflags = fflags; 907 _dispatch_kevent_resume(dk, 0, del_flags); 908 } 909 } 910} 911 912DISPATCH_NOINLINE 913static void 914_dispatch_kevent_proc_exit(struct kevent64_s *ke) 915{ 916 // EVFILT_PROC may fail with ESRCH when the process exists but is a zombie 917 // <rdar://problem/5067725>. As a workaround, we simulate an exit event for 918 // any EVFILT_PROC with an invalid pid <rdar://problem/6626350>. 
919 struct kevent64_s fake; 920 fake = *ke; 921 fake.flags &= ~EV_ERROR; 922 fake.fflags = NOTE_EXIT; 923 fake.data = 0; 924 _dispatch_kevent_drain(&fake); 925} 926 927DISPATCH_NOINLINE 928static void 929_dispatch_kevent_error(struct kevent64_s *ke) 930{ 931 _dispatch_kevent_debug(ke, __func__); 932 if (ke->data) { 933 // log the unexpected error 934 _dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter), 935 ke->flags & EV_DELETE ? "delete" : 936 ke->flags & EV_ADD ? "add" : 937 ke->flags & EV_ENABLE ? "enable" : "monitor", 938 (int)ke->data); 939 } 940} 941 942static void 943_dispatch_kevent_drain(struct kevent64_s *ke) 944{ 945#if DISPATCH_DEBUG 946 static dispatch_once_t pred; 947 dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger); 948#endif 949 if (ke->filter == EVFILT_USER) { 950 return; 951 } 952 if (slowpath(ke->flags & EV_ERROR)) { 953 if (ke->filter == EVFILT_PROC) { 954 if (ke->flags & EV_DELETE) { 955 // Process exited while monitored 956 return; 957 } else if (ke->data == ESRCH) { 958 return _dispatch_kevent_proc_exit(ke); 959 } 960#if DISPATCH_USE_VM_PRESSURE 961 } else if (ke->filter == EVFILT_VM && ke->data == ENOTSUP) { 962 // Memory pressure kevent is not supported on all platforms 963 // <rdar://problem/8636227> 964 return; 965#endif 966#if DISPATCH_USE_MEMORYSTATUS 967 } else if (ke->filter == EVFILT_MEMORYSTATUS && 968 (ke->data == EINVAL || ke->data == ENOTSUP)) { 969 // Memory status kevent is not supported on all platforms 970 return; 971#endif 972 } 973 return _dispatch_kevent_error(ke); 974 } 975 _dispatch_kevent_debug(ke, __func__); 976 if (ke->filter == EVFILT_TIMER) { 977 return _dispatch_timers_kevent(ke); 978 } 979#if HAVE_MACH 980 if (ke->filter == EVFILT_MACHPORT) { 981 return _dispatch_kevent_mach_portset(ke); 982 } 983#endif 984 return _dispatch_kevent_merge(ke); 985} 986 987DISPATCH_NOINLINE 988static void 989_dispatch_kevent_merge(struct kevent64_s *ke) 990{ 991 dispatch_kevent_t dk; 992 dispatch_source_refs_t dri; 
993 994 dk = (void*)ke->udata; 995 dispatch_assert(dk); 996 997 if (ke->flags & EV_ONESHOT) { 998 dk->dk_kevent.flags |= EV_ONESHOT; 999 } 1000 TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) { 1001 _dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke); 1002 } 1003} 1004 1005#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD 1006static void 1007_dispatch_kevent_guard(dispatch_kevent_t dk) 1008{ 1009 guardid_t guard; 1010 const unsigned int guard_flags = GUARD_CLOSE; 1011 int r, fd_flags = 0; 1012 switch (dk->dk_kevent.filter) { 1013 case EVFILT_READ: 1014 case EVFILT_WRITE: 1015 case EVFILT_VNODE: 1016 guard = &dk->dk_kevent; 1017 r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0, 1018 &guard, guard_flags, &fd_flags); 1019 if (slowpath(r == -1)) { 1020 int err = errno; 1021 if (err != EPERM) { 1022 (void)dispatch_assume_zero(err); 1023 } 1024 return; 1025 } 1026 dk->dk_kevent.ext[0] = guard_flags; 1027 dk->dk_kevent.ext[1] = fd_flags; 1028 break; 1029 } 1030} 1031 1032static void 1033_dispatch_kevent_unguard(dispatch_kevent_t dk) 1034{ 1035 guardid_t guard; 1036 unsigned int guard_flags; 1037 int r, fd_flags; 1038 switch (dk->dk_kevent.filter) { 1039 case EVFILT_READ: 1040 case EVFILT_WRITE: 1041 case EVFILT_VNODE: 1042 guard_flags = (unsigned int)dk->dk_kevent.ext[0]; 1043 if (!guard_flags) { 1044 return; 1045 } 1046 guard = &dk->dk_kevent; 1047 fd_flags = (int)dk->dk_kevent.ext[1]; 1048 r = change_fdguard_np((int)dk->dk_kevent.ident, &guard, 1049 guard_flags, NULL, 0, &fd_flags); 1050 if (slowpath(r == -1)) { 1051 (void)dispatch_assume_zero(errno); 1052 return; 1053 } 1054 dk->dk_kevent.ext[0] = 0; 1055 break; 1056 } 1057} 1058#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD 1059 1060#pragma mark - 1061#pragma mark dispatch_source_timer 1062 1063#if DISPATCH_USE_DTRACE && DISPATCH_USE_DTRACE_INTROSPECTION 1064static dispatch_source_refs_t 1065 _dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT]; 1066#define _dispatch_trace_next_timer_set(x, q) \ 1067 
_dispatch_trace_next_timer[(q)] = (x) 1068#define _dispatch_trace_next_timer_program(d, q) \ 1069 _dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d)) 1070#define _dispatch_trace_next_timer_wake(q) \ 1071 _dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)]) 1072#else 1073#define _dispatch_trace_next_timer_set(x, q) 1074#define _dispatch_trace_next_timer_program(d, q) 1075#define _dispatch_trace_next_timer_wake(q) 1076#endif 1077 1078#define _dispatch_source_timer_telemetry_enabled() false 1079 1080DISPATCH_NOINLINE 1081static void 1082_dispatch_source_timer_telemetry_slow(dispatch_source_t ds, 1083 uintptr_t ident, struct dispatch_timer_source_s *values) 1084{ 1085 if (_dispatch_trace_timer_configure_enabled()) { 1086 _dispatch_trace_timer_configure(ds, ident, values); 1087 } 1088} 1089 1090DISPATCH_ALWAYS_INLINE 1091static inline void 1092_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident, 1093 struct dispatch_timer_source_s *values) 1094{ 1095 if (_dispatch_trace_timer_configure_enabled() || 1096 _dispatch_source_timer_telemetry_enabled()) { 1097 _dispatch_source_timer_telemetry_slow(ds, ident, values); 1098 asm(""); // prevent tailcall 1099 } 1100} 1101 1102// approx 1 year (60s * 60m * 24h * 365d) 1103#define FOREVER_NSEC 31536000000000000ull 1104 1105DISPATCH_ALWAYS_INLINE 1106static inline uint64_t 1107_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx) 1108{ 1109 unsigned int tk = DISPATCH_TIMER_KIND(tidx); 1110 if (nows && fastpath(nows[tk])) { 1111 return nows[tk]; 1112 } 1113 uint64_t now; 1114 switch (tk) { 1115 case DISPATCH_TIMER_KIND_MACH: 1116 now = _dispatch_absolute_time(); 1117 break; 1118 case DISPATCH_TIMER_KIND_WALL: 1119 now = _dispatch_get_nanoseconds(); 1120 break; 1121 } 1122 if (nows) { 1123 nows[tk] = now; 1124 } 1125 return now; 1126} 1127 1128static inline unsigned long 1129_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev) 1130{ 1131 // calculate the number of 
intervals since last fire 1132 unsigned long data, missed; 1133 uint64_t now; 1134 now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr)); 1135 missed = (unsigned long)((now - ds_timer(dr).last_fire) / 1136 ds_timer(dr).interval); 1137 // correct for missed intervals already delivered last time 1138 data = prev - ds_timer(dr).missed + missed; 1139 ds_timer(dr).missed = missed; 1140 return data; 1141} 1142 1143struct dispatch_set_timer_params { 1144 dispatch_source_t ds; 1145 uintptr_t ident; 1146 struct dispatch_timer_source_s values; 1147}; 1148 1149static void 1150_dispatch_source_set_timer3(void *context) 1151{ 1152 // Called on the _dispatch_mgr_q 1153 struct dispatch_set_timer_params *params = context; 1154 dispatch_source_t ds = params->ds; 1155 ds->ds_ident_hack = params->ident; 1156 ds_timer(ds->ds_refs) = params->values; 1157 // Clear any pending data that might have accumulated on 1158 // older timer params <rdar://problem/8574886> 1159 ds->ds_pending_data = 0; 1160 // Re-arm in case we got disarmed because of pending set_timer suspension 1161 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release); 1162 dispatch_resume(ds); 1163 // Must happen after resume to avoid getting disarmed due to suspension 1164 _dispatch_timers_update(ds); 1165 dispatch_release(ds); 1166 if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) { 1167 _dispatch_mach_host_calendar_change_register(); 1168 } 1169 free(params); 1170} 1171 1172static void 1173_dispatch_source_set_timer2(void *context) 1174{ 1175 // Called on the source queue 1176 struct dispatch_set_timer_params *params = context; 1177 dispatch_suspend(params->ds); 1178 dispatch_barrier_async_f(&_dispatch_mgr_q, params, 1179 _dispatch_source_set_timer3); 1180} 1181 1182DISPATCH_NOINLINE 1183static struct dispatch_set_timer_params * 1184_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start, 1185 uint64_t interval, uint64_t leeway) 1186{ 1187 struct dispatch_set_timer_params 
*params; 1188 params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params)); 1189 params->ds = ds; 1190 params->values.flags = ds_timer(ds->ds_refs).flags; 1191 1192 if (interval == 0) { 1193 // we use zero internally to mean disabled 1194 interval = 1; 1195 } else if ((int64_t)interval < 0) { 1196 // 6866347 - make sure nanoseconds won't overflow 1197 interval = INT64_MAX; 1198 } 1199 if ((int64_t)leeway < 0) { 1200 leeway = INT64_MAX; 1201 } 1202 if (start == DISPATCH_TIME_NOW) { 1203 start = _dispatch_absolute_time(); 1204 } else if (start == DISPATCH_TIME_FOREVER) { 1205 start = INT64_MAX; 1206 } 1207 1208 if ((int64_t)start < 0) { 1209 // wall clock 1210 start = (dispatch_time_t)-((int64_t)start); 1211 params->values.flags |= DISPATCH_TIMER_WALL_CLOCK; 1212 } else { 1213 // absolute clock 1214 interval = _dispatch_time_nano2mach(interval); 1215 if (interval < 1) { 1216 // rdar://problem/7287561 interval must be at least one in 1217 // in order to avoid later division by zero when calculating 1218 // the missed interval count. (NOTE: the wall clock's 1219 // interval is already "fixed" to be 1 or more) 1220 interval = 1; 1221 } 1222 leeway = _dispatch_time_nano2mach(leeway); 1223 params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK; 1224 } 1225 params->ident = DISPATCH_TIMER_IDENT(params->values.flags); 1226 params->values.target = start; 1227 params->values.deadline = (start < UINT64_MAX - leeway) ? 1228 start + leeway : UINT64_MAX; 1229 params->values.interval = interval; 1230 params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ? 
1231 leeway : interval / 2; 1232 return params; 1233} 1234 1235DISPATCH_ALWAYS_INLINE 1236static inline void 1237_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start, 1238 uint64_t interval, uint64_t leeway, bool source_sync) 1239{ 1240 if (slowpath(!ds->ds_is_timer) || 1241 slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) { 1242 DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source"); 1243 } 1244 1245 struct dispatch_set_timer_params *params; 1246 params = _dispatch_source_timer_params(ds, start, interval, leeway); 1247 1248 _dispatch_source_timer_telemetry(ds, params->ident, ¶ms->values); 1249 // Suspend the source so that it doesn't fire with pending changes 1250 // The use of suspend/resume requires the external retain/release 1251 dispatch_retain(ds); 1252 if (source_sync) { 1253 return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params, 1254 _dispatch_source_set_timer2); 1255 } else { 1256 return _dispatch_source_set_timer2(params); 1257 } 1258} 1259 1260void 1261dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start, 1262 uint64_t interval, uint64_t leeway) 1263{ 1264 _dispatch_source_set_timer(ds, start, interval, leeway, true); 1265} 1266 1267void 1268_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds, 1269 dispatch_time_t start, uint64_t interval, uint64_t leeway) 1270{ 1271 // Don't serialize through the source queue for CF timers <rdar://13833190> 1272 _dispatch_source_set_timer(ds, start, interval, leeway, false); 1273} 1274 1275void 1276_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval) 1277{ 1278 dispatch_source_refs_t dr = ds->ds_refs; 1279 #define NSEC_PER_FRAME (NSEC_PER_SEC/60) 1280 const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION; 1281 if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME : 1282 FOREVER_NSEC/NSEC_PER_MSEC))) { 1283 interval *= animation ? 
NSEC_PER_FRAME : NSEC_PER_MSEC; 1284 } else { 1285 interval = FOREVER_NSEC; 1286 } 1287 interval = _dispatch_time_nano2mach(interval); 1288 uint64_t target = _dispatch_absolute_time() + interval; 1289 target = (target / interval) * interval; 1290 const uint64_t leeway = animation ? 1291 _dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2; 1292 ds_timer(dr).target = target; 1293 ds_timer(dr).deadline = target + leeway; 1294 ds_timer(dr).interval = interval; 1295 ds_timer(dr).leeway = leeway; 1296 _dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr)); 1297} 1298 1299#pragma mark - 1300#pragma mark dispatch_timers 1301 1302#define DISPATCH_TIMER_STRUCT(refs) \ 1303 uint64_t target, deadline; \ 1304 TAILQ_HEAD(, refs) dt_sources 1305 1306typedef struct dispatch_timer_s { 1307 DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s); 1308} *dispatch_timer_t; 1309 1310#define DISPATCH_TIMER_INITIALIZER(tidx) \ 1311 [tidx] = { \ 1312 .target = UINT64_MAX, \ 1313 .deadline = UINT64_MAX, \ 1314 .dt_sources = TAILQ_HEAD_INITIALIZER( \ 1315 _dispatch_timer[tidx].dt_sources), \ 1316 } 1317#define DISPATCH_TIMER_INIT(kind, qos) \ 1318 DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \ 1319 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos)) 1320 1321struct dispatch_timer_s _dispatch_timer[] = { 1322 DISPATCH_TIMER_INIT(WALL, NORMAL), 1323 DISPATCH_TIMER_INIT(WALL, CRITICAL), 1324 DISPATCH_TIMER_INIT(WALL, BACKGROUND), 1325 DISPATCH_TIMER_INIT(MACH, NORMAL), 1326 DISPATCH_TIMER_INIT(MACH, CRITICAL), 1327 DISPATCH_TIMER_INIT(MACH, BACKGROUND), 1328}; 1329#define DISPATCH_TIMER_COUNT \ 1330 ((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0]))) 1331 1332#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \ 1333 (uintptr_t)&_dispatch_kevent_timer[tidx] 1334#ifdef __LP64__ 1335#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \ 1336 .udata = DISPATCH_KEVENT_TIMER_UDATA(tidx) 1337#else // __LP64__ 1338// dynamic initialization in _dispatch_timers_init() 1339#define 
DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \ 1340 .udata = 0 1341#endif // __LP64__ 1342#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \ 1343 [tidx] = { \ 1344 .dk_kevent = { \ 1345 .ident = tidx, \ 1346 .filter = DISPATCH_EVFILT_TIMER, \ 1347 DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \ 1348 }, \ 1349 .dk_sources = TAILQ_HEAD_INITIALIZER( \ 1350 _dispatch_kevent_timer[tidx].dk_sources), \ 1351 } 1352#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \ 1353 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \ 1354 DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos)) 1355 1356struct dispatch_kevent_s _dispatch_kevent_timer[] = { 1357 DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL), 1358 DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL), 1359 DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND), 1360 DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL), 1361 DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL), 1362 DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND), 1363 DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM), 1364}; 1365#define DISPATCH_KEVENT_TIMER_COUNT \ 1366 ((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0]))) 1367 1368#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8) 1369#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \ 1370 [qos] = { \ 1371 .ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \ 1372 .filter = EVFILT_TIMER, \ 1373 .flags = EV_ONESHOT, \ 1374 .fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \ 1375 } 1376#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \ 1377 DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note) 1378 1379struct kevent64_s _dispatch_kevent_timeout[] = { 1380 DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0), 1381 DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL), 1382 DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND), 1383}; 1384 1385#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \ 1386 [DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC 1387 1388static const uint64_t 
_dispatch_kevent_coalescing_window[] = { 1389 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75), 1390 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1), 1391 DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100), 1392}; 1393 1394#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \ 1395 typeof(dr) dri = NULL; typeof(dt) dti; \ 1396 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \ 1397 TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \ 1398 if (ds_timer(dr).target < ds_timer(dri).target) { \ 1399 break; \ 1400 } \ 1401 } \ 1402 TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \ 1403 if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \ 1404 break; \ 1405 } \ 1406 } \ 1407 if (dti) { \ 1408 TAILQ_INSERT_BEFORE(dti, dt, dt_list); \ 1409 } else { \ 1410 TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \ 1411 } \ 1412 } \ 1413 if (dri) { \ 1414 TAILQ_INSERT_BEFORE(dri, dr, dr_list); \ 1415 } else { \ 1416 TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \ 1417 } \ 1418 }) 1419 1420#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \ 1421 ({ \ 1422 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \ 1423 TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \ 1424 } \ 1425 TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \ 1426 dr_list); }) 1427 1428#define _dispatch_timers_check(dra, dta) ({ \ 1429 unsigned int qosm = _dispatch_timers_qos_mask; \ 1430 bool update = false; \ 1431 unsigned int tidx; \ 1432 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \ 1433 if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \ 1434 continue; \ 1435 } \ 1436 dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \ 1437 TAILQ_FIRST(&dra[tidx].dk_sources); \ 1438 dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \ 1439 TAILQ_FIRST(&dta[tidx].dt_sources); \ 1440 uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \ 1441 uint64_t deadline = dr ? 
ds_timer(dt).deadline : UINT64_MAX; \ 1442 if (target != dta[tidx].target) { \ 1443 dta[tidx].target = target; \ 1444 update = true; \ 1445 } \ 1446 if (deadline != dta[tidx].deadline) { \ 1447 dta[tidx].deadline = deadline; \ 1448 update = true; \ 1449 } \ 1450 } \ 1451 update; }) 1452 1453static bool _dispatch_timers_reconfigure, _dispatch_timer_expired; 1454static unsigned int _dispatch_timers_qos_mask; 1455static bool _dispatch_timers_force_max_leeway; 1456 1457static void 1458_dispatch_timers_init(void) 1459{ 1460#ifndef __LP64__ 1461 unsigned int tidx; 1462 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1463 _dispatch_kevent_timer[tidx].dk_kevent.udata = \ 1464 DISPATCH_KEVENT_TIMER_UDATA(tidx); 1465 } 1466#endif // __LP64__ 1467 _dispatch_timers_force_max_leeway = 1468 getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"); 1469} 1470 1471static inline void 1472_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk) 1473{ 1474 dispatch_source_refs_t dr = ds->ds_refs; 1475 unsigned int tidx = (unsigned int)dk->dk_kevent.ident; 1476 1477 if (slowpath(ds_timer_aggregate(ds))) { 1478 _dispatch_timer_aggregates_unregister(ds, tidx); 1479 } 1480 _dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list, 1481 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list); 1482 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { 1483 _dispatch_timers_reconfigure = true; 1484 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx); 1485 } 1486} 1487 1488// Updates the ordered list of timers based on next fire date for changes to ds. 1489// Should only be called from the context of _dispatch_mgr_q. 
1490static void 1491_dispatch_timers_update(dispatch_source_t ds) 1492{ 1493 dispatch_kevent_t dk = ds->ds_dkev; 1494 dispatch_source_refs_t dr = ds->ds_refs; 1495 unsigned int tidx; 1496 1497 DISPATCH_ASSERT_ON_MANAGER_QUEUE(); 1498 1499 // Do not reschedule timers unregistered with _dispatch_kevent_unregister() 1500 if (slowpath(!dk)) { 1501 return; 1502 } 1503 // Move timers that are disabled, suspended or have missed intervals to the 1504 // disarmed list, rearm after resume resp. source invoke will reenable them 1505 if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) || 1506 ds->ds_pending_data) { 1507 tidx = DISPATCH_TIMER_INDEX_DISARM; 1508 (void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed); 1509 } else { 1510 tidx = _dispatch_source_timer_idx(dr); 1511 } 1512 if (slowpath(ds_timer_aggregate(ds))) { 1513 _dispatch_timer_aggregates_register(ds); 1514 } 1515 if (slowpath(!ds->ds_is_installed)) { 1516 ds->ds_is_installed = true; 1517 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { 1518 (void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed); 1519 } 1520 free(dk); 1521 _dispatch_object_debug(ds, "%s", __func__); 1522 } else { 1523 _dispatch_timers_unregister(ds, dk); 1524 } 1525 if (tidx != DISPATCH_TIMER_INDEX_DISARM) { 1526 _dispatch_timers_reconfigure = true; 1527 _dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx); 1528 } 1529 if (dk != &_dispatch_kevent_timer[tidx]){ 1530 ds->ds_dkev = &_dispatch_kevent_timer[tidx]; 1531 } 1532 _dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list, 1533 _dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list); 1534 if (slowpath(ds_timer_aggregate(ds))) { 1535 _dispatch_timer_aggregates_update(ds, tidx); 1536 } 1537} 1538 1539static inline void 1540_dispatch_timers_run2(uint64_t nows[], unsigned int tidx) 1541{ 1542 dispatch_source_refs_t dr; 1543 dispatch_source_t ds; 1544 uint64_t now, missed; 1545 1546 now = _dispatch_source_timer_now(nows, tidx); 1547 while ((dr = 
TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) { 1548 ds = _dispatch_source_from_refs(dr); 1549 // We may find timers on the wrong list due to a pending update from 1550 // dispatch_source_set_timer. Force an update of the list in that case. 1551 if (tidx != ds->ds_ident_hack) { 1552 _dispatch_timers_update(ds); 1553 continue; 1554 } 1555 if (!ds_timer(dr).target) { 1556 // No configured timers on the list 1557 break; 1558 } 1559 if (ds_timer(dr).target > now) { 1560 // Done running timers for now. 1561 break; 1562 } 1563 // Remove timers that are suspended or have missed intervals from the 1564 // list, rearm after resume resp. source invoke will reenable them 1565 if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) { 1566 _dispatch_timers_update(ds); 1567 continue; 1568 } 1569 // Calculate number of missed intervals. 1570 missed = (now - ds_timer(dr).target) / ds_timer(dr).interval; 1571 if (++missed > INT_MAX) { 1572 missed = INT_MAX; 1573 } 1574 if (ds_timer(dr).interval < INT64_MAX) { 1575 ds_timer(dr).target += missed * ds_timer(dr).interval; 1576 ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway; 1577 } else { 1578 ds_timer(dr).target = UINT64_MAX; 1579 ds_timer(dr).deadline = UINT64_MAX; 1580 } 1581 _dispatch_timers_update(ds); 1582 ds_timer(dr).last_fire = now; 1583 1584 unsigned long data; 1585 data = dispatch_atomic_add2o(ds, ds_pending_data, 1586 (unsigned long)missed, relaxed); 1587 _dispatch_trace_timer_fire(dr, data, (unsigned long)missed); 1588 _dispatch_wakeup(ds); 1589 } 1590} 1591 1592DISPATCH_NOINLINE 1593static void 1594_dispatch_timers_run(uint64_t nows[]) 1595{ 1596 unsigned int tidx; 1597 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1598 if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) { 1599 _dispatch_timers_run2(nows, tidx); 1600 } 1601 } 1602} 1603 1604static inline unsigned int 1605_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[], 1606 uint64_t *delay, uint64_t 
*leeway, int qos) 1607{ 1608 unsigned int tidx, ridx = DISPATCH_TIMER_COUNT; 1609 uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX; 1610 1611 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1612 if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){ 1613 continue; 1614 } 1615 uint64_t target = timer[tidx].target; 1616 if (target == UINT64_MAX) { 1617 continue; 1618 } 1619 uint64_t deadline = timer[tidx].deadline; 1620 if (qos >= 0) { 1621 // Timer pre-coalescing <rdar://problem/13222034> 1622 uint64_t window = _dispatch_kevent_coalescing_window[qos]; 1623 uint64_t latest = deadline > window ? deadline - window : 0; 1624 dispatch_source_refs_t dri; 1625 TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources, 1626 dr_list) { 1627 tmp = ds_timer(dri).target; 1628 if (tmp > latest) break; 1629 target = tmp; 1630 } 1631 } 1632 uint64_t now = _dispatch_source_timer_now(nows, tidx); 1633 if (target <= now) { 1634 delta = 0; 1635 break; 1636 } 1637 tmp = target - now; 1638 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) { 1639 tmp = _dispatch_time_mach2nano(tmp); 1640 } 1641 if (tmp < INT64_MAX && tmp < delta) { 1642 ridx = tidx; 1643 delta = tmp; 1644 } 1645 dispatch_assert(target <= deadline); 1646 tmp = deadline - now; 1647 if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) { 1648 tmp = _dispatch_time_mach2nano(tmp); 1649 } 1650 if (tmp < INT64_MAX && tmp < dldelta) { 1651 dldelta = tmp; 1652 } 1653 } 1654 *delay = delta; 1655 *leeway = delta && delta < UINT64_MAX ? 
dldelta - delta : UINT64_MAX; 1656 return ridx; 1657} 1658 1659static bool 1660_dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke, 1661 unsigned int qos) 1662{ 1663 unsigned int tidx; 1664 bool poll; 1665 uint64_t delay, leeway; 1666 1667 tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway, 1668 (int)qos); 1669 poll = (delay == 0); 1670 if (poll || delay == UINT64_MAX) { 1671 _dispatch_trace_next_timer_set(NULL, qos); 1672 if (!ke->data) { 1673 return poll; 1674 } 1675 ke->data = 0; 1676 ke->flags |= EV_DELETE; 1677 ke->flags &= ~(EV_ADD|EV_ENABLE); 1678 } else { 1679 _dispatch_trace_next_timer_set( 1680 TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos); 1681 _dispatch_trace_next_timer_program(delay, qos); 1682 delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL); 1683 if (slowpath(_dispatch_timers_force_max_leeway)) { 1684 ke->data = (int64_t)(delay + leeway); 1685 ke->ext[1] = 0; 1686 } else { 1687 ke->data = (int64_t)delay; 1688 ke->ext[1] = leeway; 1689 } 1690 ke->flags |= EV_ADD|EV_ENABLE; 1691 ke->flags &= ~EV_DELETE; 1692 } 1693 _dispatch_kq_update(ke); 1694 return poll; 1695} 1696 1697DISPATCH_NOINLINE 1698static bool 1699_dispatch_timers_program(uint64_t nows[]) 1700{ 1701 bool poll = false; 1702 unsigned int qos, qosm = _dispatch_timers_qos_mask; 1703 for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) { 1704 if (!(qosm & 1 << qos)){ 1705 continue; 1706 } 1707 poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos], 1708 qos); 1709 } 1710 return poll; 1711} 1712 1713DISPATCH_NOINLINE 1714static bool 1715_dispatch_timers_configure(void) 1716{ 1717 _dispatch_timer_aggregates_check(); 1718 // Find out if there is a new target/deadline on the timer lists 1719 return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer); 1720} 1721 1722static void 1723_dispatch_timers_calendar_change(void) 1724{ 1725 // calendar change may have gone past the wallclock deadline 1726 
_dispatch_timer_expired = true; 1727 _dispatch_timers_qos_mask = ~0u; 1728} 1729 1730static void 1731_dispatch_timers_kevent(struct kevent64_s *ke) 1732{ 1733 dispatch_assert(ke->data > 0); 1734 dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) == 1735 DISPATCH_KEVENT_TIMEOUT_IDENT_MASK); 1736 unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK; 1737 dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT); 1738 dispatch_assert(_dispatch_kevent_timeout[qos].data); 1739 _dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT 1740 _dispatch_timer_expired = true; 1741 _dispatch_timers_qos_mask |= 1 << qos; 1742 _dispatch_trace_next_timer_wake(qos); 1743} 1744 1745static inline bool 1746_dispatch_mgr_timers(void) 1747{ 1748 uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {}; 1749 bool expired = slowpath(_dispatch_timer_expired); 1750 if (expired) { 1751 _dispatch_timers_run(nows); 1752 } 1753 bool reconfigure = slowpath(_dispatch_timers_reconfigure); 1754 if (reconfigure || expired) { 1755 if (reconfigure) { 1756 reconfigure = _dispatch_timers_configure(); 1757 _dispatch_timers_reconfigure = false; 1758 } 1759 if (reconfigure || expired) { 1760 expired = _dispatch_timer_expired = _dispatch_timers_program(nows); 1761 expired = expired || _dispatch_mgr_q.dq_items_tail; 1762 } 1763 _dispatch_timers_qos_mask = 0; 1764 } 1765 return expired; 1766} 1767 1768#pragma mark - 1769#pragma mark dispatch_timer_aggregate 1770 1771typedef struct { 1772 TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources; 1773} dispatch_timer_aggregate_refs_s; 1774 1775typedef struct dispatch_timer_aggregate_s { 1776 DISPATCH_STRUCT_HEADER(queue); 1777 DISPATCH_QUEUE_HEADER; 1778 TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list; 1779 dispatch_timer_aggregate_refs_s 1780 dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT]; 1781 struct { 1782 DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s); 1783 } dta_timer[DISPATCH_TIMER_COUNT]; 1784 struct 
dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT]; 1785 unsigned int dta_refcount; 1786} dispatch_timer_aggregate_s; 1787 1788typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s; 1789static dispatch_timer_aggregates_s _dispatch_timer_aggregates = 1790 TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates); 1791 1792dispatch_timer_aggregate_t 1793dispatch_timer_aggregate_create(void) 1794{ 1795 unsigned int tidx; 1796 dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue), 1797 sizeof(struct dispatch_timer_aggregate_s)); 1798 _dispatch_queue_init((dispatch_queue_t)dta); 1799 dta->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 1800 true); 1801 dta->dq_width = UINT32_MAX; 1802 //FIXME: aggregates need custom vtable 1803 //dta->dq_label = "timer-aggregate"; 1804 for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) { 1805 TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources); 1806 } 1807 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1808 TAILQ_INIT(&dta->dta_timer[tidx].dt_sources); 1809 dta->dta_timer[tidx].target = UINT64_MAX; 1810 dta->dta_timer[tidx].deadline = UINT64_MAX; 1811 dta->dta_timer_data[tidx].target = UINT64_MAX; 1812 dta->dta_timer_data[tidx].deadline = UINT64_MAX; 1813 } 1814 return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create( 1815 (dispatch_queue_t)dta); 1816} 1817 1818typedef struct dispatch_timer_delay_s { 1819 dispatch_timer_t timer; 1820 uint64_t delay, leeway; 1821} *dispatch_timer_delay_t; 1822 1823static void 1824_dispatch_timer_aggregate_get_delay(void *ctxt) 1825{ 1826 dispatch_timer_delay_t dtd = ctxt; 1827 struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {}; 1828 _dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway, 1829 -1); 1830} 1831 1832uint64_t 1833dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta, 1834 uint64_t *leeway_ptr) 1835{ 1836 struct dispatch_timer_delay_s dtd = { 1837 .timer = 
dta->dta_timer_data, 1838 }; 1839 dispatch_sync_f((dispatch_queue_t)dta, &dtd, 1840 _dispatch_timer_aggregate_get_delay); 1841 if (leeway_ptr) { 1842 *leeway_ptr = dtd.leeway; 1843 } 1844 return dtd.delay; 1845} 1846 1847static void 1848_dispatch_timer_aggregate_update(void *ctxt) 1849{ 1850 dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current(); 1851 dispatch_timer_t dtau = ctxt; 1852 unsigned int tidx; 1853 for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { 1854 dta->dta_timer_data[tidx].target = dtau[tidx].target; 1855 dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline; 1856 } 1857 free(dtau); 1858} 1859 1860DISPATCH_NOINLINE 1861static void 1862_dispatch_timer_aggregates_configure(void) 1863{ 1864 dispatch_timer_aggregate_t dta; 1865 dispatch_timer_t dtau; 1866 TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) { 1867 if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) { 1868 continue; 1869 } 1870 dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau)); 1871 memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer)); 1872 dispatch_barrier_async_f((dispatch_queue_t)dta, dtau, 1873 _dispatch_timer_aggregate_update); 1874 } 1875} 1876 1877static inline void 1878_dispatch_timer_aggregates_check(void) 1879{ 1880 if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) { 1881 return; 1882 } 1883 _dispatch_timer_aggregates_configure(); 1884} 1885 1886static void 1887_dispatch_timer_aggregates_register(dispatch_source_t ds) 1888{ 1889 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); 1890 if (!dta->dta_refcount++) { 1891 TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list); 1892 } 1893} 1894 1895DISPATCH_NOINLINE 1896static void 1897_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx) 1898{ 1899 dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds); 1900 dispatch_timer_source_aggregate_refs_t dr; 1901 dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs; 1902 
_dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
			dta->dta_timer, dr, dta_list);
}

// Detach the timer source `ds` from its timer aggregate for timer index
// `tidx`: the source refs are removed from the aggregate's timer lists and
// one reference on the aggregate is dropped. When that was the last
// reference, the aggregate is unlinked from _dispatch_timer_aggregates.
DISPATCH_NOINLINE
static void
_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
{
	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
	dispatch_timer_source_aggregate_refs_t dr;
	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
	_dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
			dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
	if (!--dta->dta_refcount) {
		TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
	}
}

#pragma mark -
#pragma mark dispatch_select

// File descriptor of the manager kqueue (set once by _dispatch_kq_init).
static int _dispatch_kq;

// Number of (fd, direction) registrations currently served by select()
// instead of kqueue; incremented on register, decremented on unregister.
static unsigned int _dispatch_select_workaround;
// Persistent fd sets handed (as copies) to select() by _dispatch_mgr_select.
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
// Lazily allocated FD_SETSIZE-entry tables, indexed by fd, holding the kevent
// udata of the registration; a zero entry means "not registered".
static uint64_t*_dispatch_rfd_ptrs;
static uint64_t*_dispatch_wfd_ptrs;

// Try to take over a failed read/write kevent registration with select().
//
// `kev` is the receipt of a kevent64() call; kev->data carries the error.
// Returns true for every EVFILT_READ / EVFILT_WRITE kevent (whether or not
// the fd was actually added to the select sets), false for any other filter.
DISPATCH_NOINLINE
static bool
_dispatch_select_register(struct kevent64_s *kev)
{

	// Must execute on manager queue
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	// If an EINVAL or ENOENT error occurred while adding/enabling a read or
	// write kevent, assume it was due to a type of filedescriptor not
	// supported by kqueue and fall back to select
	switch (kev->filter) {
	case EVFILT_READ:
		if ((kev->data == EINVAL || kev->data == ENOENT) &&
				dispatch_assume(kev->ident < FD_SETSIZE)) {
			FD_SET((int)kev->ident, &_dispatch_rfds);
			if (slowpath(!_dispatch_rfd_ptrs)) {
				_dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
						sizeof(*_dispatch_rfd_ptrs));
			}
			// Only count the fd once; re-enabling an already tracked fd
			// just sets its bit again above.
			if (!_dispatch_rfd_ptrs[kev->ident]) {
				_dispatch_rfd_ptrs[kev->ident] = kev->udata;
				_dispatch_select_workaround++;
				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
						(int)kev->ident, (long)kev->data);
			}
		}
		return true;
	case EVFILT_WRITE:
		if ((kev->data == EINVAL || kev->data == ENOENT) &&
				dispatch_assume(kev->ident < FD_SETSIZE)) {
			FD_SET((int)kev->ident, &_dispatch_wfds);
			if (slowpath(!_dispatch_wfd_ptrs)) {
				_dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
						sizeof(*_dispatch_wfd_ptrs));
			}
			// Same bookkeeping as the read side, for the write tables.
			if (!_dispatch_wfd_ptrs[kev->ident]) {
				_dispatch_wfd_ptrs[kev->ident] = kev->udata;
				_dispatch_select_workaround++;
				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
						(int)kev->ident, (long)kev->data);
			}
		}
		return true;
	}
	return false;
}

// Remove a read/write registration from the select() fallback.
//
// Returns true when `kev` named an fd that was tracked for that direction
// (its bit and table entry are cleared and the workaround count dropped);
// returns false when the fd was not tracked or the filter is not
// EVFILT_READ / EVFILT_WRITE.
DISPATCH_NOINLINE
static bool
_dispatch_select_unregister(const struct kevent64_s *kev)
{
	// Must execute on manager queue
	DISPATCH_ASSERT_ON_MANAGER_QUEUE();

	switch (kev->filter) {
	case EVFILT_READ:
		if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
				_dispatch_rfd_ptrs[kev->ident]) {
			FD_CLR((int)kev->ident, &_dispatch_rfds);
			_dispatch_rfd_ptrs[kev->ident] = 0;
			_dispatch_select_workaround--;
			return true;
		}
		break;
	case EVFILT_WRITE:
		if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
				_dispatch_wfd_ptrs[kev->ident]) {
			FD_CLR((int)kev->ident, &_dispatch_wfds);
			_dispatch_wfd_ptrs[kev->ident] = 0;
			_dispatch_select_workaround--;
			return true;
		}
		break;
	}
	return false;
}

// Run one select() pass over the fallback fd sets.
//
// `poll` selects a zero timeout; otherwise select() blocks indefinitely.
// Ready fds are turned into synthetic kevents and handed to
// _dispatch_kevent_drain(). Returns true only when the kqueue descriptor
// itself was reported readable; returns false on any select() error.
DISPATCH_NOINLINE
static bool
_dispatch_mgr_select(bool poll)
{
	static const struct timeval timeout_immediately = { 0, 0 };
	fd_set tmp_rfds, tmp_wfds;
	struct kevent64_s kev;
	int err, i, r;
	bool kevent_avail = false;

	// select() overwrites its sets, so work on copies of the persistent ones
	FD_COPY(&_dispatch_rfds, &tmp_rfds);
	FD_COPY(&_dispatch_wfds, &tmp_wfds);

	r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
			poll ? (struct timeval*)&timeout_immediately : NULL);
	if (slowpath(r == -1)) {
		err = errno;
		if (err != EBADF) {
			if (err != EINTR) {
				(void)dispatch_assume_zero(err);
			}
			return false;
		}
		// EBADF: probe every registered fd with dup() and drop the ones
		// for which dup() fails from the sets and tables.
		for (i = 0; i < FD_SETSIZE; i++) {
			if (i == _dispatch_kq) {
				continue;
			}
			if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
				continue;
			}
			r = dup(i);
			if (dispatch_assume(r != -1)) {
				close(r);
			} else {
				if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
					FD_CLR(i, &_dispatch_rfds);
					_dispatch_rfd_ptrs[i] = 0;
					_dispatch_select_workaround--;
				}
				if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
					FD_CLR(i, &_dispatch_wfds);
					_dispatch_wfd_ptrs[i] = 0;
					_dispatch_select_workaround--;
				}
			}
		}
		return false;
	}
	if (r > 0) {
		for (i = 0; i < FD_SETSIZE; i++) {
			if (FD_ISSET(i, &tmp_rfds)) {
				// The kqueue fd sits in the read set (see _dispatch_kq_init)
				// and has no table entry; just report it as ready.
				if (i == _dispatch_kq) {
					kevent_avail = true;
					continue;
				}
				FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
				EV_SET64(&kev, i, EVFILT_READ,
						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
						_dispatch_rfd_ptrs[i], 0, 0);
				_dispatch_kevent_drain(&kev);
			}
			if (FD_ISSET(i, &tmp_wfds)) {
				FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
				EV_SET64(&kev, i, EVFILT_WRITE,
						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
						_dispatch_wfd_ptrs[i], 0, 0);
				_dispatch_kevent_drain(&kev);
			}
		}
	}
	return kevent_avail;
}

#pragma mark -
#pragma mark dispatch_kqueue

// dispatch_once initializer for the manager kqueue: creates the kqueue
// (guarded when DISPATCH_USE_GUARDED_FD), adds it to the select() read set,
// registers the EVFILT_USER kevent with ident 1 (the one triggered by
// _dispatch_mgr_wakeup) and pushes the manager queue onto its target queue.
static void
_dispatch_kq_init(void *context DISPATCH_UNUSED)
{
	static const struct kevent64_s kev = {
		.ident = 1,
		.filter = EVFILT_USER,
		.flags = EV_ADD|EV_CLEAR,
	};

	_dispatch_safe_fork = false;
#if DISPATCH_USE_GUARDED_FD
	guardid_t guard = (uintptr_t)&kev;
	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
#else
	_dispatch_kq = kqueue();
#endif
	if (_dispatch_kq == -1) {
		DISPATCH_CLIENT_CRASH("kqueue() create failed: "
				"probably out of file descriptors");
	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
		// in case we fall back to select()
		FD_SET(_dispatch_kq, &_dispatch_rfds);
	}

	(void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
			NULL));
	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
}

// Return the manager kqueue descriptor, running _dispatch_kq_init() once on
// first use.
static int
_dispatch_get_kq(void)
{
	static dispatch_once_t pred;

	dispatch_once_f(&pred, NULL, _dispatch_kq_init);

	return _dispatch_kq;
}

// Apply a single kevent change to the manager kqueue.
//
// EV_DELETE requests are first offered to the select() fallback when it is
// in use. Otherwise a copy of `kev` with EV_RECEIPT set is submitted and the
// call is retried on EINTR.
//
// Returns 0 on success (including when the select() fallback accepted the
// registration), the errno of a failed kevent64() call, or the per-event
// error from the receipt after that receipt was passed to
// _dispatch_kevent_drain().
DISPATCH_NOINLINE
static long
_dispatch_kq_update(const struct kevent64_s *kev)
{
	int r;
	struct kevent64_s kev_copy;

	if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
		if (_dispatch_select_unregister(kev)) {
			return 0;
		}
	}
	kev_copy = *kev;
	// This ensures we don't get a pending kevent back while registering
	// a new kevent
	kev_copy.flags |= EV_RECEIPT;
retry:
	r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
			&kev_copy, 1, 0, NULL));
	if (slowpath(r == -1)) {
		int err = errno;
		switch (err) {
		case EINTR:
			goto retry;
		case EBADF:
			DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
			break;
		default:
			(void)dispatch_assume_zero(err);
			break;
		}
		return err;
	}
	// kev_copy now holds the receipt; its data field is the per-event error
	switch (kev_copy.data) {
	case 0:
		return 0;
	case EBADF:
	case EPERM:
	case EINVAL:
	case ENOENT:
		if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
			if (_dispatch_select_register(&kev_copy)) {
				return 0;
			}
		}
		// fall through
	default:
		// Restore the caller's flags on the receipt before delivering it
		kev_copy.flags |= kev->flags;
		_dispatch_kevent_drain(&kev_copy);
		break;
	}
	return (long)kev_copy.data;
}

2178#pragma mark - 2179#pragma mark dispatch_mgr 2180 2181static struct kevent64_s *_dispatch_kevent_enable; 2182 2183static void inline 2184_dispatch_mgr_kevent_reenable(struct kevent64_s *ke) 2185{ 2186 dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke); 2187 _dispatch_kevent_enable = ke; 2188} 2189 2190unsigned long 2191_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED) 2192{ 2193 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) { 2194 return false; 2195 } 2196 2197 static const struct kevent64_s kev = { 2198 .ident = 1, 2199 .filter = EVFILT_USER, 2200 .fflags = NOTE_TRIGGER, 2201 }; 2202 2203#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG 2204 _dispatch_debug("waking up the dispatch manager queue: %p", dq); 2205#endif 2206 2207 _dispatch_kq_update(&kev); 2208 2209 return false; 2210} 2211 2212DISPATCH_NOINLINE 2213static void 2214_dispatch_mgr_init(void) 2215{ 2216 (void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed); 2217 _dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q); 2218 _dispatch_queue_set_bound_thread(&_dispatch_mgr_q); 2219 _dispatch_mgr_priority_init(); 2220 _dispatch_kevent_init(); 2221 _dispatch_timers_init(); 2222 _dispatch_mach_recv_msg_buf_init(); 2223 _dispatch_memorystatus_init(); 2224} 2225 2226DISPATCH_NOINLINE DISPATCH_NORETURN 2227static void 2228_dispatch_mgr_invoke(void) 2229{ 2230 static const struct timespec timeout_immediately = { 0, 0 }; 2231 struct kevent64_s kev; 2232 bool poll; 2233 int r; 2234 2235 for (;;) { 2236 _dispatch_mgr_queue_drain(); 2237 poll = _dispatch_mgr_timers(); 2238 if (slowpath(_dispatch_select_workaround)) { 2239 poll = _dispatch_mgr_select(poll); 2240 if (!poll) continue; 2241 } 2242 r = kevent64(_dispatch_kq, _dispatch_kevent_enable, 2243 _dispatch_kevent_enable ? 1 : 0, &kev, 1, 0, 2244 poll ? 
&timeout_immediately : NULL); 2245 _dispatch_kevent_enable = NULL; 2246 if (slowpath(r == -1)) { 2247 int err = errno; 2248 switch (err) { 2249 case EINTR: 2250 break; 2251 case EBADF: 2252 DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors"); 2253 break; 2254 default: 2255 (void)dispatch_assume_zero(err); 2256 break; 2257 } 2258 } else if (r) { 2259 _dispatch_kevent_drain(&kev); 2260 } 2261 } 2262} 2263 2264DISPATCH_NORETURN 2265void 2266_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED) 2267{ 2268 _dispatch_mgr_init(); 2269 // never returns, so burn bridges behind us & clear stack 2k ahead 2270 _dispatch_clear_stack(2048); 2271 _dispatch_mgr_invoke(); 2272} 2273 2274#pragma mark - 2275#pragma mark dispatch_memorystatus 2276 2277#if DISPATCH_USE_MEMORYSTATUS_SOURCE 2278#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS 2279#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \ 2280 DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \ 2281 DISPATCH_MEMORYSTATUS_PRESSURE_WARN) 2282#elif DISPATCH_USE_VM_PRESSURE_SOURCE 2283#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM 2284#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE 2285#endif 2286 2287#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE 2288static dispatch_source_t _dispatch_memorystatus_source; 2289 2290static void 2291_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED) 2292{ 2293#if DISPATCH_USE_MEMORYSTATUS_SOURCE 2294 unsigned long memorystatus; 2295 memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source); 2296 if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) { 2297 _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT; 2298 return; 2299 } 2300 _dispatch_continuation_cache_limit = 2301 DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN; 2302#endif 2303 malloc_zone_pressure_relief(0,0); 2304} 2305 2306static void 2307_dispatch_memorystatus_init(void) 2308{ 2309 
_dispatch_memorystatus_source = dispatch_source_create( 2310 DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0, 2311 DISPATCH_MEMORYSTATUS_SOURCE_MASK, 2312 _dispatch_get_root_queue(0, true)); 2313 dispatch_source_set_event_handler_f(_dispatch_memorystatus_source, 2314 _dispatch_memorystatus_handler); 2315 dispatch_resume(_dispatch_memorystatus_source); 2316} 2317#else 2318static inline void _dispatch_memorystatus_init(void) {} 2319#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE 2320 2321#pragma mark - 2322#pragma mark dispatch_mach 2323 2324#if HAVE_MACH 2325 2326#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG 2327#define _dispatch_debug_machport(name) \ 2328 dispatch_debug_machport((name), __func__) 2329#else 2330#define _dispatch_debug_machport(name) ((void)(name)) 2331#endif 2332 2333// Flags for all notifications that are registered/unregistered when a 2334// send-possible notification is requested/delivered 2335#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \ 2336 DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED) 2337#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \ 2338 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \ 2339 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) 2340#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \ 2341 DISPATCH_MACH_RECV_MESSAGE_DIRECT| \ 2342 DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) 2343 2344#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v) 2345#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? 
\ 2346 (MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y))) 2347 2348#define _DISPATCH_MACHPORT_HASH_SIZE 32 2349#define _DISPATCH_MACHPORT_HASH(x) \ 2350 _DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE) 2351 2352#ifndef MACH_RCV_LARGE_IDENTITY 2353#define MACH_RCV_LARGE_IDENTITY 0x00000008 2354#endif 2355#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX 2356#define DISPATCH_MACH_RCV_OPTIONS ( \ 2357 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \ 2358 MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \ 2359 MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) 2360 2361#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0]) 2362 2363static void _dispatch_kevent_machport_drain(struct kevent64_s *ke); 2364static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke); 2365static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr); 2366static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr); 2367static void _dispatch_source_merge_mach_msg(dispatch_source_t ds, 2368 dispatch_source_refs_t dr, dispatch_kevent_t dk, 2369 mach_msg_header_t *hdr, mach_msg_size_t siz); 2370static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk, 2371 uint32_t new_flags, uint32_t del_flags, uint32_t mask, 2372 mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync); 2373static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr); 2374static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm, 2375 dispatch_mach_reply_refs_t dmr, bool disconnected); 2376static void _dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr, 2377 mach_msg_size_t siz); 2378static void _dispatch_mach_merge_kevent(dispatch_mach_t dm, 2379 const struct kevent64_s *ke); 2380static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm); 2381 2382static const size_t _dispatch_mach_recv_msg_size = 2383 DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE; 2384static const size_t dispatch_mach_trailer_size = 
2385 sizeof(dispatch_mach_trailer_t); 2386static const size_t _dispatch_mach_recv_msg_buf_size = mach_vm_round_page( 2387 _dispatch_mach_recv_msg_size + dispatch_mach_trailer_size); 2388static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset; 2389static mach_port_t _dispatch_mach_notify_port; 2390static struct kevent64_s _dispatch_mach_recv_kevent = { 2391 .filter = EVFILT_MACHPORT, 2392 .flags = EV_ADD|EV_ENABLE|EV_DISPATCH, 2393 .fflags = DISPATCH_MACH_RCV_OPTIONS, 2394}; 2395static dispatch_source_t _dispatch_mach_notify_source; 2396static const 2397struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = { 2398 .ke = { 2399 .filter = EVFILT_MACHPORT, 2400 .flags = EV_CLEAR, 2401 .fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT, 2402 }, 2403}; 2404 2405static void 2406_dispatch_mach_recv_msg_buf_init(void) 2407{ 2408 mach_vm_size_t vm_size = _dispatch_mach_recv_msg_buf_size; 2409 mach_vm_address_t vm_addr = vm_page_size; 2410 kern_return_t kr; 2411 2412 while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size, 2413 VM_FLAGS_ANYWHERE))) { 2414 if (kr != KERN_NO_SPACE) { 2415 (void)dispatch_assume_zero(kr); 2416 DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer"); 2417 } 2418 _dispatch_temporary_resource_shortage(); 2419 vm_addr = vm_page_size; 2420 } 2421 _dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr; 2422 _dispatch_mach_recv_kevent.ext[1] = _dispatch_mach_recv_msg_buf_size; 2423} 2424 2425static inline void* 2426_dispatch_get_mach_recv_msg_buf(void) 2427{ 2428 return (void*)_dispatch_mach_recv_kevent.ext[0]; 2429} 2430 2431static void 2432_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED) 2433{ 2434 kern_return_t kr; 2435 2436 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, 2437 &_dispatch_mach_recv_portset); 2438 DISPATCH_VERIFY_MIG(kr); 2439 if (dispatch_assume_zero(kr)) { 2440 DISPATCH_CLIENT_CRASH( 2441 "mach_port_allocate() failed: cannot create port set"); 
2442 } 2443 dispatch_assert(_dispatch_get_mach_recv_msg_buf()); 2444 dispatch_assert(dispatch_mach_trailer_size == 2445 REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS( 2446 DISPATCH_MACH_RCV_TRAILER))); 2447 _dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset; 2448 _dispatch_kq_update(&_dispatch_mach_recv_kevent); 2449 2450 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, 2451 &_dispatch_mach_notify_port); 2452 DISPATCH_VERIFY_MIG(kr); 2453 if (dispatch_assume_zero(kr)) { 2454 DISPATCH_CLIENT_CRASH( 2455 "mach_port_allocate() failed: cannot create receive right"); 2456 } 2457 _dispatch_mach_notify_source = dispatch_source_create( 2458 &_dispatch_source_type_mach_recv_direct, 2459 _dispatch_mach_notify_port, 0, &_dispatch_mgr_q); 2460 _dispatch_mach_notify_source->ds_refs->ds_handler_func = 2461 (void*)_dispatch_mach_notify_source_invoke; 2462 dispatch_assert(_dispatch_mach_notify_source); 2463 dispatch_resume(_dispatch_mach_notify_source); 2464} 2465 2466static mach_port_t 2467_dispatch_get_mach_recv_portset(void) 2468{ 2469 static dispatch_once_t pred; 2470 dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init); 2471 return _dispatch_mach_recv_portset; 2472} 2473 2474static void 2475_dispatch_mach_portset_init(void *context DISPATCH_UNUSED) 2476{ 2477 struct kevent64_s kev = { 2478 .filter = EVFILT_MACHPORT, 2479 .flags = EV_ADD, 2480 }; 2481 kern_return_t kr; 2482 2483 kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, 2484 &_dispatch_mach_portset); 2485 DISPATCH_VERIFY_MIG(kr); 2486 if (dispatch_assume_zero(kr)) { 2487 DISPATCH_CLIENT_CRASH( 2488 "mach_port_allocate() failed: cannot create port set"); 2489 } 2490 kev.ident = _dispatch_mach_portset; 2491 _dispatch_kq_update(&kev); 2492} 2493 2494static mach_port_t 2495_dispatch_get_mach_portset(void) 2496{ 2497 static dispatch_once_t pred; 2498 dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init); 2499 return _dispatch_mach_portset; 2500} 2501 
// Move the receive right named by `dk` into port set `mps`
// (MACH_PORT_NULL takes it out of its current set).
//
// Failures are logged according to their kind. Returns the
// mach_port_move_member() result when `mps` is non-null; removal from a set
// always reports 0.
static kern_return_t
_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
{
	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
	kern_return_t kr;

	_dispatch_debug_machport(mp);
	kr = mach_port_move_member(mach_task_self(), mp, mps);
	if (slowpath(kr)) {
		DISPATCH_VERIFY_MIG(kr);
		switch (kr) {
		case KERN_INVALID_RIGHT:
			if (mps) {
				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
						"mach_port_move_member() failed ", kr);
				break;
			}
			//fall through
		case KERN_INVALID_NAME:
#if DISPATCH_DEBUG
			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
					"prematurely", mp);
#endif
			break;
		default:
			(void)dispatch_assume_zero(kr);
			break;
		}
	}
	return mps ? kr : 0;
}

// Arrange for the direct-receive kevent to be re-enabled by the manager's
// next kevent64() call. On the older OS targets selected by the #if, the
// kevent is first deleted when the delivered ext[1] differs from the
// registered buffer size.
static void
_dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
{
#if (TARGET_IPHONE_SIMULATOR && \
		IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
		(!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
	// delete and re-add kevent to workaround <rdar://problem/13924256>
	if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
		struct kevent64_s kev = _dispatch_mach_recv_kevent;
		kev.flags = EV_DELETE;
		_dispatch_kq_update(&kev);
	}
#endif
	_dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
}

// Apply a change of receive flags to the port of `dk` by moving it into the
// matching port set: the direct-receive set when a direct flag is added, the
// plain set when plain receive is added (or remains after a direct flag is
// removed), otherwise out of any set. `new_flags` and `del_flags` must be
// disjoint. Returns the result of the port set update, or 0 when no receive
// flag is involved.
static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
		mach_port_t mps;
		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			mps = _dispatch_get_mach_recv_portset();
		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
			mps = _dispatch_get_mach_portset();
		} else {
			mps = MACH_PORT_NULL;
		}
		kr = _dispatch_mach_portset_update(dk, mps);
	}
	return kr;
}

// Apply a change of send-possible / dead-name / deleted notification flags
// for the port of `dk`. `new_flags` and `del_flags` must be disjoint.
// Returns the result of _dispatch_mach_notify_update(), or 0 when none of
// those flags is involved.
static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags)
{
	kern_return_t kr = 0;
	dispatch_assert_zero(new_flags & del_flags);
	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
		// Requesting a (delayed) non-sync send-possible notification
		// registers for both immediate dead-name notification and delayed-arm
		// send-possible notification for the port.
		// The send-possible notification is armed when a mach_msg() with the
		// MACH_SEND_NOTIFY to the port times out.
		// If send-possible is unavailable, fall back to immediate dead-name
		// registration rdar://problem/2527840&9008724
		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
	}
	return kr;
}

// Route an EVFILT_MACHPORT kevent to the drain routine of the port set it
// was delivered for; a kevent for any other ident is treated as an error.
static inline void
_dispatch_kevent_mach_portset(struct kevent64_s *ke)
{
	if (ke->ident == _dispatch_mach_recv_portset) {
		return _dispatch_kevent_mach_msg_drain(ke);
	} else if (ke->ident == _dispatch_mach_portset) {
		return _dispatch_kevent_machport_drain(ke);
	} else {
		return _dispatch_kevent_error(ke);
	}
}

// Handle an event on the plain port set: ke->data names the port with a
// pending message. The port is taken out of the set and a synthetic
// DISPATCH_MACH_RECV_MESSAGE kevent is merged for it.
DISPATCH_NOINLINE
static void
_dispatch_kevent_machport_drain(struct kevent64_s *ke)
{
	mach_port_t name = (mach_port_name_t)ke->data;
	dispatch_kevent_t dk;
	struct kevent64_s kev;

	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		return;
	}
	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH

	EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
			DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
	_dispatch_kevent_debug(&kev, __func__);
	_dispatch_kevent_merge(&kev);
}

// Handle an event on the direct-receive port set.
//
// ke->ext[0] points at the message buffer, ke->fflags carries the receive
// result and ke->ext[1] the message size. A message that was received inline
// is delivered directly. For MACH_RCV_TOO_LARGE a heap buffer is allocated
// and the message is received explicitly: from the port named by ke->data
// when that is known, otherwise by draining the whole port set. Any other
// receive error, or a remaining failure, is reported at `out`.
DISPATCH_NOINLINE
static void
_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
{
	mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
	mach_msg_size_t siz, msgsiz;
	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;

	_dispatch_kevent_mach_recv_reenable(ke);
	if (!dispatch_assume(hdr)) {
		DISPATCH_CRASH("EVFILT_MACHPORT with no message");
	}
	if (fastpath(!kr)) {
		return _dispatch_kevent_mach_msg_recv(hdr);
	} else if (kr != MACH_RCV_TOO_LARGE) {
		goto out;
	}
	// Guard the size addition below against wrapping
	if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
	}
	siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
	hdr = malloc(siz);
	if (ke->data) {
		if (!dispatch_assume(hdr)) {
			// Kernel will discard message too large to fit
			hdr = _dispatch_get_mach_recv_msg_buf();
			siz = _dispatch_mach_recv_msg_buf_size;
		}
		mach_port_t name = (mach_port_name_t)ke->data;
		const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
				MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
		kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
				MACH_PORT_NULL);
		if (fastpath(!kr)) {
			// The receiver now owns hdr
			return _dispatch_kevent_mach_msg_recv(hdr);
		} else if (kr == MACH_RCV_TOO_LARGE) {
			_dispatch_log("BUG in libdispatch client: "
					"_dispatch_kevent_mach_msg_drain: dropped message too "
					"large to fit in memory: id = 0x%x, size = %lld",
					hdr->msgh_id, ke->ext[1]);
			kr = MACH_MSG_SUCCESS;
		}
	} else {
		// We don't know which port in the portset contains the large message,
		// so need to receive all messages pending on the portset to ensure the
		// large message is drained. <rdar://problem/13950432>
		bool received = false;
		for (;;) {
			if (!dispatch_assume(hdr)) {
				DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
			}
			const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
					MACH_RCV_TIMEOUT);
			kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
					MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
			if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
					hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
				DISPATCH_CRASH("Overlarge message");
			}
			if (fastpath(!kr)) {
				// Trim the buffer to the message actually received, hand it
				// off, and start over with a fresh buffer.
				msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
				if (msgsiz < siz) {
					void *shrink = realloc(hdr, msgsiz);
					if (shrink) hdr = shrink;
				}
				_dispatch_kevent_mach_msg_recv(hdr);
				hdr = NULL;
				received = true;
			} else if (kr == MACH_RCV_TOO_LARGE) {
				// Grow to the size reported in the header and retry
				siz = hdr->msgh_size + dispatch_mach_trailer_size;
			} else {
				// A timeout after at least one message means the set has
				// been drained.
				if (kr == MACH_RCV_TIMED_OUT && received) {
					kr = MACH_MSG_SUCCESS;
				}
				break;
			}
			hdr = reallocf(hdr, siz);
		}
	}
	// Never free the shared receive buffer
	if (hdr != _dispatch_get_mach_recv_msg_buf()) {
		free(hdr);
	}
out:
	if (slowpath(kr)) {
		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
				"message reception failed", kr);
	}
}

// Deliver a received message to the first source on the kevent of its local
// port that listens for direct messages. Messages that are overlarge, have a
// null local port, or have no matching kevent or listener are reported and
// destroyed.
static void
_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
{
	dispatch_source_refs_t dri;
	dispatch_kevent_t dk;
	mach_port_t name = hdr->msgh_local_port;
	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;

	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
			dispatch_mach_trailer_size)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received overlarge message");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	if (!dispatch_assume(name)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with MACH_PORT_NULL port");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	_dispatch_debug_machport(name);
	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
	if (!dispatch_assume(dk)) {
		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
				"received message with unknown kevent");
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	_dispatch_kevent_debug(&dk->dk_kevent, __func__);
	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
			return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
		}
	}
	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
			"received message with no listeners");
	return _dispatch_kevent_mach_msg_destroy(hdr);
}

// Destroy a received message and free its buffer unless it is the shared
// receive buffer. A NULL `hdr` is ignored.
static void
_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
{
	if (hdr) {
		mach_msg_destroy(hdr);
		if (hdr != _dispatch_get_mach_recv_msg_buf()) {
			free(hdr);
		}
	}
}

// Hand a received message to its listener. Messages for the internal
// notification source are dispatched through the MIG server and destroyed;
// everything else goes to the mach channel, after unregistering the reply
// kevent when it was a one-shot (DIRECT_ONCE) registration.
static void
_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
		dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
{
	if (ds == _dispatch_mach_notify_source) {
		_dispatch_mach_notify_source_invoke(hdr);
		return _dispatch_kevent_mach_msg_destroy(hdr);
	}
	if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
		_dispatch_mach_reply_kevent_unregister((dispatch_mach_t)ds,
				(dispatch_mach_reply_refs_t)dr, false);
	}
	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, hdr, siz);
}

// Deliver a mach port notification (`flag`) for port `name` to every source
// or mach channel registered on its notification kevent. `final` marks
// notifications that cannot recur; for those, and when re-registration asks
// for it, each listener is unregistered after delivery.
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
{
	dispatch_source_refs_t dri, dr_next;
	dispatch_kevent_t dk;
	struct kevent64_s kev;
	bool unreg;

	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
	if (!dk) {
		return;
	}

	// Update notification registration state.
	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
	EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
			flag, 0, (uintptr_t)dk, 0, 0);
	if (final) {
		// This can never happen again
		unreg = true;
	} else {
		// Re-register for notification before delivery
		unreg = _dispatch_kevent_resume(dk, flag, 0);
	}
	DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
			dispatch_mach_t dm = (dispatch_mach_t)dsi;
			_dispatch_mach_merge_kevent(dm, &kev);
			if (unreg && dm->dm_dkev) {
				_dispatch_mach_kevent_unregister(dm);
			}
		} else {
			_dispatch_source_merge_kevent(dsi, &kev);
			if (unreg) {
				_dispatch_source_kevent_unregister(dsi);
			}
		}
		if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
			// current merge is last in list (dk might have been freed)
			// or it re-armed the notification
			return;
		}
	}
}

// Register or unregister the kernel notification `notify_msgid` for the port
// of `dk`, depending on how the flags under `mask` change.
//
// The registration state is tracked in dk->dk_kevent.data. A request is only
// sent to the kernel on a transition between "no flag set" and "some flag
// set"; otherwise 0 is returned immediately. Returns the result of the
// registration request (always 0 for an unregistration).
static kern_return_t
_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
		mach_port_mscount_t notify_sync)
{
	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
	kern_return_t kr, krr = 0;

	// Update notification registration state.
	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
	dk->dk_kevent.data &= ~(del_flags & mask);

	_dispatch_debug_machport(port);
	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
		// initialize _dispatch_mach_notify_port:
		(void)_dispatch_get_mach_recv_portset();
		_dispatch_debug("machport[0x%08x]: registering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		krr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, _dispatch_mach_notify_port,
				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(krr);

		switch(krr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
			// Suppress errors & clear registration state
			dk->dk_kevent.data &= ~mask;
			break;
		default:
			// Else, we don't expect any errors from mach. Log any errors
			if (dispatch_assume_zero(krr)) {
				// log the error & clear registration state
				dk->dk_kevent.data &= ~mask;
			} else if (dispatch_assume_zero(previous)) {
				// Another subsystem has beat libdispatch to requesting the
				// specified Mach notification on this port. We should
				// technically cache the previous port and message it when the
				// kernel messages our port. Or we can just say screw those
				// subsystems and deallocate the previous port.
				// They should adopt libdispatch :-P
				kr = mach_port_deallocate(mach_task_self(), previous);
				DISPATCH_VERIFY_MIG(kr);
				(void)dispatch_assume_zero(kr);
				previous = MACH_PORT_NULL;
			}
		}
	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
				"notification", port);
		previous = MACH_PORT_NULL;
		kr = mach_port_request_notification(mach_task_self(), port,
				notify_msgid, notify_sync, MACH_PORT_NULL,
				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
		DISPATCH_VERIFY_MIG(kr);

		switch (kr) {
		case KERN_INVALID_NAME:
		case KERN_INVALID_RIGHT:
		case KERN_INVALID_ARGUMENT:
			break;
		default:
			if (dispatch_assume_zero(kr)) {
				// log the error
			}
		}
	} else {
		return 0;
	}
	if (slowpath(previous)) {
		// the kernel has not consumed the send-once right yet
		(void)dispatch_assume_zero(
				_dispatch_send_consume_send_once_right(previous));
	}
	return krr;
}

// Request a calendar-change host notification on the internal notification
// port (initializing that port first if necessary).
static void
_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
{
	(void)_dispatch_get_mach_recv_portset();
	_dispatch_debug("registering for calendar-change notification");
	kern_return_t kr = host_request_notification(mach_host_self(),
			HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}

// Register for calendar-change notifications exactly once.
static void
_dispatch_mach_host_calendar_change_register(void)
{
	static dispatch_once_t pred;
	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
}

// Handler of the internal notification source: runs the message through the
// libdispatch MIG server. A message the server rejects with MIG_BAD_ID whose
// id is 950 is treated as a calendar-change notification: timers are
// updated and the notification is requested again.
static void
_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
{
	mig_reply_error_t reply;
	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
		__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
	if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
		// host_notify_reply.defs: host_calendar_changed
		_dispatch_debug("calendar-change notification");
		_dispatch_timers_calendar_change();
		_dispatch_mach_host_notify_update(NULL);
		success = TRUE;
		reply.RetCode = KERN_SUCCESS;
	}
	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
		(void)dispatch_assume_zero(reply.RetCode);
	}
}

// MIG callback: port-deleted notification for `name`. Delivered to listeners
// as a final DISPATCH_MACH_SEND_DELETED event.
kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
#if DISPATCH_DEBUG
	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
			"deleted prematurely", name);
#endif

	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);

	return KERN_SUCCESS;
}

// MIG callback: dead-name notification for `name`. Delivered to listeners as
// a final DISPATCH_MACH_SEND_DEAD event, after which the dead-name right
// that came with the notification is deallocated.
kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	kern_return_t kr;

	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);

	// the act of receiving a dead name notification allocates a dead-name
	// right that must be deallocated
	kr = mach_port_deallocate(mach_task_self(), name);
	DISPATCH_VERIFY_MIG(kr);
	//(void)dispatch_assume_zero(kr);

	return KERN_SUCCESS;
}

// MIG callback: send-possible notification for `name`. Delivered to
// listeners as a non-final DISPATCH_MACH_SEND_POSSIBLE event.
kern_return_t
_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
		mach_port_name_t name)
{
	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
	_dispatch_debug_machport(name);
	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);

	return KERN_SUCCESS;
}

#pragma mark -
#pragma mark dispatch_mach_t

#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
#define DISPATCH_MACH_PSEUDO_RECEIVED 0x1
#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
#define DISPATCH_MACH_OPTIONS_MASK 0xffff

static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
		mach_port_t local_port, mach_port_t remote_port);
static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
		dispatch_object_t dou);
static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
		dispatch_mach_msg_t dmsg);

// Common constructor for mach channels.
//
// Allocates the channel and its receive and send refs, stores the handler
// and its context, and targets the channel at queue `q`. The channel is
// created suspended and with two extra references (see the comments below).
// `handler_is_block` records whether `context` is a block that must be
// released when the channel is disposed.
static dispatch_mach_t
_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler, bool handler_is_block)
{
	dispatch_mach_t dm;
	dispatch_mach_refs_t dr;

	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
			sizeof(struct dispatch_mach_s));
	_dispatch_queue_init((dispatch_queue_t)dm);
	dm->dq_label = label;

	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
	dm->do_ref_cnt++; // since channel is created suspended
	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
	dm->do_targetq = &_dispatch_mgr_q;

	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
	dr->dr_source_wref = _dispatch_ptr2wref(dm);
	dr->dm_handler_func = handler;
	dr->dm_handler_ctxt = context;
	dm->ds_refs = dr;
	dm->ds_handler_is_block = handler_is_block;

	dm->dm_refs = _dispatch_calloc(1ul,
			sizeof(struct dispatch_mach_send_refs_s));
	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
	TAILQ_INIT(&dm->dm_refs->dm_replies);

	// First item on the channel sets the user-specified target queue
	dispatch_set_target_queue(dm, q);
	_dispatch_object_debug(dm, "%s", __func__);
	return dm;
}

// Create a mach channel whose handler is a block. The block is copied and
// used as the handler context; its invoke function becomes the handler.
dispatch_mach_t
dispatch_mach_create(const char *label, dispatch_queue_t q,
		dispatch_mach_handler_t handler)
{
	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
	return _dispatch_mach_create(label, q, bb,
			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
}

// Create a mach channel whose handler is a plain function called with
// `context`.
dispatch_mach_t
dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
		dispatch_mach_handler_function_t handler)
{
	return _dispatch_mach_create(label, q, context, handler, false);
}

// Release the resources owned by a mach channel: the handler block (when the
// handler is a block), the receive and send refs, and the queue state.
void
_dispatch_mach_dispose(dispatch_mach_t dm)
{
	_dispatch_object_debug(dm, "%s", __func__);
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (dm->ds_handler_is_block && dr->dm_handler_ctxt) {
		Block_release(dr->dm_handler_ctxt);
	}
	free(dr);
	free(dm->dm_refs);
	_dispatch_queue_destroy(dm);
}

void
dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
		mach_port_t send, dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dispatch_kevent_t dk;

	if (MACH_PORT_VALID(receive)) {
		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
		dk->dk_kevent.ident = receive;
		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
		dk->dk_kevent.udata = (uintptr_t)dk;
		TAILQ_INIT(&dk->dk_sources);
		dm->ds_dkev = dk;
		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
		_dispatch_retain(dm); // the reference the manager queue holds
	}
	dr->dm_send = send;
	if (MACH_PORT_VALID(send)) {
		if (checkin) {
			dispatch_retain(checkin);
			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
		}
		dr->dm_checkin = checkin;
	}
	// monitor message reply ports
	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
	if
(slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt, 3103 DISPATCH_MACH_NEVER_CONNECTED, 0, release))) { 3104 DISPATCH_CLIENT_CRASH("Channel already connected"); 3105 } 3106 _dispatch_object_debug(dm, "%s", __func__); 3107 return dispatch_resume(dm); 3108} 3109 3110DISPATCH_NOINLINE 3111static void 3112_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm, 3113 dispatch_mach_reply_refs_t dmr, bool disconnected) 3114{ 3115 dispatch_kevent_t dk = dmr->dm_dkev; 3116 mach_port_t local_port = (mach_port_t)dk->dk_kevent.ident; 3117 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list); 3118 _dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE); 3119 TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dm_list); 3120 free(dmr); 3121 if (disconnected) { 3122 _dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL); 3123 } 3124} 3125 3126DISPATCH_NOINLINE 3127static void 3128_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply, 3129 void *ctxt) 3130{ 3131 dispatch_kevent_t dk; 3132 dispatch_mach_reply_refs_t dmr; 3133 3134 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); 3135 dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke; 3136 dk->dk_kevent.ident = reply; 3137 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; 3138 dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE; 3139 dk->dk_kevent.udata = (uintptr_t)dk; 3140 // make reply context visible to leaks rdar://11777199 3141 dk->dk_kevent.ext[1] = (uintptr_t)ctxt; 3142 TAILQ_INIT(&dk->dk_sources); 3143 3144 dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s)); 3145 dmr->dr_source_wref = _dispatch_ptr2wref(dm); 3146 dmr->dm_dkev = dk; 3147 3148 _dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply, 3149 ctxt); 3150 uint32_t flags; 3151 bool do_resume = _dispatch_kevent_register(&dmr->dm_dkev, &flags); 3152 TAILQ_INSERT_TAIL(&dmr->dm_dkev->dk_sources, (dispatch_source_refs_t)dmr, 3153 dr_list); 3154 
TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dm_list); 3155 if (do_resume && _dispatch_kevent_resume(dmr->dm_dkev, flags, 0)) { 3156 _dispatch_mach_reply_kevent_unregister(dm, dmr, true); 3157 } 3158} 3159 3160DISPATCH_NOINLINE 3161static void 3162_dispatch_mach_kevent_unregister(dispatch_mach_t dm) 3163{ 3164 dispatch_kevent_t dk = dm->dm_dkev; 3165 dm->dm_dkev = NULL; 3166 TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs, 3167 dr_list); 3168 dm->ds_pending_data_mask &= ~(unsigned long) 3169 (DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD); 3170 _dispatch_kevent_unregister(dk, 3171 DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD); 3172} 3173 3174DISPATCH_NOINLINE 3175static void 3176_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send) 3177{ 3178 dispatch_kevent_t dk; 3179 3180 dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s)); 3181 dk->dk_kevent = _dispatch_source_type_mach_send.ke; 3182 dk->dk_kevent.ident = send; 3183 dk->dk_kevent.flags |= EV_ADD|EV_ENABLE; 3184 dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD; 3185 dk->dk_kevent.udata = (uintptr_t)dk; 3186 TAILQ_INIT(&dk->dk_sources); 3187 3188 dm->ds_pending_data_mask |= dk->dk_kevent.fflags; 3189 3190 uint32_t flags; 3191 bool do_resume = _dispatch_kevent_register(&dk, &flags); 3192 TAILQ_INSERT_TAIL(&dk->dk_sources, 3193 (dispatch_source_refs_t)dm->dm_refs, dr_list); 3194 dm->dm_dkev = dk; 3195 if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) { 3196 _dispatch_mach_kevent_unregister(dm); 3197 } 3198} 3199 3200static inline void 3201_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou) 3202{ 3203 return _dispatch_queue_push(dm._dq, dou); 3204} 3205 3206static inline void 3207_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options) 3208{ 3209 dou._do->do_suspend_cnt = (unsigned int)options; 3210} 3211 3212static inline mach_msg_option_t 
3213_dispatch_mach_msg_get_options(dispatch_object_t dou) 3214{ 3215 mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt; 3216 return options; 3217} 3218 3219static inline void 3220_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err, 3221 unsigned long reason) 3222{ 3223 dispatch_assert_zero(reason & ~(unsigned long)code_emask); 3224 dou._do->do_suspend_cnt = (unsigned int)((err || !reason) ? err : 3225 err_local|err_sub(0x3e0)|(mach_error_t)reason); 3226} 3227 3228static inline unsigned long 3229_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr) 3230{ 3231 mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt; 3232 dou._do->do_suspend_cnt = 0; 3233 if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) { 3234 *err_ptr = 0; 3235 return err_get_code(err); 3236 } 3237 *err_ptr = err; 3238 return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT; 3239} 3240 3241static void 3242_dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr, 3243 mach_msg_size_t siz) 3244{ 3245 _dispatch_debug_machport(hdr->msgh_remote_port); 3246 _dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x", 3247 hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port); 3248 if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) { 3249 return _dispatch_kevent_mach_msg_destroy(hdr); 3250 } 3251 dispatch_mach_msg_t dmsg; 3252 dispatch_mach_msg_destructor_t destructor; 3253 destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ? 
3254 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT : 3255 DISPATCH_MACH_MSG_DESTRUCTOR_FREE; 3256 dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL); 3257 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED); 3258 return _dispatch_mach_push(dm, dmsg); 3259} 3260 3261static inline mach_port_t 3262_dispatch_mach_msg_get_remote_port(dispatch_object_t dou) 3263{ 3264 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg); 3265 mach_port_t remote = hdr->msgh_remote_port; 3266 return remote; 3267} 3268 3269static inline mach_port_t 3270_dispatch_mach_msg_get_reply_port(dispatch_mach_t dm, dispatch_object_t dou) 3271{ 3272 mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg); 3273 mach_port_t reply = MACH_PORT_NULL; 3274 mach_msg_option_t msg_opts = _dispatch_mach_msg_get_options(dou); 3275 if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) { 3276 reply = hdr->msgh_reserved; 3277 hdr->msgh_reserved = 0; 3278 } else if (MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) == 3279 MACH_MSG_TYPE_MAKE_SEND_ONCE && 3280 MACH_PORT_VALID(hdr->msgh_local_port) && (!dm->ds_dkev || 3281 dm->ds_dkev->dk_kevent.ident != hdr->msgh_local_port)) { 3282 reply = hdr->msgh_local_port; 3283 } 3284 return reply; 3285} 3286 3287static inline void 3288_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port, 3289 mach_port_t remote_port) 3290{ 3291 mach_msg_header_t *hdr; 3292 dispatch_mach_msg_t dmsg; 3293 dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t), 3294 DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr); 3295 if (local_port) hdr->msgh_local_port = local_port; 3296 if (remote_port) hdr->msgh_remote_port = remote_port; 3297 _dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED); 3298 return _dispatch_mach_push(dm, dmsg); 3299} 3300 3301DISPATCH_NOINLINE 3302static void 3303_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou) 3304{ 3305 mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dou); 3306 
_dispatch_mach_msg_set_reason(dou, 0, DISPATCH_MACH_MESSAGE_NOT_SENT); 3307 _dispatch_mach_push(dm, dou); 3308 if (reply) { 3309 _dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL); 3310 } 3311} 3312 3313DISPATCH_NOINLINE 3314static dispatch_object_t 3315_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou) 3316{ 3317 dispatch_mach_send_refs_t dr = dm->dm_refs; 3318 dispatch_mach_msg_t dmsg = dou._dmsg; 3319 dr->dm_needs_mgr = 0; 3320 if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) { 3321 // send initial checkin message 3322 if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() != 3323 &_dispatch_mgr_q)) { 3324 // send kevent must be uninstalled on the manager queue 3325 dr->dm_needs_mgr = 1; 3326 goto out; 3327 } 3328 dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg; 3329 if (slowpath(dr->dm_checkin)) { 3330 goto out; 3331 } 3332 } 3333 mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg); 3334 mach_msg_return_t kr = 0; 3335 mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dmsg); 3336 mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg); 3337 if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) { 3338 opts = MACH_SEND_MSG | (msg_opts & DISPATCH_MACH_OPTIONS_MASK); 3339 if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) != 3340 MACH_MSG_TYPE_MOVE_SEND_ONCE) { 3341 if (dmsg != dr->dm_checkin) { 3342 msg->msgh_remote_port = dr->dm_send; 3343 } 3344 if (_dispatch_queue_get_current() == &_dispatch_mgr_q) { 3345 if (slowpath(!dm->dm_dkev)) { 3346 _dispatch_mach_kevent_register(dm, msg->msgh_remote_port); 3347 } 3348 if (fastpath(dm->dm_dkev)) { 3349 if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) { 3350 goto out; 3351 } 3352 opts |= MACH_SEND_NOTIFY; 3353 } 3354 } 3355 opts |= MACH_SEND_TIMEOUT; 3356 } 3357 _dispatch_debug_machport(msg->msgh_remote_port); 3358 if (reply) _dispatch_debug_machport(reply); 3359 kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0, 3360 
MACH_PORT_NULL); 3361 } 3362 _dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, opts 0x%x, " 3363 "msg_opts 0x%x, reply on 0x%08x: %s - 0x%x", msg->msgh_remote_port, 3364 msg->msgh_id, dmsg->do_ctxt, opts, msg_opts, reply, 3365 mach_error_string(kr), kr); 3366 if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) { 3367 if (opts & MACH_SEND_NOTIFY) { 3368 _dispatch_debug("machport[0x%08x]: send-possible notification " 3369 "armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident); 3370 DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1; 3371 } else { 3372 // send kevent must be installed on the manager queue 3373 dr->dm_needs_mgr = 1; 3374 } 3375 if (reply) { 3376 _dispatch_mach_msg_set_options(dmsg, msg_opts | 3377 DISPATCH_MACH_PSEUDO_RECEIVED); 3378 msg->msgh_reserved = reply; // Remember the original reply port 3379 } 3380 goto out; 3381 } 3382 if (fastpath(!kr) && reply) { 3383 if (_dispatch_queue_get_current() != &_dispatch_mgr_q) { 3384 // reply receive kevent must be installed on the manager queue 3385 dr->dm_needs_mgr = 1; 3386 _dispatch_mach_msg_set_options(dmsg, msg_opts | 3387 DISPATCH_MACH_REGISTER_FOR_REPLY); 3388 if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) { 3389 msg->msgh_reserved = reply; // Remember the original reply port 3390 } 3391 goto out; 3392 } 3393 _dispatch_mach_reply_kevent_register(dm, reply, dmsg->do_ctxt); 3394 } 3395 if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) { 3396 _dispatch_mach_kevent_unregister(dm); 3397 } 3398 _dispatch_mach_msg_set_reason(dmsg, kr, 0); 3399 _dispatch_mach_push(dm, dmsg); 3400 dmsg = NULL; 3401 if (slowpath(kr) && reply) { 3402 // Send failed, so reply was never connected <rdar://problem/14309159> 3403 _dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL); 3404 } 3405out: 3406 return (dispatch_object_t)dmsg; 3407} 3408 3409static void 3410_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou) 3411{ 3412 dispatch_mach_send_refs_t dr = dm->dm_refs; 3413 struct dispatch_object_s 
*prev, *dc = dou._do; 3414 dc->do_next = NULL; 3415 3416 prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release); 3417 if (fastpath(prev)) { 3418 prev->do_next = dc; 3419 return; 3420 } 3421 dr->dm_head = dc; 3422 _dispatch_wakeup(dm); 3423} 3424 3425DISPATCH_NOINLINE 3426static void 3427_dispatch_mach_send_drain(dispatch_mach_t dm) 3428{ 3429 dispatch_mach_send_refs_t dr = dm->dm_refs; 3430 struct dispatch_object_s *dc = NULL, *next_dc = NULL; 3431 while (dr->dm_tail) { 3432 while (!(dc = fastpath(dr->dm_head))) { 3433 dispatch_hardware_pause(); 3434 } 3435 do { 3436 next_dc = fastpath(dc->do_next); 3437 dr->dm_head = next_dc; 3438 if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL, 3439 relaxed)) { 3440 // Enqueue is TIGHTLY controlled, we won't wait long. 3441 while (!(next_dc = fastpath(dc->do_next))) { 3442 dispatch_hardware_pause(); 3443 } 3444 dr->dm_head = next_dc; 3445 } 3446 if (!DISPATCH_OBJ_IS_VTABLE(dc)) { 3447 if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) { 3448 // send barrier 3449 // leave send queue locked until barrier has completed 3450 return _dispatch_mach_push(dm, dc); 3451 } 3452#if DISPATCH_MACH_SEND_SYNC 3453 if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){ 3454 _dispatch_thread_semaphore_signal( 3455 (_dispatch_thread_semaphore_t)dc->do_ctxt); 3456 continue; 3457 } 3458#endif // DISPATCH_MACH_SEND_SYNC 3459 if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) { 3460 goto out; 3461 } 3462 continue; 3463 } 3464 if (slowpath(dr->dm_disconnect_cnt) || 3465 slowpath(dm->ds_atomic_flags & DSF_CANCELED)) { 3466 _dispatch_mach_msg_not_sent(dm, dc); 3467 continue; 3468 } 3469 if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) { 3470 goto out; 3471 } 3472 } while ((dc = next_dc)); 3473 } 3474out: 3475 // if this is not a complete drain, we must undo some things 3476 if (slowpath(dc)) { 3477 if (!next_dc && 3478 !dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) { 3479 // wait for enqueue slow path 
to finish 3480 while (!(next_dc = fastpath(dr->dm_head))) { 3481 dispatch_hardware_pause(); 3482 } 3483 dc->do_next = next_dc; 3484 } 3485 dr->dm_head = dc; 3486 } 3487 (void)dispatch_atomic_dec2o(dr, dm_sending, release); 3488 _dispatch_wakeup(dm); 3489} 3490 3491static inline void 3492_dispatch_mach_send(dispatch_mach_t dm) 3493{ 3494 dispatch_mach_send_refs_t dr = dm->dm_refs; 3495 if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr, 3496 dm_sending, 0, 1, acquire))) { 3497 return; 3498 } 3499 _dispatch_object_debug(dm, "%s", __func__); 3500 _dispatch_mach_send_drain(dm); 3501} 3502 3503DISPATCH_NOINLINE 3504static void 3505_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke) 3506{ 3507 if (!(ke->fflags & dm->ds_pending_data_mask)) { 3508 return; 3509 } 3510 _dispatch_mach_send(dm); 3511} 3512 3513DISPATCH_NOINLINE 3514void 3515dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg, 3516 mach_msg_option_t options) 3517{ 3518 dispatch_mach_send_refs_t dr = dm->dm_refs; 3519 if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) { 3520 DISPATCH_CLIENT_CRASH("Message already enqueued"); 3521 } 3522 dispatch_retain(dmsg); 3523 dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK); 3524 _dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK); 3525 if (slowpath(dr->dm_tail) || slowpath(dr->dm_disconnect_cnt) || 3526 slowpath(dm->ds_atomic_flags & DSF_CANCELED) || 3527 slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, 3528 acquire))) { 3529 return _dispatch_mach_send_push(dm, dmsg); 3530 } 3531 if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) { 3532 (void)dispatch_atomic_dec2o(dr, dm_sending, release); 3533 return _dispatch_mach_send_push(dm, dmsg); 3534 } 3535 if (slowpath(dr->dm_tail)) { 3536 return _dispatch_mach_send_drain(dm); 3537 } 3538 (void)dispatch_atomic_dec2o(dr, dm_sending, release); 3539 _dispatch_wakeup(dm); 3540} 3541 3542static void 
// Disconnects the send side of the channel:
// - unregisters the send kevent, if any;
// - delivers a disconnect for a valid send port and clears it;
// - returns a pending checkin message as not sent;
// - unregisters every outstanding reply kevent, reporting each as a
//   disconnect.
_dispatch_mach_disconnect(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (dm->dm_dkev) {
		_dispatch_mach_kevent_unregister(dm);
	}
	if (MACH_PORT_VALID(dr->dm_send)) {
		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
	}
	dr->dm_send = MACH_PORT_NULL;
	if (dr->dm_checkin) {
		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
		dr->dm_checkin = NULL;
	}
	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		dispatch_mach_reply_refs_t dmr, tmp;
		// _SAFE variant: each iteration frees the current element
		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dm_list, tmp){
			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
		}
	}
}

// Performs channel cancellation while holding the send lock (dm_sending).
// Returns false without doing anything if the lock is currently held.
// Otherwise disconnects the send side, unregisters the receive kevent (if any)
// and delivers a disconnect for the receive port, then returns true.
DISPATCH_NOINLINE
static bool
_dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
		return false;
	}
	_dispatch_object_debug(dm, "%s", __func__);
	_dispatch_mach_disconnect(dm);
	if (dm->ds_dkev) {
		// read the port before the kevent is unregistered
		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
	}
	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
	return true;
}

// Executes a reconnect continuation queued by dispatch_mach_reconnect().
// Returns false (leaving the continuation untouched) when kevents would have
// to be uninstalled but the current queue is not the manager queue.
// Otherwise disconnects the channel, installs the new checkin message and
// send port carried by the continuation, frees the continuation, and
// decrements the disconnect count that dispatch_mach_reconnect() incremented.
DISPATCH_NOINLINE
static bool
_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
{
	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
			// send/reply kevents must be uninstalled on the manager queue
			return false;
		}
	}
	_dispatch_mach_disconnect(dm);
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	dr->dm_checkin = dou._dc->dc_data;
	dr->dm_send = (mach_port_t)dou._dc->dc_other;
	_dispatch_continuation_free(dou._dc);
	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
	_dispatch_object_debug(dm, "%s", __func__);
	return true;
}

// Requests that the channel be reconnected to `send`, optionally with a new
// checkin message.
//
// The disconnect count is incremented immediately; the actual reconnect is
// queued on the send queue as a continuation. The checkin message is only
// kept (and retained) when `send` is valid; otherwise the checkin port is
// reset to MACH_PORT_NULL.
DISPATCH_NOINLINE
void
dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
		dispatch_mach_msg_t checkin)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
	if (MACH_PORT_VALID(send) && checkin) {
		dispatch_retain(checkin);
		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
	} else {
		checkin = NULL;
		dr->dm_checkin_port = MACH_PORT_NULL;
	}
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = checkin;
	dc->dc_other = (void*)(uintptr_t)send;
	return _dispatch_mach_send_push(dm, dc);
}

#if DISPATCH_MACH_SEND_SYNC
// Blocks the calling thread until the send queue drain reaches a marker
// object pushed here; the drain signals the thread semaphore stored in the
// marker. The marker lives on this stack frame, which is why the function
// must not return before the semaphore is signaled.
DISPATCH_NOINLINE
static void
_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_object_s dc = {
		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
		.do_ctxt = (void*)sema,
	};
	_dispatch_mach_send_push(dm, &dc);
	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}
#endif // DISPATCH_MACH_SEND_SYNC

// Returns the remote port of the most recently supplied checkin message, or
// MACH_PORT_DEAD if the channel has been canceled.
DISPATCH_NOINLINE
mach_port_t
dispatch_mach_get_checkin_port(dispatch_mach_t dm)
{
	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
		return MACH_PORT_DEAD;
	}
	return dr->dm_checkin_port;
}

// Calls the channel handler with DISPATCH_MACH_CONNECTED and records that it
// has been called, so callers that check dm_connect_handler_called invoke it
// only once.
DISPATCH_NOINLINE
static void
_dispatch_mach_connect_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
	dm->dm_connect_handler_called = 1;
}

// Delivers one message to the channel handler.
//
// Runs with the channel as the current queue. Decodes the reason/error stored
// on the message, marks the message as no longer enqueued, and calls the
// handler with the thread's current queue temporarily set to the channel's
// target queue. The connect handler is invoked first if it has not run yet.
// Releases the message when done.
DISPATCH_NOINLINE
void
_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	mach_error_t err;
	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);

	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
			dr->dm_handler_func);
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	_dispatch_introspection_queue_item_complete(dmsg);
	dispatch_release(dmsg);
}

// Runs a send or receive barrier continuation.
//
// `ctxt` is the continuation itself; the user context and function are read
// from dc_data / dc_other. After the barrier function, the channel handler is
// called with DISPATCH_MACH_BARRIER_COMPLETED. For a send barrier (barrier bit
// set on the continuation) the send lock that was left held when the barrier
// was queued is released at the end.
DISPATCH_NOINLINE
void
_dispatch_mach_barrier_invoke(void *ctxt)
{
	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
	dispatch_mach_refs_t dr = dm->ds_refs;
	struct dispatch_continuation_s *dc = ctxt;
	void *context = dc->dc_data;
	dispatch_function_t barrier = dc->dc_other;
	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);

	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout(context, barrier);
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
	if (send_barrier) {
		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
	}
}

// Submits a send barrier: `barrier(context)` runs once the items queued
// before it on the send queue have been processed.
// If the send queue is empty and the send lock can be taken, the barrier is
// pushed straight to the channel with the lock held; otherwise it is queued
// behind the pending sends.
DISPATCH_NOINLINE
void
dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;

	dispatch_mach_send_refs_t dr = dm->dm_refs;
	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
			dm_sending, 0, 1, acquire))) {
		return _dispatch_mach_send_push(dm, dc);
	}
	// leave send queue locked until barrier has completed
	return _dispatch_mach_push(dm, dc);
}

// Submits a receive barrier: the continuation is pushed directly onto the
// channel's queue, without the barrier bit and without touching the send lock.
DISPATCH_NOINLINE
void
dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
		dispatch_function_t barrier)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
	dc->dc_func = _dispatch_mach_barrier_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = context;
	dc->dc_other = barrier;
	return _dispatch_mach_push(dm, dc);
}

// Block variant of dispatch_mach_send_barrier_f(); the Block is copied and
// released after it has been called.
DISPATCH_NOINLINE
void
dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}

// Block variant of dispatch_mach_receive_barrier_f(); the Block is copied and
// released after it has been called.
DISPATCH_NOINLINE
void
dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
{
	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
			_dispatch_call_block_and_release);
}

// Calls the channel handler with DISPATCH_MACH_CANCELED (after the connect
// handler, if that has not run yet), records that the cancel handler was
// called, and drops the reference taken for it at creation time.
DISPATCH_NOINLINE
static void
_dispatch_mach_cancel_invoke(dispatch_mach_t dm)
{
	dispatch_mach_refs_t dr = dm->ds_refs;
	if (slowpath(!dm->dm_connect_handler_called)) {
		_dispatch_mach_connect_invoke(dm);
	}
	_dispatch_client_callout4(dr->dm_handler_ctxt,
			DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
	dm->dm_cancel_handler_called = 1;
	_dispatch_release(dm); // the retain is done at creation time
}

// Cancels the channel by delegating to dispatch_source_cancel().
DISPATCH_NOINLINE
void
dispatch_mach_cancel(dispatch_mach_t dm)
{
	dispatch_source_cancel((dispatch_source_t)dm);
}

// Core state machine of a mach channel; see the comment inside for the
// redrive protocol. Returns the queue on which the invoke must be re-driven,
// or NULL when there is nothing more to do on another queue.
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_mach_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
{
	dispatch_mach_t dm = dou._dm;

	// This function performs all mach channel actions. Each action is
	// responsible for verifying that it takes place on the appropriate queue.
	// If the current queue is not the correct queue for this action, the
	// correct queue will be returned and the invoke will be re-driven on that
	// queue.

	// The order of tests here in invoke and in probe should be consistent.

	dispatch_queue_t dq = _dispatch_queue_get_current();
	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		if (dq != &_dispatch_mgr_q) {
			return &_dispatch_mgr_q;
		}
		if (dm->ds_dkev) {
			_dispatch_source_kevent_register((dispatch_source_t)dm);
		}
		dm->ds_is_installed = true;
		_dispatch_mach_send(dm);
		// Apply initial target queue change
		_dispatch_queue_drain(dou);
		if (dm->dq_items_tail) {
			return dm->do_targetq;
		}
	} else if (dm->dq_items_tail) {
		// The channel has pending messages to deliver to the target queue.
		if (dq != dm->do_targetq) {
			return dm->do_targetq;
		}
		dispatch_queue_t tq = dm->do_targetq;
		if (slowpath(_dispatch_queue_drain(dou))) {
			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
		}
		if (slowpath(tq != dm->do_targetq)) {
			// An item on the channel changed the target queue
			return dm->do_targetq;
		}
	} else if (dr->dm_tail) {
		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
			// Send/reply kevents need to be installed or uninstalled
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
		}
		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
			// The channel has pending messages to send.
			_dispatch_mach_send(dm);
		}
	} else if (dm->ds_atomic_flags & DSF_CANCELED){
		// The channel has been cancelled and needs to be uninstalled from the
		// manager queue. After uninstallation, the cancellation handler needs
		// to be delivered to the target queue.
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
			if (dq != &_dispatch_mgr_q) {
				return &_dispatch_mgr_q;
			}
			if (!_dispatch_mach_cancel(dm)) {
				// send lock is held elsewhere; cancellation not performed now
				return NULL;
			}
		}
		if (!dm->dm_cancel_handler_called) {
			if (dq != dm->do_targetq) {
				return dm->do_targetq;
			}
			_dispatch_mach_cancel_invoke(dm);
		}
	}
	return NULL;
}

// Invoke entry point for mach channels; delegates to the generic queue-class
// invoke with _dispatch_mach_invoke2 as the per-class step.
DISPATCH_NOINLINE
void
_dispatch_mach_invoke(dispatch_mach_t dm)
{
	_dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
}

// Returns non-zero when the channel has work for _dispatch_mach_invoke2()
// to do, zero otherwise.
unsigned long
_dispatch_mach_probe(dispatch_mach_t dm)
{
	// This function determines whether the mach channel needs to be invoked.
	// The order of tests here in probe and in invoke should be consistent.

	dispatch_mach_send_refs_t dr = dm->dm_refs;

	if (slowpath(!dm->ds_is_installed)) {
		// The channel needs to be installed on the manager queue.
		return true;
	} else if (dm->dq_items_tail) {
		// The channel has pending messages to deliver to the target queue.
		return true;
	} else if (dr->dm_tail &&
			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
		// The channel has pending messages to send.
		return true;
	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
				!dm->dm_cancel_handler_called) {
			// The channel needs to be uninstalled from the manager queue, or
			// the cancellation handler needs to be delivered to the target
			// queue.
			return true;
		}
	}
	// Nothing to do.
	return false;
}

#pragma mark -
#pragma mark dispatch_mach_msg_t

// Creates a mach message object.
//
// msg:        source message; may be NULL only when `destructor` is 0.
// size:       message size in bytes; must be at least
//             sizeof(mach_msg_header_t).
// destructor: if non-zero, the object references `msg` directly and the
//             destructor selects how it is released in
//             _dispatch_mach_msg_dispose(). If zero, the object carries an
//             inline buffer of `size` bytes, into which `msg` is copied when
//             it is non-NULL.
// msg_ptr:    if non-NULL, receives a pointer to the message header as seen
//             through the new object.
//
// Crashes the client on a too-small size or on a destructor without a
// message.
dispatch_mach_msg_t
dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
{
	if (slowpath(size < sizeof(mach_msg_header_t)) ||
			slowpath(destructor && !msg)) {
		DISPATCH_CLIENT_CRASH("Empty message");
	}
	// The inline buffer overlays the `msg` pointer field, so only the bytes
	// beyond sizeof(dmsg->msg) are added to the allocation.
	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
			sizeof(struct dispatch_mach_msg_s) +
			(destructor ? 0 : size - sizeof(dmsg->msg)));
	if (destructor) {
		dmsg->msg = msg;
	} else if (msg) {
		memcpy(dmsg->buf, msg, size);
	}
	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
	dmsg->do_targetq = _dispatch_get_root_queue(0, false);
	dmsg->destructor = destructor;
	dmsg->size = size;
	if (msg_ptr) {
		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
	}
	return dmsg;
}

// Releases the external message buffer according to the destructor chosen at
// creation: nothing for DEFAULT, free() for FREE, mach_vm_deallocate() of
// `size` bytes for VM_DEALLOCATE.
void
_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
{
	switch (dmsg->destructor) {
	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
		free(dmsg->msg);
		break;
	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
		mach_vm_size_t vm_size = dmsg->size;
		mach_vm_address_t vm_addr = (uintptr_t)dmsg->msg;
		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
				vm_addr, vm_size));
		break;
	}}
}

// Returns the message header: the external buffer when a destructor is set,
// the inline buffer otherwise.
static inline mach_msg_header_t*
_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
{
	return dmsg->destructor ? dmsg->msg : (mach_msg_header_t*)dmsg->buf;
}

// Public accessor for the message header; optionally reports the message
// size through `size_ptr`.
mach_msg_header_t*
dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
{
	if (size_ptr) {
		*size_ptr = dmsg->size;
	}
	return _dispatch_mach_msg_get_msg(dmsg);
}

size_t
_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dx_kind(dmsg), dmsg);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->buf);
	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
	if (hdr->msgh_id) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
				hdr->msgh_id);
	}
	if (hdr->msgh_size) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
				hdr->msgh_size);
	}
	if (hdr->msgh_bits) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
		}
		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
	}
	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
	} else if (hdr->msgh_local_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
				hdr->msgh_local_port);
	} else if (hdr->msgh_remote_port) {
		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
				hdr->msgh_remote_port);
	} else {
4004 offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports"); 4005 } 4006 offset += dsnprintf(&buf[offset], bufsiz - offset, " } }"); 4007 return offset; 4008} 4009 4010#pragma mark - 4011#pragma mark dispatch_mig_server 4012 4013mach_msg_return_t 4014dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz, 4015 dispatch_mig_callback_t callback) 4016{ 4017 mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT 4018 | MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX) 4019 | MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0); 4020 mach_msg_options_t tmp_options; 4021 mig_reply_error_t *bufTemp, *bufRequest, *bufReply; 4022 mach_msg_return_t kr = 0; 4023 uint64_t assertion_token = 0; 4024 unsigned int cnt = 1000; // do not stall out serial queues 4025 boolean_t demux_success; 4026 bool received = false; 4027 size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE; 4028 4029 // XXX FIXME -- allocate these elsewhere 4030 bufRequest = alloca(rcv_size); 4031 bufReply = alloca(rcv_size); 4032 bufReply->Head.msgh_size = 0; 4033 bufRequest->RetCode = 0; 4034 4035#if DISPATCH_DEBUG 4036 options |= MACH_RCV_LARGE; // rdar://problem/8422992 4037#endif 4038 tmp_options = options; 4039 // XXX FIXME -- change this to not starve out the target queue 4040 for (;;) { 4041 if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) { 4042 options &= ~MACH_RCV_MSG; 4043 tmp_options &= ~MACH_RCV_MSG; 4044 4045 if (!(tmp_options & MACH_SEND_MSG)) { 4046 goto out; 4047 } 4048 } 4049 kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size, 4050 (mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0); 4051 4052 tmp_options = options; 4053 4054 if (slowpath(kr)) { 4055 switch (kr) { 4056 case MACH_SEND_INVALID_DEST: 4057 case MACH_SEND_TIMED_OUT: 4058 if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) { 4059 mach_msg_destroy(&bufReply->Head); 4060 } 4061 break; 4062 case MACH_RCV_TIMED_OUT: 4063 // Don't return an error if a message was sent this time or 4064 // a message 
was successfully received previously 4065 // rdar://problems/7363620&7791738 4066 if(bufReply->Head.msgh_remote_port || received) { 4067 kr = MACH_MSG_SUCCESS; 4068 } 4069 break; 4070 case MACH_RCV_INVALID_NAME: 4071 break; 4072#if DISPATCH_DEBUG 4073 case MACH_RCV_TOO_LARGE: 4074 // receive messages that are too large and log their id and size 4075 // rdar://problem/8422992 4076 tmp_options &= ~MACH_RCV_LARGE; 4077 size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE; 4078 void *large_buf = malloc(large_size); 4079 if (large_buf) { 4080 rcv_size = large_size; 4081 bufReply = large_buf; 4082 } 4083 if (!mach_msg(&bufReply->Head, tmp_options, 0, 4084 (mach_msg_size_t)rcv_size, 4085 (mach_port_t)ds->ds_ident_hack, 0, 0)) { 4086 _dispatch_log("BUG in libdispatch client: " 4087 "dispatch_mig_server received message larger than " 4088 "requested size %zd: id = 0x%x, size = %d", 4089 maxmsgsz, bufReply->Head.msgh_id, 4090 bufReply->Head.msgh_size); 4091 } 4092 if (large_buf) { 4093 free(large_buf); 4094 } 4095 // fall through 4096#endif 4097 default: 4098 _dispatch_bug_mach_client( 4099 "dispatch_mig_server: mach_msg() failed", kr); 4100 break; 4101 } 4102 goto out; 4103 } 4104 4105 if (!(tmp_options & MACH_RCV_MSG)) { 4106 goto out; 4107 } 4108 4109 if (assertion_token) { 4110#if DISPATCH_USE_IMPORTANCE_ASSERTION 4111 int r = proc_importance_assertion_complete(assertion_token); 4112 (void)dispatch_assume_zero(r); 4113#endif 4114 assertion_token = 0; 4115 } 4116 received = true; 4117 4118 bufTemp = bufRequest; 4119 bufRequest = bufReply; 4120 bufReply = bufTemp; 4121 4122#if DISPATCH_USE_IMPORTANCE_ASSERTION 4123 int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head, 4124 NULL, &assertion_token); 4125 if (r && slowpath(r != EIO)) { 4126 (void)dispatch_assume_zero(r); 4127 } 4128#endif 4129 4130 demux_success = callback(&bufRequest->Head, &bufReply->Head); 4131 4132 if (!demux_success) { 4133 // destroy the request - but not the reply port 4134 
bufRequest->Head.msgh_remote_port = 0; 4135 mach_msg_destroy(&bufRequest->Head); 4136 } else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) { 4137 // if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode 4138 // is present 4139 if (slowpath(bufReply->RetCode)) { 4140 if (bufReply->RetCode == MIG_NO_REPLY) { 4141 continue; 4142 } 4143 4144 // destroy the request - but not the reply port 4145 bufRequest->Head.msgh_remote_port = 0; 4146 mach_msg_destroy(&bufRequest->Head); 4147 } 4148 } 4149 4150 if (bufReply->Head.msgh_remote_port) { 4151 tmp_options |= MACH_SEND_MSG; 4152 if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) != 4153 MACH_MSG_TYPE_MOVE_SEND_ONCE) { 4154 tmp_options |= MACH_SEND_TIMEOUT; 4155 } 4156 } 4157 } 4158 4159out: 4160 if (assertion_token) { 4161#if DISPATCH_USE_IMPORTANCE_ASSERTION 4162 int r = proc_importance_assertion_complete(assertion_token); 4163 (void)dispatch_assume_zero(r); 4164#endif 4165 } 4166 4167 return kr; 4168} 4169 4170#endif /* HAVE_MACH */ 4171 4172#pragma mark - 4173#pragma mark dispatch_source_debug 4174 4175DISPATCH_NOINLINE 4176static const char * 4177_evfiltstr(short filt) 4178{ 4179 switch (filt) { 4180#define _evfilt2(f) case (f): return #f 4181 _evfilt2(EVFILT_READ); 4182 _evfilt2(EVFILT_WRITE); 4183 _evfilt2(EVFILT_AIO); 4184 _evfilt2(EVFILT_VNODE); 4185 _evfilt2(EVFILT_PROC); 4186 _evfilt2(EVFILT_SIGNAL); 4187 _evfilt2(EVFILT_TIMER); 4188#ifdef EVFILT_VM 4189 _evfilt2(EVFILT_VM); 4190#endif 4191#ifdef EVFILT_MEMORYSTATUS 4192 _evfilt2(EVFILT_MEMORYSTATUS); 4193#endif 4194#if HAVE_MACH 4195 _evfilt2(EVFILT_MACHPORT); 4196 _evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION); 4197#endif 4198 _evfilt2(EVFILT_FS); 4199 _evfilt2(EVFILT_USER); 4200 4201 _evfilt2(DISPATCH_EVFILT_TIMER); 4202 _evfilt2(DISPATCH_EVFILT_CUSTOM_ADD); 4203 _evfilt2(DISPATCH_EVFILT_CUSTOM_OR); 4204 default: 4205 return "EVFILT_missing"; 4206 } 4207} 4208 4209static size_t 4210_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, 
size_t bufsiz) 4211{ 4212 dispatch_queue_t target = ds->do_targetq; 4213 return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, " 4214 "pending_data = 0x%lx, pending_data_mask = 0x%lx, ", 4215 target && target->dq_label ? target->dq_label : "", target, 4216 ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask); 4217} 4218 4219static size_t 4220_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz) 4221{ 4222 dispatch_source_refs_t dr = ds->ds_refs; 4223 return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx," 4224 " last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ", 4225 ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire, 4226 ds_timer(dr).interval, ds_timer(dr).flags); 4227} 4228 4229size_t 4230_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz) 4231{ 4232 size_t offset = 0; 4233 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 4234 dx_kind(ds), ds); 4235 offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset); 4236 offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset); 4237 if (ds->ds_is_timer) { 4238 offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset); 4239 } 4240 offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }", 4241 ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????"); 4242 return offset; 4243} 4244 4245static size_t 4246_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz) 4247{ 4248 dispatch_queue_t target = dm->do_targetq; 4249 return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, " 4250 "send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, " 4251 "sending = %d, disconnected = %d, canceled = %d ", 4252 target && target->dq_label ? 
target->dq_label : "", target, 4253 dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0, 4254 dm->dm_refs->dm_send, 4255 dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0, 4256 dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ? 4257 " (armed)" : "", dm->dm_refs->dm_checkin_port, 4258 dm->dm_refs->dm_checkin ? " (pending)" : "", 4259 dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt, 4260 (bool)(dm->ds_atomic_flags & DSF_CANCELED)); 4261} 4262size_t 4263_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz) 4264{ 4265 size_t offset = 0; 4266 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 4267 dm->dq_label ? dm->dq_label : dx_kind(dm), dm); 4268 offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset); 4269 offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset); 4270 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 4271 return offset; 4272} 4273 4274#if DISPATCH_DEBUG 4275static void 4276_dispatch_kevent_debug(struct kevent64_s* kev, const char* str) 4277{ 4278 _dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, " 4279 "fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, " 4280 "ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter), 4281 kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0], 4282 kev->ext[1], str); 4283} 4284 4285static void 4286_dispatch_kevent_debugger2(void *context) 4287{ 4288 struct sockaddr sa; 4289 socklen_t sa_len = sizeof(sa); 4290 int c, fd = (int)(long)context; 4291 unsigned int i; 4292 dispatch_kevent_t dk; 4293 dispatch_source_t ds; 4294 dispatch_source_refs_t dr; 4295 FILE *debug_stream; 4296 4297 c = accept(fd, &sa, &sa_len); 4298 if (c == -1) { 4299 if (errno != EAGAIN) { 4300 (void)dispatch_assume_zero(errno); 4301 } 4302 return; 4303 } 4304#if 0 4305 int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO 4306 if (r == -1) { 4307 (void)dispatch_assume_zero(errno); 4308 } 4309#endif 4310 
debug_stream = fdopen(c, "a"); 4311 if (!dispatch_assume(debug_stream)) { 4312 close(c); 4313 return; 4314 } 4315 4316 fprintf(debug_stream, "HTTP/1.0 200 OK\r\n"); 4317 fprintf(debug_stream, "Content-type: text/html\r\n"); 4318 fprintf(debug_stream, "Pragma: nocache\r\n"); 4319 fprintf(debug_stream, "\r\n"); 4320 fprintf(debug_stream, "<html>\n"); 4321 fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid()); 4322 fprintf(debug_stream, "<body>\n<ul>\n"); 4323 4324 //fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>" 4325 // "<td>DK</td><td>DK</td><td>DK</td></tr>\n"); 4326 4327 for (i = 0; i < DSL_HASH_SIZE; i++) { 4328 if (TAILQ_EMPTY(&_dispatch_sources[i])) { 4329 continue; 4330 } 4331 TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) { 4332 fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags " 4333 "0x%hx fflags 0x%x data 0x%lx udata %p\n", 4334 dk, (unsigned long)dk->dk_kevent.ident, 4335 _evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags, 4336 dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data, 4337 (void*)dk->dk_kevent.udata); 4338 fprintf(debug_stream, "\t\t<ul>\n"); 4339 TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) { 4340 ds = _dispatch_source_from_refs(dr); 4341 fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend " 4342 "0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n", 4343 ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt, 4344 ds->ds_pending_data, ds->ds_pending_data_mask, 4345 ds->ds_atomic_flags); 4346 if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) { 4347 dispatch_queue_t dq = ds->do_targetq; 4348 fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend " 4349 "0x%x label: %s\n", dq, dq->do_ref_cnt + 1, 4350 dq->do_suspend_cnt, dq->dq_label ? 
dq->dq_label:""); 4351 } 4352 } 4353 fprintf(debug_stream, "\t\t</ul>\n"); 4354 fprintf(debug_stream, "\t</li>\n"); 4355 } 4356 } 4357 fprintf(debug_stream, "</ul>\n</body>\n</html>\n"); 4358 fflush(debug_stream); 4359 fclose(debug_stream); 4360} 4361 4362static void 4363_dispatch_kevent_debugger2_cancel(void *context) 4364{ 4365 int ret, fd = (int)(long)context; 4366 4367 ret = close(fd); 4368 if (ret != -1) { 4369 (void)dispatch_assume_zero(errno); 4370 } 4371} 4372 4373static void 4374_dispatch_kevent_debugger(void *context DISPATCH_UNUSED) 4375{ 4376 union { 4377 struct sockaddr_in sa_in; 4378 struct sockaddr sa; 4379 } sa_u = { 4380 .sa_in = { 4381 .sin_family = AF_INET, 4382 .sin_addr = { htonl(INADDR_LOOPBACK), }, 4383 }, 4384 }; 4385 dispatch_source_t ds; 4386 const char *valstr; 4387 int val, r, fd, sock_opt = 1; 4388 socklen_t slen = sizeof(sa_u); 4389 4390 if (issetugid()) { 4391 return; 4392 } 4393 valstr = getenv("LIBDISPATCH_DEBUGGER"); 4394 if (!valstr) { 4395 return; 4396 } 4397 val = atoi(valstr); 4398 if (val == 2) { 4399 sa_u.sa_in.sin_addr.s_addr = 0; 4400 } 4401 fd = socket(PF_INET, SOCK_STREAM, 0); 4402 if (fd == -1) { 4403 (void)dispatch_assume_zero(errno); 4404 return; 4405 } 4406 r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt, 4407 (socklen_t) sizeof sock_opt); 4408 if (r == -1) { 4409 (void)dispatch_assume_zero(errno); 4410 goto out_bad; 4411 } 4412#if 0 4413 r = fcntl(fd, F_SETFL, O_NONBLOCK); 4414 if (r == -1) { 4415 (void)dispatch_assume_zero(errno); 4416 goto out_bad; 4417 } 4418#endif 4419 r = bind(fd, &sa_u.sa, sizeof(sa_u)); 4420 if (r == -1) { 4421 (void)dispatch_assume_zero(errno); 4422 goto out_bad; 4423 } 4424 r = listen(fd, SOMAXCONN); 4425 if (r == -1) { 4426 (void)dispatch_assume_zero(errno); 4427 goto out_bad; 4428 } 4429 r = getsockname(fd, &sa_u.sa, &slen); 4430 if (r == -1) { 4431 (void)dispatch_assume_zero(errno); 4432 goto out_bad; 4433 } 4434 4435 ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, 
(uintptr_t)fd, 0, 4436 &_dispatch_mgr_q); 4437 if (dispatch_assume(ds)) { 4438 _dispatch_log("LIBDISPATCH: debug port: %hu", 4439 (in_port_t)ntohs(sa_u.sa_in.sin_port)); 4440 4441 /* ownership of fd transfers to ds */ 4442 dispatch_set_context(ds, (void *)(long)fd); 4443 dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2); 4444 dispatch_source_set_cancel_handler_f(ds, 4445 _dispatch_kevent_debugger2_cancel); 4446 dispatch_resume(ds); 4447 4448 return; 4449 } 4450out_bad: 4451 close(fd); 4452} 4453 4454#if HAVE_MACH 4455 4456#ifndef MACH_PORT_TYPE_SPREQUEST 4457#define MACH_PORT_TYPE_SPREQUEST 0x40000000 4458#endif 4459 4460DISPATCH_NOINLINE 4461void 4462dispatch_debug_machport(mach_port_t name, const char* str) 4463{ 4464 mach_port_type_t type; 4465 mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0; 4466 unsigned int dnreqs = 0, dnrsiz; 4467 kern_return_t kr = mach_port_type(mach_task_self(), name, &type); 4468 if (kr) { 4469 _dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name, 4470 kr, mach_error_string(kr), str); 4471 return; 4472 } 4473 if (type & MACH_PORT_TYPE_SEND) { 4474 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4475 MACH_PORT_RIGHT_SEND, &ns)); 4476 } 4477 if (type & MACH_PORT_TYPE_SEND_ONCE) { 4478 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4479 MACH_PORT_RIGHT_SEND_ONCE, &nso)); 4480 } 4481 if (type & MACH_PORT_TYPE_DEAD_NAME) { 4482 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4483 MACH_PORT_RIGHT_DEAD_NAME, &nd)); 4484 } 4485 if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) { 4486 (void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(), 4487 name, &dnrsiz, &dnreqs)); 4488 } 4489 if (type & MACH_PORT_TYPE_RECEIVE) { 4490 mach_port_status_t status = { .mps_pset = 0, }; 4491 mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT; 4492 (void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name, 4493 
MACH_PORT_RIGHT_RECEIVE, &nr)); 4494 (void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(), 4495 name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt)); 4496 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) " 4497 "dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) " 4498 "sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) " 4499 "seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs, 4500 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", 4501 status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N", 4502 status.mps_srights ? "Y":"N", status.mps_sorights, 4503 status.mps_qlimit, status.mps_msgcount, status.mps_mscount, 4504 status.mps_seqno, str); 4505 } else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE| 4506 MACH_PORT_TYPE_DEAD_NAME)) { 4507 _dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) " 4508 "dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs, 4509 type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str); 4510 } else { 4511 _dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type, 4512 str); 4513 } 4514} 4515 4516#endif // HAVE_MACH 4517 4518#endif // DISPATCH_DEBUG 4519