1/*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21#include "internal.h"
22#if HAVE_MACH
23#include "protocol.h"
24#include "protocolServer.h"
25#endif
26#include <sys/mount.h>
27
28static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29		const struct kevent64_s *ke);
30static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
31static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg);
32static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
33		uint32_t del_flags);
34static void _dispatch_kevent_drain(struct kevent64_s *ke);
35static void _dispatch_kevent_merge(struct kevent64_s *ke);
36static void _dispatch_timers_kevent(struct kevent64_s *ke);
37static void _dispatch_timers_unregister(dispatch_source_t ds,
38		dispatch_kevent_t dk);
39static void _dispatch_timers_update(dispatch_source_t ds);
40static void _dispatch_timer_aggregates_check(void);
41static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
42static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
43		unsigned int tidx);
44static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
45		unsigned int tidx);
46static inline unsigned long _dispatch_source_timer_data(
47		dispatch_source_refs_t dr, unsigned long prev);
48static long _dispatch_kq_update(const struct kevent64_s *);
49static void _dispatch_memorystatus_init(void);
50#if HAVE_MACH
51static void _dispatch_mach_host_calendar_change_register(void);
52static void _dispatch_mach_recv_msg_buf_init(void);
53static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
54		uint32_t new_flags, uint32_t del_flags);
55static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
56		uint32_t new_flags, uint32_t del_flags);
57static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke);
58#else
59static inline void _dispatch_mach_host_calendar_change_register(void) {}
60static inline void _dispatch_mach_recv_msg_buf_init(void) {}
61#endif
62static const char * _evfiltstr(short filt);
63#if DISPATCH_DEBUG
64static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str);
65static void _dispatch_kevent_debugger(void *context);
66#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
67	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
68#else
69static inline void
70_dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED,
71		const char* str DISPATCH_UNUSED) {}
72#define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
73#endif
74
75#pragma mark -
76#pragma mark dispatch_source_t
77
78dispatch_source_t
79dispatch_source_create(dispatch_source_type_t type,
80	uintptr_t handle,
81	unsigned long mask,
82	dispatch_queue_t q)
83{
84	const struct kevent64_s *proto_kev = &type->ke;
85	dispatch_source_t ds;
86	dispatch_kevent_t dk;
87
88	// input validation
89	if (type == NULL || (mask & ~type->mask)) {
90		return NULL;
91	}
92
93	switch (type->ke.filter) {
94	case EVFILT_SIGNAL:
95		if (handle >= NSIG) {
96			return NULL;
97		}
98		break;
99	case EVFILT_FS:
100#if DISPATCH_USE_VM_PRESSURE
101	case EVFILT_VM:
102#endif
103#if DISPATCH_USE_MEMORYSTATUS
104	case EVFILT_MEMORYSTATUS:
105#endif
106	case DISPATCH_EVFILT_CUSTOM_ADD:
107	case DISPATCH_EVFILT_CUSTOM_OR:
108		if (handle) {
109			return NULL;
110		}
111		break;
112	case DISPATCH_EVFILT_TIMER:
113		if (!!handle ^ !!type->ke.ident) {
114			return NULL;
115		}
116		break;
117	default:
118		break;
119	}
120
121	ds = _dispatch_alloc(DISPATCH_VTABLE(source),
122			sizeof(struct dispatch_source_s));
123	// Initialize as a queue first, then override some settings below.
124	_dispatch_queue_init((dispatch_queue_t)ds);
125	ds->dq_label = "source";
126
127	ds->do_ref_cnt++; // the reference the manager queue holds
128	ds->do_ref_cnt++; // since source is created suspended
129	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
130	// The initial target queue is the manager queue, in order to get
131	// the source installed. <rdar://problem/8928171>
132	ds->do_targetq = &_dispatch_mgr_q;
133
134	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
135	dk->dk_kevent = *proto_kev;
136	dk->dk_kevent.ident = handle;
137	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
138	dk->dk_kevent.fflags |= (uint32_t)mask;
139	dk->dk_kevent.udata = (uintptr_t)dk;
140	TAILQ_INIT(&dk->dk_sources);
141
142	ds->ds_dkev = dk;
143	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
144	ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
145	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
146		ds->ds_is_level = true;
147		ds->ds_needs_rearm = true;
148	} else if (!(EV_CLEAR & proto_kev->flags)) {
149		// we cheat and use EV_CLEAR to mean a "flag thingy"
150		ds->ds_is_adder = true;
151	}
152	// Some sources require special processing
153	if (type->init != NULL) {
154		type->init(ds, type, handle, mask, q);
155	}
156	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
157
158	if (fastpath(!ds->ds_refs)) {
159		ds->ds_refs = _dispatch_calloc(1ul,
160				sizeof(struct dispatch_source_refs_s));
161	}
162	ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
163
164	// First item on the queue sets the user-specified target queue
165	dispatch_set_target_queue(ds, q);
166	_dispatch_object_debug(ds, "%s", __func__);
167	return ds;
168}
169
170void
171_dispatch_source_dispose(dispatch_source_t ds)
172{
173	_dispatch_object_debug(ds, "%s", __func__);
174	free(ds->ds_refs);
175	_dispatch_queue_destroy(ds);
176}
177
178void
179_dispatch_source_xref_dispose(dispatch_source_t ds)
180{
181	_dispatch_wakeup(ds);
182}
183
184void
185dispatch_source_cancel(dispatch_source_t ds)
186{
187	_dispatch_object_debug(ds, "%s", __func__);
188	// Right after we set the cancel flag, someone else
189	// could potentially invoke the source, do the cancelation,
190	// unregister the source, and deallocate it. We would
191	// need to therefore retain/release before setting the bit
192
193	_dispatch_retain(ds);
194	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
195	_dispatch_wakeup(ds);
196	_dispatch_release(ds);
197}
198
199long
200dispatch_source_testcancel(dispatch_source_t ds)
201{
202	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
203}
204
205unsigned long
206dispatch_source_get_mask(dispatch_source_t ds)
207{
208	unsigned long mask = ds->ds_pending_data_mask;
209	if (ds->ds_vmpressure_override) {
210		mask = NOTE_VM_PRESSURE;
211	}
212#if TARGET_IPHONE_SIMULATOR
213	else if (ds->ds_memorystatus_override) {
214		mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
215	}
216#endif
217	return mask;
218}
219
220uintptr_t
221dispatch_source_get_handle(dispatch_source_t ds)
222{
223	unsigned int handle = (unsigned int)ds->ds_ident_hack;
224#if TARGET_IPHONE_SIMULATOR
225	if (ds->ds_memorystatus_override) {
226		handle = 0;
227	}
228#endif
229	return handle;
230}
231
232unsigned long
233dispatch_source_get_data(dispatch_source_t ds)
234{
235	unsigned long data = ds->ds_data;
236	if (ds->ds_vmpressure_override) {
237		data = NOTE_VM_PRESSURE;
238	}
239#if TARGET_IPHONE_SIMULATOR
240	else if (ds->ds_memorystatus_override) {
241		data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
242	}
243#endif
244	return data;
245}
246
247void
248dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
249{
250	struct kevent64_s kev = {
251		.fflags = (typeof(kev.fflags))val,
252		.data = (typeof(kev.data))val,
253	};
254
255	dispatch_assert(
256			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
257			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
258
259	_dispatch_source_merge_kevent(ds, &kev);
260}
261
262#pragma mark -
263#pragma mark dispatch_source_handler
264
265DISPATCH_ALWAYS_INLINE
266static inline dispatch_continuation_t
267_dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind,
268		bool block)
269{
270	dispatch_continuation_t dc = _dispatch_continuation_alloc();
271	if (handler) {
272		dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT :
273				DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ?
274				DISPATCH_OBJ_ASYNC_BIT : 0l));
275		dc->dc_priority = 0;
276		dc->dc_voucher = NULL;
277		if (block) {
278#ifdef __BLOCKS__
279			if (slowpath(_dispatch_block_has_private_data(handler))) {
280				// sources don't propagate priority by default
281				dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS;
282				flags |= _dispatch_block_get_flags(handler);
283				_dispatch_continuation_priority_set(dc,
284						_dispatch_block_get_priority(handler), flags);
285			}
286			if (kind != DS_EVENT_HANDLER) {
287				dc->dc_func = _dispatch_call_block_and_release;
288			} else {
289				dc->dc_func = _dispatch_Block_invoke(handler);
290			}
291			dc->dc_ctxt = _dispatch_Block_copy(handler);
292#endif /* __BLOCKS__ */
293		} else {
294			dc->dc_func = handler;
295			dc->dc_ctxt = ds->do_ctxt;
296		}
297		_dispatch_trace_continuation_push((dispatch_queue_t)ds, dc);
298	} else {
299		dc->dc_func = NULL;
300	}
301	dc->dc_data = (void*)kind;
302	return dc;
303}
304
305static inline void
306_dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind,
307		dispatch_continuation_t dc_new)
308{
309	dispatch_continuation_t dc = dr->ds_handler[kind];
310	if (dc) {
311#ifdef __BLOCKS__
312		if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) {
313			Block_release(dc->dc_ctxt);
314		}
315#endif /* __BLOCKS__ */
316		if (dc->dc_voucher) {
317			_voucher_release(dc->dc_voucher);
318			dc->dc_voucher = NULL;
319		}
320		_dispatch_continuation_free(dc);
321	}
322	dr->ds_handler[kind] = dc_new;
323}
324
325static inline void
326_dispatch_source_handler_free(dispatch_source_refs_t dr, long kind)
327{
328	_dispatch_source_handler_replace(dr, kind, NULL);
329}
330
331static void
332_dispatch_source_set_handler(void *context)
333{
334	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
335	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
336	dispatch_continuation_t dc = context;
337	long kind = (long)dc->dc_data;
338	dc->dc_data = 0;
339	if (!dc->dc_func) {
340		_dispatch_continuation_free(dc);
341		dc = NULL;
342	} else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
343		dc->dc_ctxt = ds->do_ctxt;
344	}
345	_dispatch_source_handler_replace(ds->ds_refs, kind, dc);
346	if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) {
347#if HAVE_PTHREAD_WORKQUEUE_QOS
348		ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
349		_dispatch_queue_set_override_priority((dispatch_queue_t)ds);
350#endif
351	}
352}
353
354#ifdef __BLOCKS__
355void
356dispatch_source_set_event_handler(dispatch_source_t ds,
357		dispatch_block_t handler)
358{
359	dispatch_continuation_t dc;
360	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
361	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
362			_dispatch_source_set_handler);
363}
364#endif /* __BLOCKS__ */
365
366void
367dispatch_source_set_event_handler_f(dispatch_source_t ds,
368		dispatch_function_t handler)
369{
370	dispatch_continuation_t dc;
371	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
372	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
373			_dispatch_source_set_handler);
374}
375
376void
377_dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,
378		void *ctxt, dispatch_function_t handler)
379{
380	dispatch_continuation_t dc;
381	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
382	dc->do_vtable = (void *)((long)dc->do_vtable &~DISPATCH_OBJ_CTXT_FETCH_BIT);
383	dc->dc_other = dc->dc_ctxt;
384	dc->dc_ctxt = ctxt;
385	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
386			_dispatch_source_set_handler);
387}
388
389#ifdef __BLOCKS__
390void
391dispatch_source_set_cancel_handler(dispatch_source_t ds,
392		dispatch_block_t handler)
393{
394	dispatch_continuation_t dc;
395	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, true);
396	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
397			_dispatch_source_set_handler);
398}
399#endif /* __BLOCKS__ */
400
401void
402dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
403		dispatch_function_t handler)
404{
405	dispatch_continuation_t dc;
406	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
407	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
408			_dispatch_source_set_handler);
409}
410
411#ifdef __BLOCKS__
412void
413dispatch_source_set_registration_handler(dispatch_source_t ds,
414		dispatch_block_t handler)
415{
416	dispatch_continuation_t dc;
417	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, true);
418	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
419			_dispatch_source_set_handler);
420}
421#endif /* __BLOCKS__ */
422
423void
424dispatch_source_set_registration_handler_f(dispatch_source_t ds,
425	dispatch_function_t handler)
426{
427	dispatch_continuation_t dc;
428	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
429	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
430			_dispatch_source_set_handler);
431}
432
433#pragma mark -
434#pragma mark dispatch_source_invoke
435
436static void
437_dispatch_source_registration_callout(dispatch_source_t ds)
438{
439	dispatch_source_refs_t dr = ds->ds_refs;
440	dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER];
441	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
442		// no registration callout if source is canceled rdar://problem/8955246
443		return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
444	}
445	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
446	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
447		dc->dc_ctxt = ds->do_ctxt;
448	}
449	_dispatch_continuation_pop(dc);
450	dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
451	_dispatch_reset_defaultpriority(old_dp);
452}
453
454static void
455_dispatch_source_cancel_callout(dispatch_source_t ds)
456{
457	dispatch_source_refs_t dr = ds->ds_refs;
458	dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
459	ds->ds_pending_data_mask = 0;
460	ds->ds_pending_data = 0;
461	ds->ds_data = 0;
462	_dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
463	_dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
464	if (!dc) {
465		return;
466	}
467	if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
468		return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
469	}
470	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
471	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
472		dc->dc_ctxt = ds->do_ctxt;
473	}
474	_dispatch_continuation_pop(dc);
475	dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
476	_dispatch_reset_defaultpriority(old_dp);
477}
478
479static void
480_dispatch_source_latch_and_call(dispatch_source_t ds)
481{
482	unsigned long prev;
483
484	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
485		return;
486	}
487	dispatch_source_refs_t dr = ds->ds_refs;
488	dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
489	prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
490	if (ds->ds_is_level) {
491		ds->ds_data = ~prev;
492	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
493		ds->ds_data = _dispatch_source_timer_data(dr, prev);
494	} else {
495		ds->ds_data = prev;
496	}
497	if (!dispatch_assume(prev) || !dc) {
498		return;
499	}
500	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
501	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dc);
502	voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
503	_dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
504	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
505	_dispatch_introspection_queue_item_complete(dc);
506	if (voucher) dc->dc_voucher = voucher;
507	_dispatch_reset_defaultpriority(old_dp);
508}
509
510static void
511_dispatch_source_kevent_unregister(dispatch_source_t ds)
512{
513	_dispatch_object_debug(ds, "%s", __func__);
514	dispatch_kevent_t dk = ds->ds_dkev;
515	ds->ds_dkev = NULL;
516	switch (dk->dk_kevent.filter) {
517	case DISPATCH_EVFILT_TIMER:
518		_dispatch_timers_unregister(ds, dk);
519		break;
520	default:
521		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
522		_dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
523		break;
524	}
525
526	(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
527	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
528	_dispatch_release(ds); // the retain is done at creation time
529}
530
531static void
532_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
533{
534	switch (ds->ds_dkev->dk_kevent.filter) {
535	case DISPATCH_EVFILT_TIMER:
536		return _dispatch_timers_update(ds);
537	case EVFILT_MACHPORT:
538		if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
539			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
540		}
541		break;
542	}
543	if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
544		_dispatch_source_kevent_unregister(ds);
545	}
546}
547
548static void
549_dispatch_source_kevent_register(dispatch_source_t ds)
550{
551	dispatch_assert_zero(ds->ds_is_installed);
552	switch (ds->ds_dkev->dk_kevent.filter) {
553	case DISPATCH_EVFILT_TIMER:
554		return _dispatch_timers_update(ds);
555	}
556	uint32_t flags;
557	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
558	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
559	if (do_resume || ds->ds_needs_rearm) {
560		_dispatch_source_kevent_resume(ds, flags);
561	}
562	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
563	_dispatch_object_debug(ds, "%s", __func__);
564}
565
566DISPATCH_ALWAYS_INLINE
567static inline dispatch_queue_t
568_dispatch_source_invoke2(dispatch_object_t dou,
569		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
570{
571	dispatch_source_t ds = dou._ds;
572	if (slowpath(_dispatch_queue_drain(ds))) {
573		DISPATCH_CLIENT_CRASH("Sync onto source");
574	}
575
576	// This function performs all source actions. Each action is responsible
577	// for verifying that it takes place on the appropriate queue. If the
578	// current queue is not the correct queue for this action, the correct queue
579	// will be returned and the invoke will be re-driven on that queue.
580
581	// The order of tests here in invoke and in probe should be consistent.
582
583	dispatch_queue_t dq = _dispatch_queue_get_current();
584	dispatch_source_refs_t dr = ds->ds_refs;
585
586	if (!ds->ds_is_installed) {
587		// The source needs to be installed on the manager queue.
588		if (dq != &_dispatch_mgr_q) {
589			return &_dispatch_mgr_q;
590		}
591		_dispatch_source_kevent_register(ds);
592		ds->ds_is_installed = true;
593		if (dr->ds_handler[DS_REGISTN_HANDLER]) {
594			return ds->do_targetq;
595		}
596		if (slowpath(ds->do_xref_cnt == -1)) {
597			return &_dispatch_mgr_q; // rdar://problem/9558246
598		}
599	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
600		// Source suspended by an item drained from the source queue.
601		return NULL;
602	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
603		// The source has been registered and the registration handler needs
604		// to be delivered on the target queue.
605		if (dq != ds->do_targetq) {
606			return ds->do_targetq;
607		}
608		// clears ds_registration_handler
609		_dispatch_source_registration_callout(ds);
610		if (slowpath(ds->do_xref_cnt == -1)) {
611			return &_dispatch_mgr_q; // rdar://problem/9558246
612		}
613	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
614		// The source has been cancelled and needs to be uninstalled from the
615		// manager queue. After uninstallation, the cancellation handler needs
616		// to be delivered to the target queue.
617		if (ds->ds_dkev) {
618			if (dq != &_dispatch_mgr_q) {
619				return &_dispatch_mgr_q;
620			}
621			_dispatch_source_kevent_unregister(ds);
622		}
623		if (dr->ds_handler[DS_EVENT_HANDLER] ||
624				dr->ds_handler[DS_CANCEL_HANDLER] ||
625				dr->ds_handler[DS_REGISTN_HANDLER]) {
626			if (dq != ds->do_targetq) {
627				return ds->do_targetq;
628			}
629		}
630		_dispatch_source_cancel_callout(ds);
631	} else if (ds->ds_pending_data) {
632		// The source has pending data to deliver via the event handler callback
633		// on the target queue. Some sources need to be rearmed on the manager
634		// queue after event delivery.
635		if (dq != ds->do_targetq) {
636			return ds->do_targetq;
637		}
638		_dispatch_source_latch_and_call(ds);
639		if (ds->ds_needs_rearm) {
640			return &_dispatch_mgr_q;
641		}
642	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
643		// The source needs to be rearmed on the manager queue.
644		if (dq != &_dispatch_mgr_q) {
645			return &_dispatch_mgr_q;
646		}
647		_dispatch_source_kevent_resume(ds, 0);
648		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
649	}
650
651	return NULL;
652}
653
654DISPATCH_NOINLINE
655void
656_dispatch_source_invoke(dispatch_source_t ds)
657{
658	_dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
659}
660
661unsigned long
662_dispatch_source_probe(dispatch_source_t ds)
663{
664	// This function determines whether the source needs to be invoked.
665	// The order of tests here in probe and in invoke should be consistent.
666
667	dispatch_source_refs_t dr = ds->ds_refs;
668	if (!ds->ds_is_installed) {
669		// The source needs to be installed on the manager queue.
670		return true;
671	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
672		// The registration handler needs to be delivered to the target queue.
673		return true;
674	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
675		// The source needs to be uninstalled from the manager queue, or the
676		// cancellation handler needs to be delivered to the target queue.
677		// Note: cancellation assumes installation.
678		if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
679				dr->ds_handler[DS_CANCEL_HANDLER] ||
680				dr->ds_handler[DS_REGISTN_HANDLER]) {
681			return true;
682		}
683	} else if (ds->ds_pending_data) {
684		// The source has pending data to deliver to the target queue.
685		return true;
686	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
687		// The source needs to be rearmed on the manager queue.
688		return true;
689	}
690	return _dispatch_queue_class_probe(ds);
691}
692
693static void
694_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
695{
696	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
697		return;
698	}
699	if (ds->ds_is_level) {
700		// ke->data is signed and "negative available data" makes no sense
701		// zero bytes happens when EV_EOF is set
702		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
703		dispatch_assert(ke->data >= 0l);
704		dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
705				relaxed);
706	} else if (ds->ds_is_adder) {
707		(void)dispatch_atomic_add2o(ds, ds_pending_data,
708				(unsigned long)ke->data, relaxed);
709	} else if (ke->fflags & ds->ds_pending_data_mask) {
710		(void)dispatch_atomic_or2o(ds, ds_pending_data,
711				ke->fflags & ds->ds_pending_data_mask, relaxed);
712	}
713	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
714	if (ds->ds_needs_rearm) {
715		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
716	}
717
718	_dispatch_wakeup(ds);
719}
720
721#pragma mark -
722#pragma mark dispatch_kevent_t
723
724#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
725static void _dispatch_kevent_guard(dispatch_kevent_t dk);
726static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
727#else
728static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
729static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
730#endif
731
732static struct dispatch_kevent_s _dispatch_kevent_data_or = {
733	.dk_kevent = {
734		.filter = DISPATCH_EVFILT_CUSTOM_OR,
735		.flags = EV_CLEAR,
736	},
737	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
738};
739static struct dispatch_kevent_s _dispatch_kevent_data_add = {
740	.dk_kevent = {
741		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
742	},
743	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
744};
745
746#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
747
748DISPATCH_CACHELINE_ALIGN
749static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
750
751static void
752_dispatch_kevent_init()
753{
754	unsigned int i;
755	for (i = 0; i < DSL_HASH_SIZE; i++) {
756		TAILQ_INIT(&_dispatch_sources[i]);
757	}
758
759	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
760			&_dispatch_kevent_data_or, dk_list);
761	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
762			&_dispatch_kevent_data_add, dk_list);
763	_dispatch_kevent_data_or.dk_kevent.udata =
764			(uintptr_t)&_dispatch_kevent_data_or;
765	_dispatch_kevent_data_add.dk_kevent.udata =
766			(uintptr_t)&_dispatch_kevent_data_add;
767}
768
769static inline uintptr_t
770_dispatch_kevent_hash(uint64_t ident, short filter)
771{
772	uint64_t value;
773#if HAVE_MACH
774	value = (filter == EVFILT_MACHPORT ||
775			filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
776			MACH_PORT_INDEX(ident) : ident);
777#else
778	value = ident;
779#endif
780	return DSL_HASH((uintptr_t)value);
781}
782
783static dispatch_kevent_t
784_dispatch_kevent_find(uint64_t ident, short filter)
785{
786	uintptr_t hash = _dispatch_kevent_hash(ident, filter);
787	dispatch_kevent_t dki;
788
789	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
790		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
791			break;
792		}
793	}
794	return dki;
795}
796
797static void
798_dispatch_kevent_insert(dispatch_kevent_t dk)
799{
800	_dispatch_kevent_guard(dk);
801	uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
802			dk->dk_kevent.filter);
803	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
804}
805
806// Find existing kevents, and merge any new flags if necessary
807static bool
808_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
809{
810	dispatch_kevent_t dk, ds_dkev = *dkp;
811	uint32_t new_flags;
812	bool do_resume = false;
813
814	dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
815			ds_dkev->dk_kevent.filter);
816	if (dk) {
817		// If an existing dispatch kevent is found, check to see if new flags
818		// need to be added to the existing kevent
819		new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
820		dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
821		free(ds_dkev);
822		*dkp = dk;
823		do_resume = new_flags;
824	} else {
825		dk = ds_dkev;
826		_dispatch_kevent_insert(dk);
827		new_flags = dk->dk_kevent.fflags;
828		do_resume = true;
829	}
830	// Re-register the kevent with the kernel if new flags were added
831	// by the dispatch kevent
832	if (do_resume) {
833		dk->dk_kevent.flags |= EV_ADD;
834	}
835	*flgp = new_flags;
836	return do_resume;
837}
838
839static bool
840_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
841		uint32_t del_flags)
842{
843	long r;
844	switch (dk->dk_kevent.filter) {
845	case DISPATCH_EVFILT_TIMER:
846	case DISPATCH_EVFILT_CUSTOM_ADD:
847	case DISPATCH_EVFILT_CUSTOM_OR:
848		// these types not registered with kevent
849		return 0;
850#if HAVE_MACH
851	case EVFILT_MACHPORT:
852		return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
853	case DISPATCH_EVFILT_MACH_NOTIFICATION:
854		return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
855#endif
856	case EVFILT_PROC:
857		if (dk->dk_kevent.flags & EV_ONESHOT) {
858			return 0;
859		}
860		// fall through
861	default:
862		r = _dispatch_kq_update(&dk->dk_kevent);
863		if (dk->dk_kevent.flags & EV_DISPATCH) {
864			dk->dk_kevent.flags &= ~EV_ADD;
865		}
866		return r;
867	}
868}
869
870static void
871_dispatch_kevent_dispose(dispatch_kevent_t dk)
872{
873	uintptr_t hash;
874
875	switch (dk->dk_kevent.filter) {
876	case DISPATCH_EVFILT_TIMER:
877	case DISPATCH_EVFILT_CUSTOM_ADD:
878	case DISPATCH_EVFILT_CUSTOM_OR:
879		// these sources live on statically allocated lists
880		return;
881#if HAVE_MACH
882	case EVFILT_MACHPORT:
883		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
884		break;
885	case DISPATCH_EVFILT_MACH_NOTIFICATION:
886		_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
887		break;
888#endif
889	case EVFILT_PROC:
890		if (dk->dk_kevent.flags & EV_ONESHOT) {
891			break; // implicitly deleted
892		}
893		// fall through
894	default:
895		if (~dk->dk_kevent.flags & EV_DELETE) {
896			dk->dk_kevent.flags |= EV_DELETE;
897			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
898			_dispatch_kq_update(&dk->dk_kevent);
899		}
900		break;
901	}
902
903	hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
904			dk->dk_kevent.filter);
905	TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
906	_dispatch_kevent_unguard(dk);
907	free(dk);
908}
909
910static void
911_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
912{
913	dispatch_source_refs_t dri;
914	uint32_t del_flags, fflags = 0;
915
916	if (TAILQ_EMPTY(&dk->dk_sources)) {
917		_dispatch_kevent_dispose(dk);
918	} else {
919		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
920			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
921			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
922			fflags |= mask;
923		}
924		del_flags = flg & ~fflags;
925		if (del_flags) {
926			dk->dk_kevent.flags |= EV_ADD;
927			dk->dk_kevent.fflags = fflags;
928			_dispatch_kevent_resume(dk, 0, del_flags);
929		}
930	}
931}
932
933DISPATCH_NOINLINE
934static void
935_dispatch_kevent_proc_exit(struct kevent64_s *ke)
936{
937	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
938	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
939	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
940	struct kevent64_s fake;
941	fake = *ke;
942	fake.flags &= ~EV_ERROR;
943	fake.fflags = NOTE_EXIT;
944	fake.data = 0;
945	_dispatch_kevent_drain(&fake);
946}
947
948DISPATCH_NOINLINE
949static void
950_dispatch_kevent_error(struct kevent64_s *ke)
951{
952	_dispatch_kevent_debug(ke, __func__);
953	if (ke->data) {
954		// log the unexpected error
955		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
956				ke->flags & EV_DELETE ? "delete" :
957				ke->flags & EV_ADD ? "add" :
958				ke->flags & EV_ENABLE ? "enable" : "monitor",
959				(int)ke->data);
960	}
961}
962
963static void
964_dispatch_kevent_drain(struct kevent64_s *ke)
965{
966#if DISPATCH_DEBUG
967	static dispatch_once_t pred;
968	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
969#endif
970	if (ke->filter == EVFILT_USER) {
971		return;
972	}
973	if (slowpath(ke->flags & EV_ERROR)) {
974		if (ke->filter == EVFILT_PROC) {
975			if (ke->flags & EV_DELETE) {
976				// Process exited while monitored
977				return;
978			} else if (ke->data == ESRCH) {
979				return _dispatch_kevent_proc_exit(ke);
980			}
981		}
982		return _dispatch_kevent_error(ke);
983	}
984	_dispatch_kevent_debug(ke, __func__);
985	if (ke->filter == EVFILT_TIMER) {
986		return _dispatch_timers_kevent(ke);
987	}
988#if HAVE_MACH
989	if (ke->filter == EVFILT_MACHPORT) {
990		return _dispatch_kevent_mach_portset(ke);
991	}
992#endif
993	return _dispatch_kevent_merge(ke);
994}
995
996DISPATCH_NOINLINE
997static void
998_dispatch_kevent_merge(struct kevent64_s *ke)
999{
1000	dispatch_kevent_t dk;
1001	dispatch_source_refs_t dri;
1002
1003	dk = (void*)ke->udata;
1004	dispatch_assert(dk);
1005
1006	if (ke->flags & EV_ONESHOT) {
1007		dk->dk_kevent.flags |= EV_ONESHOT;
1008	}
1009	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1010		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
1011	}
1012}
1013
1014#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1015static void
1016_dispatch_kevent_guard(dispatch_kevent_t dk)
1017{
1018	guardid_t guard;
1019	const unsigned int guard_flags = GUARD_CLOSE;
1020	int r, fd_flags = 0;
1021	switch (dk->dk_kevent.filter) {
1022	case EVFILT_READ:
1023	case EVFILT_WRITE:
1024	case EVFILT_VNODE:
1025		guard = &dk->dk_kevent;
1026		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
1027				&guard, guard_flags, &fd_flags);
1028		if (slowpath(r == -1)) {
1029			int err = errno;
1030			if (err != EPERM) {
1031				(void)dispatch_assume_zero(err);
1032			}
1033			return;
1034		}
1035		dk->dk_kevent.ext[0] = guard_flags;
1036		dk->dk_kevent.ext[1] = fd_flags;
1037		break;
1038	}
1039}
1040
1041static void
1042_dispatch_kevent_unguard(dispatch_kevent_t dk)
1043{
1044	guardid_t guard;
1045	unsigned int guard_flags;
1046	int r, fd_flags;
1047	switch (dk->dk_kevent.filter) {
1048	case EVFILT_READ:
1049	case EVFILT_WRITE:
1050	case EVFILT_VNODE:
1051		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
1052		if (!guard_flags) {
1053			return;
1054		}
1055		guard = &dk->dk_kevent;
1056		fd_flags = (int)dk->dk_kevent.ext[1];
1057		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
1058				guard_flags, NULL, 0, &fd_flags);
1059		if (slowpath(r == -1)) {
1060			(void)dispatch_assume_zero(errno);
1061			return;
1062		}
1063		dk->dk_kevent.ext[0] = 0;
1064		break;
1065	}
1066}
1067#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1068
1069#pragma mark -
1070#pragma mark dispatch_source_timer
1071
1072#if DISPATCH_USE_DTRACE
1073static dispatch_source_refs_t
1074		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
1075#define _dispatch_trace_next_timer_set(x, q) \
1076		_dispatch_trace_next_timer[(q)] = (x)
1077#define _dispatch_trace_next_timer_program(d, q) \
1078		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
1079#define _dispatch_trace_next_timer_wake(q) \
1080		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
1081#else
1082#define _dispatch_trace_next_timer_set(x, q)
1083#define _dispatch_trace_next_timer_program(d, q)
1084#define _dispatch_trace_next_timer_wake(q)
1085#endif
1086
1087#define _dispatch_source_timer_telemetry_enabled() false
1088
1089DISPATCH_NOINLINE
1090static void
1091_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1092		uintptr_t ident, struct dispatch_timer_source_s *values)
1093{
1094	if (_dispatch_trace_timer_configure_enabled()) {
1095		_dispatch_trace_timer_configure(ds, ident, values);
1096	}
1097}
1098
1099DISPATCH_ALWAYS_INLINE
1100static inline void
1101_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
1102		struct dispatch_timer_source_s *values)
1103{
1104	if (_dispatch_trace_timer_configure_enabled() ||
1105			_dispatch_source_timer_telemetry_enabled()) {
1106		_dispatch_source_timer_telemetry_slow(ds, ident, values);
1107		asm(""); // prevent tailcall
1108	}
1109}
1110
1111// approx 1 year (60s * 60m * 24h * 365d)
1112#define FOREVER_NSEC 31536000000000000ull
1113
1114DISPATCH_ALWAYS_INLINE
1115static inline uint64_t
1116_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
1117{
1118	unsigned int tk = DISPATCH_TIMER_KIND(tidx);
1119	if (nows && fastpath(nows[tk])) {
1120		return nows[tk];
1121	}
1122	uint64_t now;
1123	switch (tk) {
1124	case DISPATCH_TIMER_KIND_MACH:
1125		now = _dispatch_absolute_time();
1126		break;
1127	case DISPATCH_TIMER_KIND_WALL:
1128		now = _dispatch_get_nanoseconds();
1129		break;
1130	}
1131	if (nows) {
1132		nows[tk] = now;
1133	}
1134	return now;
1135}
1136
1137static inline unsigned long
1138_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1139{
1140	// calculate the number of intervals since last fire
1141	unsigned long data, missed;
1142	uint64_t now;
1143	now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
1144	missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1145			ds_timer(dr).interval);
1146	// correct for missed intervals already delivered last time
1147	data = prev - ds_timer(dr).missed + missed;
1148	ds_timer(dr).missed = missed;
1149	return data;
1150}
1151
1152struct dispatch_set_timer_params {
1153	dispatch_source_t ds;
1154	uintptr_t ident;
1155	struct dispatch_timer_source_s values;
1156};
1157
1158static void
1159_dispatch_source_set_timer3(void *context)
1160{
1161	// Called on the _dispatch_mgr_q
1162	struct dispatch_set_timer_params *params = context;
1163	dispatch_source_t ds = params->ds;
1164	ds->ds_ident_hack = params->ident;
1165	ds_timer(ds->ds_refs) = params->values;
1166	// Clear any pending data that might have accumulated on
1167	// older timer params <rdar://problem/8574886>
1168	ds->ds_pending_data = 0;
1169	// Re-arm in case we got disarmed because of pending set_timer suspension
1170	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
1171	dispatch_resume(ds);
1172	// Must happen after resume to avoid getting disarmed due to suspension
1173	_dispatch_timers_update(ds);
1174	dispatch_release(ds);
1175	if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
1176		_dispatch_mach_host_calendar_change_register();
1177	}
1178	free(params);
1179}
1180
1181static void
1182_dispatch_source_set_timer2(void *context)
1183{
1184	// Called on the source queue
1185	struct dispatch_set_timer_params *params = context;
1186	dispatch_suspend(params->ds);
1187	_dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
1188			_dispatch_source_set_timer3);
1189}
1190
1191DISPATCH_NOINLINE
1192static struct dispatch_set_timer_params *
1193_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
1194		uint64_t interval, uint64_t leeway)
1195{
1196	struct dispatch_set_timer_params *params;
1197	params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
1198	params->ds = ds;
1199	params->values.flags = ds_timer(ds->ds_refs).flags;
1200
1201	if (interval == 0) {
1202		// we use zero internally to mean disabled
1203		interval = 1;
1204	} else if ((int64_t)interval < 0) {
1205		// 6866347 - make sure nanoseconds won't overflow
1206		interval = INT64_MAX;
1207	}
1208	if ((int64_t)leeway < 0) {
1209		leeway = INT64_MAX;
1210	}
1211	if (start == DISPATCH_TIME_NOW) {
1212		start = _dispatch_absolute_time();
1213	} else if (start == DISPATCH_TIME_FOREVER) {
1214		start = INT64_MAX;
1215	}
1216
1217	if ((int64_t)start < 0) {
1218		// wall clock
1219		start = (dispatch_time_t)-((int64_t)start);
1220		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1221	} else {
1222		// absolute clock
1223		interval = _dispatch_time_nano2mach(interval);
1224		if (interval < 1) {
1225			// rdar://problem/7287561 interval must be at least one in
1226			// in order to avoid later division by zero when calculating
1227			// the missed interval count. (NOTE: the wall clock's
1228			// interval is already "fixed" to be 1 or more)
1229			interval = 1;
1230		}
1231		leeway = _dispatch_time_nano2mach(leeway);
1232		params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
1233	}
1234	params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
1235	params->values.target = start;
1236	params->values.deadline = (start < UINT64_MAX - leeway) ?
1237			start + leeway : UINT64_MAX;
1238	params->values.interval = interval;
1239	params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
1240			leeway : interval / 2;
1241	return params;
1242}
1243
1244DISPATCH_ALWAYS_INLINE
1245static inline void
1246_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1247		uint64_t interval, uint64_t leeway, bool source_sync)
1248{
1249	if (slowpath(!ds->ds_is_timer) ||
1250			slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
1251		DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1252	}
1253
1254	struct dispatch_set_timer_params *params;
1255	params = _dispatch_source_timer_params(ds, start, interval, leeway);
1256
1257	_dispatch_source_timer_telemetry(ds, params->ident, &params->values);
1258	// Suspend the source so that it doesn't fire with pending changes
1259	// The use of suspend/resume requires the external retain/release
1260	dispatch_retain(ds);
1261	if (source_sync) {
1262		return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
1263				_dispatch_source_set_timer2);
1264	} else {
1265		return _dispatch_source_set_timer2(params);
1266	}
1267}
1268
1269void
1270dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1271		uint64_t interval, uint64_t leeway)
1272{
1273	_dispatch_source_set_timer(ds, start, interval, leeway, true);
1274}
1275
1276void
1277_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
1278		dispatch_time_t start, uint64_t interval, uint64_t leeway)
1279{
1280	// Don't serialize through the source queue for CF timers <rdar://13833190>
1281	_dispatch_source_set_timer(ds, start, interval, leeway, false);
1282}
1283
1284void
1285_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1286{
1287	dispatch_source_refs_t dr = ds->ds_refs;
1288	#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1289	const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
1290	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1291			FOREVER_NSEC/NSEC_PER_MSEC))) {
1292		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1293	} else {
1294		interval = FOREVER_NSEC;
1295	}
1296	interval = _dispatch_time_nano2mach(interval);
1297	uint64_t target = _dispatch_absolute_time() + interval;
1298	target = (target / interval) * interval;
1299	const uint64_t leeway = animation ?
1300			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1301	ds_timer(dr).target = target;
1302	ds_timer(dr).deadline = target + leeway;
1303	ds_timer(dr).interval = interval;
1304	ds_timer(dr).leeway = leeway;
1305	_dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
1306}
1307
1308#pragma mark -
1309#pragma mark dispatch_timers
1310
1311#define DISPATCH_TIMER_STRUCT(refs) \
1312	uint64_t target, deadline; \
1313	TAILQ_HEAD(, refs) dt_sources
1314
1315typedef struct dispatch_timer_s {
1316	DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
1317} *dispatch_timer_t;
1318
1319#define DISPATCH_TIMER_INITIALIZER(tidx) \
1320	[tidx] = { \
1321		.target = UINT64_MAX, \
1322		.deadline = UINT64_MAX, \
1323		.dt_sources = TAILQ_HEAD_INITIALIZER( \
1324				_dispatch_timer[tidx].dt_sources), \
1325	}
1326#define DISPATCH_TIMER_INIT(kind, qos) \
1327		DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1328		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1329
1330struct dispatch_timer_s _dispatch_timer[] =  {
1331	DISPATCH_TIMER_INIT(WALL, NORMAL),
1332	DISPATCH_TIMER_INIT(WALL, CRITICAL),
1333	DISPATCH_TIMER_INIT(WALL, BACKGROUND),
1334	DISPATCH_TIMER_INIT(MACH, NORMAL),
1335	DISPATCH_TIMER_INIT(MACH, CRITICAL),
1336	DISPATCH_TIMER_INIT(MACH, BACKGROUND),
1337};
1338#define DISPATCH_TIMER_COUNT \
1339		((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1340
1341#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1342		(uintptr_t)&_dispatch_kevent_timer[tidx]
1343#ifdef __LP64__
1344#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1345		.udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1346#else // __LP64__
1347// dynamic initialization in _dispatch_timers_init()
1348#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1349		.udata = 0
1350#endif // __LP64__
1351#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1352	[tidx] = { \
1353		.dk_kevent = { \
1354			.ident = tidx, \
1355			.filter = DISPATCH_EVFILT_TIMER, \
1356			DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1357		}, \
1358		.dk_sources = TAILQ_HEAD_INITIALIZER( \
1359				_dispatch_kevent_timer[tidx].dk_sources), \
1360	}
1361#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1362		DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1363		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1364
1365struct dispatch_kevent_s _dispatch_kevent_timer[] = {
1366	DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
1367	DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
1368	DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
1369	DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
1370	DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
1371	DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
1372	DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
1373};
1374#define DISPATCH_KEVENT_TIMER_COUNT \
1375		((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1376
1377#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1378#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
1379	[qos] = { \
1380		.ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
1381		.filter = EVFILT_TIMER, \
1382		.flags = EV_ONESHOT, \
1383		.fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1384	}
1385#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
1386		DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)
1387
1388struct kevent64_s _dispatch_kevent_timeout[] = {
1389	DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
1390	DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
1391	DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
1392};
1393
1394#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1395		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1396
1397static const uint64_t _dispatch_kevent_coalescing_window[] = {
1398	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
1399	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
1400	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
1401};
1402
1403#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
1404	typeof(dr) dri = NULL; typeof(dt) dti; \
1405	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1406		TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
1407			if (ds_timer(dr).target < ds_timer(dri).target) { \
1408				break; \
1409			} \
1410		} \
1411		TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
1412			if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
1413				break; \
1414			} \
1415		} \
1416		if (dti) { \
1417			TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
1418		} else { \
1419			TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
1420		} \
1421	} \
1422	if (dri) { \
1423		TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
1424	} else { \
1425		TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
1426	} \
1427	})
1428
1429#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
1430	({ \
1431	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1432		TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
1433	} \
1434	TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
1435			dr_list); })
1436
1437#define _dispatch_timers_check(dra, dta) ({ \
1438	unsigned int qosm = _dispatch_timers_qos_mask; \
1439	bool update = false; \
1440	unsigned int tidx; \
1441	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
1442		if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
1443			continue; \
1444		} \
1445		dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
1446				TAILQ_FIRST(&dra[tidx].dk_sources); \
1447		dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
1448				TAILQ_FIRST(&dta[tidx].dt_sources); \
1449		uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
1450		uint64_t deadline = dr ? ds_timer(dt).deadline : UINT64_MAX; \
1451		if (target != dta[tidx].target) { \
1452			dta[tidx].target = target; \
1453			update = true; \
1454		} \
1455		if (deadline != dta[tidx].deadline) { \
1456			dta[tidx].deadline = deadline; \
1457			update = true; \
1458		} \
1459	} \
1460	update; })
1461
1462static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
1463static unsigned int _dispatch_timers_qos_mask;
1464static bool _dispatch_timers_force_max_leeway;
1465
1466static void
1467_dispatch_timers_init(void)
1468{
1469#ifndef __LP64__
1470	unsigned int tidx;
1471	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1472		_dispatch_kevent_timer[tidx].dk_kevent.udata = \
1473				DISPATCH_KEVENT_TIMER_UDATA(tidx);
1474	}
1475#endif // __LP64__
1476	if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
1477		_dispatch_timers_force_max_leeway = true;
1478	}
1479}
1480
1481static inline void
1482_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
1483{
1484	dispatch_source_refs_t dr = ds->ds_refs;
1485	unsigned int tidx = (unsigned int)dk->dk_kevent.ident;
1486
1487	if (slowpath(ds_timer_aggregate(ds))) {
1488		_dispatch_timer_aggregates_unregister(ds, tidx);
1489	}
1490	_dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
1491			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1492	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1493		_dispatch_timers_reconfigure = true;
1494		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1495	}
1496}
1497
1498// Updates the ordered list of timers based on next fire date for changes to ds.
1499// Should only be called from the context of _dispatch_mgr_q.
1500static void
1501_dispatch_timers_update(dispatch_source_t ds)
1502{
1503	dispatch_kevent_t dk = ds->ds_dkev;
1504	dispatch_source_refs_t dr = ds->ds_refs;
1505	unsigned int tidx;
1506
1507	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1508
1509	// Do not reschedule timers unregistered with _dispatch_kevent_unregister()
1510	if (slowpath(!dk)) {
1511		return;
1512	}
1513	// Move timers that are disabled, suspended or have missed intervals to the
1514	// disarmed list, rearm after resume resp. source invoke will reenable them
1515	if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1516			ds->ds_pending_data) {
1517		tidx = DISPATCH_TIMER_INDEX_DISARM;
1518		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
1519	} else {
1520		tidx = _dispatch_source_timer_idx(dr);
1521	}
1522	if (slowpath(ds_timer_aggregate(ds))) {
1523		_dispatch_timer_aggregates_register(ds);
1524	}
1525	if (slowpath(!ds->ds_is_installed)) {
1526		ds->ds_is_installed = true;
1527		if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1528			(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
1529		}
1530		_dispatch_object_debug(ds, "%s", __func__);
1531		ds->ds_dkev = NULL;
1532		free(dk);
1533	} else {
1534		_dispatch_timers_unregister(ds, dk);
1535	}
1536	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1537		_dispatch_timers_reconfigure = true;
1538		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1539	}
1540	if (dk != &_dispatch_kevent_timer[tidx]){
1541		ds->ds_dkev = &_dispatch_kevent_timer[tidx];
1542	}
1543	_dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
1544			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1545	if (slowpath(ds_timer_aggregate(ds))) {
1546		_dispatch_timer_aggregates_update(ds, tidx);
1547	}
1548}
1549
1550static inline void
1551_dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
1552{
1553	dispatch_source_refs_t dr;
1554	dispatch_source_t ds;
1555	uint64_t now, missed;
1556
1557	now = _dispatch_source_timer_now(nows, tidx);
1558	while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
1559		ds = _dispatch_source_from_refs(dr);
1560		// We may find timers on the wrong list due to a pending update from
1561		// dispatch_source_set_timer. Force an update of the list in that case.
1562		if (tidx != ds->ds_ident_hack) {
1563			_dispatch_timers_update(ds);
1564			continue;
1565		}
1566		if (!ds_timer(dr).target) {
1567			// No configured timers on the list
1568			break;
1569		}
1570		if (ds_timer(dr).target > now) {
1571			// Done running timers for now.
1572			break;
1573		}
1574		// Remove timers that are suspended or have missed intervals from the
1575		// list, rearm after resume resp. source invoke will reenable them
1576		if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1577			_dispatch_timers_update(ds);
1578			continue;
1579		}
1580		// Calculate number of missed intervals.
1581		missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1582		if (++missed > INT_MAX) {
1583			missed = INT_MAX;
1584		}
1585		if (ds_timer(dr).interval < INT64_MAX) {
1586			ds_timer(dr).target += missed * ds_timer(dr).interval;
1587			ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
1588		} else {
1589			ds_timer(dr).target = UINT64_MAX;
1590			ds_timer(dr).deadline = UINT64_MAX;
1591		}
1592		_dispatch_timers_update(ds);
1593		ds_timer(dr).last_fire = now;
1594
1595		unsigned long data;
1596		data = dispatch_atomic_add2o(ds, ds_pending_data,
1597				(unsigned long)missed, relaxed);
1598		_dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
1599		_dispatch_wakeup(ds);
1600	}
1601}
1602
1603DISPATCH_NOINLINE
1604static void
1605_dispatch_timers_run(uint64_t nows[])
1606{
1607	unsigned int tidx;
1608	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1609		if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
1610			_dispatch_timers_run2(nows, tidx);
1611		}
1612	}
1613}
1614
1615static inline unsigned int
1616_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
1617		uint64_t *delay, uint64_t *leeway, int qos)
1618{
1619	unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
1620	uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;
1621
1622	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1623		if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
1624			continue;
1625		}
1626		uint64_t target = timer[tidx].target;
1627		if (target == UINT64_MAX) {
1628			continue;
1629		}
1630		uint64_t deadline = timer[tidx].deadline;
1631		if (qos >= 0) {
1632			// Timer pre-coalescing <rdar://problem/13222034>
1633			uint64_t window = _dispatch_kevent_coalescing_window[qos];
1634			uint64_t latest = deadline > window ? deadline - window : 0;
1635			dispatch_source_refs_t dri;
1636			TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
1637					dr_list) {
1638				tmp = ds_timer(dri).target;
1639				if (tmp > latest) break;
1640				target = tmp;
1641			}
1642		}
1643		uint64_t now = _dispatch_source_timer_now(nows, tidx);
1644		if (target <= now) {
1645			delta = 0;
1646			break;
1647		}
1648		tmp = target - now;
1649		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1650			tmp = _dispatch_time_mach2nano(tmp);
1651		}
1652		if (tmp < INT64_MAX && tmp < delta) {
1653			ridx = tidx;
1654			delta = tmp;
1655		}
1656		dispatch_assert(target <= deadline);
1657		tmp = deadline - now;
1658		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1659			tmp = _dispatch_time_mach2nano(tmp);
1660		}
1661		if (tmp < INT64_MAX && tmp < dldelta) {
1662			dldelta = tmp;
1663		}
1664	}
1665	*delay = delta;
1666	*leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
1667	return ridx;
1668}
1669
1670static bool
1671_dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke,
1672		unsigned int qos)
1673{
1674	unsigned int tidx;
1675	bool poll;
1676	uint64_t delay, leeway;
1677
1678	tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
1679			(int)qos);
1680	poll = (delay == 0);
1681	if (poll || delay == UINT64_MAX) {
1682		_dispatch_trace_next_timer_set(NULL, qos);
1683		if (!ke->data) {
1684			return poll;
1685		}
1686		ke->data = 0;
1687		ke->flags |= EV_DELETE;
1688		ke->flags &= ~(EV_ADD|EV_ENABLE);
1689	} else {
1690		_dispatch_trace_next_timer_set(
1691				TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
1692		_dispatch_trace_next_timer_program(delay, qos);
1693		delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
1694		if (slowpath(_dispatch_timers_force_max_leeway)) {
1695			ke->data = (int64_t)(delay + leeway);
1696			ke->ext[1] = 0;
1697		} else {
1698			ke->data = (int64_t)delay;
1699			ke->ext[1] = leeway;
1700		}
1701		ke->flags |= EV_ADD|EV_ENABLE;
1702		ke->flags &= ~EV_DELETE;
1703	}
1704	_dispatch_kq_update(ke);
1705	return poll;
1706}
1707
1708DISPATCH_NOINLINE
1709static bool
1710_dispatch_timers_program(uint64_t nows[])
1711{
1712	bool poll = false;
1713	unsigned int qos, qosm = _dispatch_timers_qos_mask;
1714	for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1715		if (!(qosm & 1 << qos)){
1716			continue;
1717		}
1718		poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
1719				qos);
1720	}
1721	return poll;
1722}
1723
1724DISPATCH_NOINLINE
1725static bool
1726_dispatch_timers_configure(void)
1727{
1728	_dispatch_timer_aggregates_check();
1729	// Find out if there is a new target/deadline on the timer lists
1730	return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
1731}
1732
1733static void
1734_dispatch_timers_calendar_change(void)
1735{
1736	// calendar change may have gone past the wallclock deadline
1737	_dispatch_timer_expired = true;
1738	_dispatch_timers_qos_mask = ~0u;
1739}
1740
1741static void
1742_dispatch_timers_kevent(struct kevent64_s *ke)
1743{
1744	dispatch_assert(ke->data > 0);
1745	dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
1746			DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
1747	unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
1748	dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
1749	dispatch_assert(_dispatch_kevent_timeout[qos].data);
1750	_dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
1751	_dispatch_timer_expired = true;
1752	_dispatch_timers_qos_mask |= 1 << qos;
1753	_dispatch_trace_next_timer_wake(qos);
1754}
1755
1756static inline bool
1757_dispatch_mgr_timers(void)
1758{
1759	uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
1760	bool expired = slowpath(_dispatch_timer_expired);
1761	if (expired) {
1762		_dispatch_timers_run(nows);
1763	}
1764	bool reconfigure = slowpath(_dispatch_timers_reconfigure);
1765	if (reconfigure || expired) {
1766		if (reconfigure) {
1767			reconfigure = _dispatch_timers_configure();
1768			_dispatch_timers_reconfigure = false;
1769		}
1770		if (reconfigure || expired) {
1771			expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
1772			expired = expired || _dispatch_mgr_q.dq_items_tail;
1773		}
1774		_dispatch_timers_qos_mask = 0;
1775	}
1776	return expired;
1777}
1778
1779#pragma mark -
1780#pragma mark dispatch_timer_aggregate
1781
1782typedef struct {
1783	TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
1784} dispatch_timer_aggregate_refs_s;
1785
1786typedef struct dispatch_timer_aggregate_s {
1787	DISPATCH_STRUCT_HEADER(queue);
1788	DISPATCH_QUEUE_HEADER;
1789	TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
1790	dispatch_timer_aggregate_refs_s
1791			dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
1792	struct {
1793		DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
1794	} dta_timer[DISPATCH_TIMER_COUNT];
1795	struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
1796	unsigned int dta_refcount;
1797} dispatch_timer_aggregate_s;
1798
1799typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
1800static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
1801		TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
1802
1803dispatch_timer_aggregate_t
1804dispatch_timer_aggregate_create(void)
1805{
1806	unsigned int tidx;
1807	dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
1808			sizeof(struct dispatch_timer_aggregate_s));
1809	_dispatch_queue_init((dispatch_queue_t)dta);
1810	dta->do_targetq = _dispatch_get_root_queue(
1811			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
1812	dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1813	//FIXME: aggregates need custom vtable
1814	//dta->dq_label = "timer-aggregate";
1815	for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
1816		TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
1817	}
1818	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1819		TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
1820		dta->dta_timer[tidx].target = UINT64_MAX;
1821		dta->dta_timer[tidx].deadline = UINT64_MAX;
1822		dta->dta_timer_data[tidx].target = UINT64_MAX;
1823		dta->dta_timer_data[tidx].deadline = UINT64_MAX;
1824	}
1825	return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
1826			(dispatch_queue_t)dta);
1827}
1828
1829typedef struct dispatch_timer_delay_s {
1830	dispatch_timer_t timer;
1831	uint64_t delay, leeway;
1832} *dispatch_timer_delay_t;
1833
1834static void
1835_dispatch_timer_aggregate_get_delay(void *ctxt)
1836{
1837	dispatch_timer_delay_t dtd = ctxt;
1838	struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
1839	_dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
1840			-1);
1841}
1842
1843uint64_t
1844dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
1845		uint64_t *leeway_ptr)
1846{
1847	struct dispatch_timer_delay_s dtd = {
1848		.timer = dta->dta_timer_data,
1849	};
1850	dispatch_sync_f((dispatch_queue_t)dta, &dtd,
1851			_dispatch_timer_aggregate_get_delay);
1852	if (leeway_ptr) {
1853		*leeway_ptr = dtd.leeway;
1854	}
1855	return dtd.delay;
1856}
1857
1858static void
1859_dispatch_timer_aggregate_update(void *ctxt)
1860{
1861	dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
1862	dispatch_timer_t dtau = ctxt;
1863	unsigned int tidx;
1864	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1865		dta->dta_timer_data[tidx].target = dtau[tidx].target;
1866		dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
1867	}
1868	free(dtau);
1869}
1870
1871DISPATCH_NOINLINE
1872static void
1873_dispatch_timer_aggregates_configure(void)
1874{
1875	dispatch_timer_aggregate_t dta;
1876	dispatch_timer_t dtau;
1877	TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
1878		if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
1879			continue;
1880		}
1881		dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
1882		memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
1883		_dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau,
1884				_dispatch_timer_aggregate_update);
1885	}
1886}
1887
1888static inline void
1889_dispatch_timer_aggregates_check(void)
1890{
1891	if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
1892		return;
1893	}
1894	_dispatch_timer_aggregates_configure();
1895}
1896
1897static void
1898_dispatch_timer_aggregates_register(dispatch_source_t ds)
1899{
1900	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1901	if (!dta->dta_refcount++) {
1902		TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
1903	}
1904}
1905
1906DISPATCH_NOINLINE
1907static void
1908_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
1909{
1910	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1911	dispatch_timer_source_aggregate_refs_t dr;
1912	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1913	_dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
1914			dta->dta_timer, dr, dta_list);
1915}
1916
1917DISPATCH_NOINLINE
1918static void
1919_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
1920{
1921	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1922	dispatch_timer_source_aggregate_refs_t dr;
1923	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1924	_dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
1925			dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
1926	if (!--dta->dta_refcount) {
1927		TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
1928	}
1929}
1930
1931#pragma mark -
1932#pragma mark dispatch_select
1933
1934static int _dispatch_kq;
1935
1936static unsigned int _dispatch_select_workaround;
1937static fd_set _dispatch_rfds;
1938static fd_set _dispatch_wfds;
1939static uint64_t*_dispatch_rfd_ptrs;
1940static uint64_t*_dispatch_wfd_ptrs;
1941
1942DISPATCH_NOINLINE
1943static bool
1944_dispatch_select_register(struct kevent64_s *kev)
1945{
1946
1947	// Must execute on manager queue
1948	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1949
1950	// If an EINVAL or ENOENT error occurred while adding/enabling a read or
1951	// write kevent, assume it was due to a type of filedescriptor not
1952	// supported by kqueue and fall back to select
1953	switch (kev->filter) {
1954	case EVFILT_READ:
1955		if ((kev->data == EINVAL || kev->data == ENOENT) &&
1956				dispatch_assume(kev->ident < FD_SETSIZE)) {
1957			FD_SET((int)kev->ident, &_dispatch_rfds);
1958			if (slowpath(!_dispatch_rfd_ptrs)) {
1959				_dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1960						sizeof(*_dispatch_rfd_ptrs));
1961			}
1962			if (!_dispatch_rfd_ptrs[kev->ident]) {
1963				_dispatch_rfd_ptrs[kev->ident] = kev->udata;
1964				_dispatch_select_workaround++;
1965				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
1966						(int)kev->ident, (long)kev->data);
1967			}
1968		}
1969		return true;
1970	case EVFILT_WRITE:
1971		if ((kev->data == EINVAL || kev->data == ENOENT) &&
1972				dispatch_assume(kev->ident < FD_SETSIZE)) {
1973			FD_SET((int)kev->ident, &_dispatch_wfds);
1974			if (slowpath(!_dispatch_wfd_ptrs)) {
1975				_dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1976						sizeof(*_dispatch_wfd_ptrs));
1977			}
1978			if (!_dispatch_wfd_ptrs[kev->ident]) {
1979				_dispatch_wfd_ptrs[kev->ident] = kev->udata;
1980				_dispatch_select_workaround++;
1981				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
1982						(int)kev->ident, (long)kev->data);
1983			}
1984		}
1985		return true;
1986	}
1987	return false;
1988}
1989
1990DISPATCH_NOINLINE
1991static bool
1992_dispatch_select_unregister(const struct kevent64_s *kev)
1993{
1994	// Must execute on manager queue
1995	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1996
1997	switch (kev->filter) {
1998	case EVFILT_READ:
1999		if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
2000				_dispatch_rfd_ptrs[kev->ident]) {
2001			FD_CLR((int)kev->ident, &_dispatch_rfds);
2002			_dispatch_rfd_ptrs[kev->ident] = 0;
2003			_dispatch_select_workaround--;
2004			return true;
2005		}
2006		break;
2007	case EVFILT_WRITE:
2008		if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
2009				_dispatch_wfd_ptrs[kev->ident]) {
2010			FD_CLR((int)kev->ident, &_dispatch_wfds);
2011			_dispatch_wfd_ptrs[kev->ident] = 0;
2012			_dispatch_select_workaround--;
2013			return true;
2014		}
2015		break;
2016	}
2017	return false;
2018}
2019
2020DISPATCH_NOINLINE
2021static bool
2022_dispatch_mgr_select(bool poll)
2023{
2024	static const struct timeval timeout_immediately = { 0, 0 };
2025	fd_set tmp_rfds, tmp_wfds;
2026	struct kevent64_s kev;
2027	int err, i, r;
2028	bool kevent_avail = false;
2029
2030	FD_COPY(&_dispatch_rfds, &tmp_rfds);
2031	FD_COPY(&_dispatch_wfds, &tmp_wfds);
2032
2033	r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
2034			poll ? (struct timeval*)&timeout_immediately : NULL);
2035	if (slowpath(r == -1)) {
2036		err = errno;
2037		if (err != EBADF) {
2038			if (err != EINTR) {
2039				(void)dispatch_assume_zero(err);
2040			}
2041			return false;
2042		}
2043		for (i = 0; i < FD_SETSIZE; i++) {
2044			if (i == _dispatch_kq) {
2045				continue;
2046			}
2047			if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
2048				continue;
2049			}
2050			r = dup(i);
2051			if (dispatch_assume(r != -1)) {
2052				close(r);
2053			} else {
2054				if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
2055					FD_CLR(i, &_dispatch_rfds);
2056					_dispatch_rfd_ptrs[i] = 0;
2057					_dispatch_select_workaround--;
2058				}
2059				if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
2060					FD_CLR(i, &_dispatch_wfds);
2061					_dispatch_wfd_ptrs[i] = 0;
2062					_dispatch_select_workaround--;
2063				}
2064			}
2065		}
2066		return false;
2067	}
2068	if (r > 0) {
2069		for (i = 0; i < FD_SETSIZE; i++) {
2070			if (FD_ISSET(i, &tmp_rfds)) {
2071				if (i == _dispatch_kq) {
2072					kevent_avail = true;
2073					continue;
2074				}
2075				FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
2076				EV_SET64(&kev, i, EVFILT_READ,
2077						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2078						_dispatch_rfd_ptrs[i], 0, 0);
2079				_dispatch_kevent_drain(&kev);
2080			}
2081			if (FD_ISSET(i, &tmp_wfds)) {
2082				FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
2083				EV_SET64(&kev, i, EVFILT_WRITE,
2084						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2085						_dispatch_wfd_ptrs[i], 0, 0);
2086				_dispatch_kevent_drain(&kev);
2087			}
2088		}
2089	}
2090	return kevent_avail;
2091}
2092
2093#pragma mark -
2094#pragma mark dispatch_kqueue
2095
2096static void
2097_dispatch_kq_init(void *context DISPATCH_UNUSED)
2098{
2099	static const struct kevent64_s kev = {
2100		.ident = 1,
2101		.filter = EVFILT_USER,
2102		.flags = EV_ADD|EV_CLEAR,
2103	};
2104
2105	_dispatch_safe_fork = false;
2106#if DISPATCH_USE_GUARDED_FD
2107	guardid_t guard = (uintptr_t)&kev;
2108	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2109#else
2110	_dispatch_kq = kqueue();
2111#endif
2112	if (_dispatch_kq == -1) {
2113		int err = errno;
2114		switch (err) {
2115		case EMFILE:
2116			DISPATCH_CLIENT_CRASH("kqueue() failure: "
2117					"process is out of file descriptors");
2118			break;
2119		case ENFILE:
2120			DISPATCH_CLIENT_CRASH("kqueue() failure: "
2121					"system is out of file descriptors");
2122			break;
2123		case ENOMEM:
2124			DISPATCH_CLIENT_CRASH("kqueue() failure: "
2125					"kernel is out of memory");
2126			break;
2127		default:
2128			(void)dispatch_assume_zero(err);
2129			DISPATCH_CRASH("kqueue() failure");
2130			break;
2131		}
2132	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2133		// in case we fall back to select()
2134		FD_SET(_dispatch_kq, &_dispatch_rfds);
2135	}
2136
2137	(void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
2138			NULL));
2139	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
2140}
2141
2142static int
2143_dispatch_get_kq(void)
2144{
2145	static dispatch_once_t pred;
2146
2147	dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2148
2149	return _dispatch_kq;
2150}
2151
2152DISPATCH_NOINLINE
2153static long
2154_dispatch_kq_update(const struct kevent64_s *kev)
2155{
2156	int r;
2157	struct kevent64_s kev_copy;
2158
2159	if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
2160		if (_dispatch_select_unregister(kev)) {
2161			return 0;
2162		}
2163	}
2164	kev_copy = *kev;
2165	// This ensures we don't get a pending kevent back while registering
2166	// a new kevent
2167	kev_copy.flags |= EV_RECEIPT;
2168retry:
2169	r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
2170			&kev_copy, 1, 0, NULL));
2171	if (slowpath(r == -1)) {
2172		int err = errno;
2173		switch (err) {
2174		case EINTR:
2175			goto retry;
2176		case EBADF:
2177			DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2178			break;
2179		default:
2180			(void)dispatch_assume_zero(err);
2181			break;
2182		}
2183		return err;
2184	}
2185	switch (kev_copy.data) {
2186	case 0:
2187		return 0;
2188	case EBADF:
2189	case EPERM:
2190	case EINVAL:
2191	case ENOENT:
2192		if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
2193			if (_dispatch_select_register(&kev_copy)) {
2194				return 0;
2195			}
2196		}
2197		// fall through
2198	default:
2199		kev_copy.flags |= kev->flags;
2200		_dispatch_kevent_drain(&kev_copy);
2201		break;
2202	}
2203	return (long)kev_copy.data;
2204}
2205
2206#pragma mark -
2207#pragma mark dispatch_mgr
2208
2209static struct kevent64_s *_dispatch_kevent_enable;
2210
2211static void inline
2212_dispatch_mgr_kevent_reenable(struct kevent64_s *ke)
2213{
2214	dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke);
2215	_dispatch_kevent_enable = ke;
2216}
2217
2218unsigned long
2219_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
2220{
2221	if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2222		return false;
2223	}
2224
2225	static const struct kevent64_s kev = {
2226		.ident = 1,
2227		.filter = EVFILT_USER,
2228		.fflags = NOTE_TRIGGER,
2229	};
2230
2231#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2232	_dispatch_debug("waking up the dispatch manager queue: %p", dq);
2233#endif
2234
2235	_dispatch_kq_update(&kev);
2236
2237	return false;
2238}
2239
2240DISPATCH_NOINLINE
2241static void
2242_dispatch_mgr_init(void)
2243{
2244	(void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
2245	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2246	_dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
2247	_dispatch_mgr_priority_init();
2248	_dispatch_kevent_init();
2249	_dispatch_timers_init();
2250	_dispatch_mach_recv_msg_buf_init();
2251	_dispatch_memorystatus_init();
2252}
2253
2254DISPATCH_NOINLINE DISPATCH_NORETURN
2255static void
2256_dispatch_mgr_invoke(void)
2257{
2258	static const struct timespec timeout_immediately = { 0, 0 };
2259	struct kevent64_s kev;
2260	bool poll;
2261	int r;
2262
2263	for (;;) {
2264		_dispatch_mgr_queue_drain();
2265		poll = _dispatch_mgr_timers();
2266		if (slowpath(_dispatch_select_workaround)) {
2267			poll = _dispatch_mgr_select(poll);
2268			if (!poll) continue;
2269		}
2270		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
2271		r = kevent64(_dispatch_kq, _dispatch_kevent_enable,
2272				_dispatch_kevent_enable ? 1 : 0, &kev, 1, 0,
2273				poll ? &timeout_immediately : NULL);
2274		_dispatch_kevent_enable = NULL;
2275		if (slowpath(r == -1)) {
2276			int err = errno;
2277			switch (err) {
2278			case EINTR:
2279				break;
2280			case EBADF:
2281				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2282				break;
2283			default:
2284				(void)dispatch_assume_zero(err);
2285				break;
2286			}
2287		} else if (r) {
2288			_dispatch_kevent_drain(&kev);
2289		}
2290	}
2291}
2292
2293DISPATCH_NORETURN
2294void
2295_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
2296{
2297	_dispatch_mgr_init();
2298	// never returns, so burn bridges behind us & clear stack 2k ahead
2299	_dispatch_clear_stack(2048);
2300	_dispatch_mgr_invoke();
2301}
2302
2303#pragma mark -
2304#pragma mark dispatch_memorystatus
2305
2306#if DISPATCH_USE_MEMORYSTATUS_SOURCE
2307#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
2308#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
2309		DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
2310		DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
2311#elif DISPATCH_USE_VM_PRESSURE_SOURCE
2312#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
2313#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
2314#endif
2315
2316#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2317static dispatch_source_t _dispatch_memorystatus_source;
2318
2319static void
2320_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
2321{
2322#if DISPATCH_USE_MEMORYSTATUS_SOURCE
2323	unsigned long memorystatus;
2324	memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
2325	if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
2326		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
2327		_voucher_activity_heap_pressure_normal();
2328		return;
2329	}
2330	_dispatch_continuation_cache_limit =
2331			DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
2332	_voucher_activity_heap_pressure_warn();
2333#endif
2334	malloc_zone_pressure_relief(0,0);
2335}
2336
2337static void
2338_dispatch_memorystatus_init(void)
2339{
2340	_dispatch_memorystatus_source = dispatch_source_create(
2341			DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
2342			DISPATCH_MEMORYSTATUS_SOURCE_MASK,
2343			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true));
2344	dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
2345			_dispatch_memorystatus_handler);
2346	dispatch_resume(_dispatch_memorystatus_source);
2347}
2348#else
2349static inline void _dispatch_memorystatus_init(void) {}
2350#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2351
2352#pragma mark -
2353#pragma mark dispatch_mach
2354
2355#if HAVE_MACH
2356
2357#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
2358#define _dispatch_debug_machport(name) \
2359		dispatch_debug_machport((name), __func__)
2360#else
2361#define _dispatch_debug_machport(name) ((void)(name))
2362#endif
2363
2364// Flags for all notifications that are registered/unregistered when a
2365// send-possible notification is requested/delivered
2366#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
2367		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
2368#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
2369		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2370		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2371#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
2372		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2373		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2374
2375#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
2376#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
2377		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
2378
2379#define _DISPATCH_MACHPORT_HASH_SIZE 32
2380#define _DISPATCH_MACHPORT_HASH(x) \
2381		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
2382
2383#ifndef MACH_RCV_LARGE_IDENTITY
2384#define MACH_RCV_LARGE_IDENTITY 0x00000008
2385#endif
2386#ifndef MACH_RCV_VOUCHER
2387#define MACH_RCV_VOUCHER 0x00000800
2388#endif
2389#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
2390#define DISPATCH_MACH_RCV_OPTIONS ( \
2391		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
2392		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
2393		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0)) | \
2394		MACH_RCV_VOUCHER
2395
2396#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])
2397
2398static void _dispatch_kevent_machport_drain(struct kevent64_s *ke);
2399static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke);
2400static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
2401static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
2402static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
2403		dispatch_source_refs_t dr, dispatch_kevent_t dk,
2404		mach_msg_header_t *hdr, mach_msg_size_t siz);
2405static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
2406		uint32_t new_flags, uint32_t del_flags, uint32_t mask,
2407		mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
2408static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
2409static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
2410		dispatch_mach_reply_refs_t dmr, bool disconnected);
2411static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
2412static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou,
2413		mach_msg_option_t options);
2414static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
2415		dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr,
2416		mach_msg_size_t siz);
2417static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
2418		const struct kevent64_s *ke);
2419static inline mach_msg_option_t _dispatch_mach_checkin_options(void);
2420
2421static const size_t _dispatch_mach_recv_msg_size =
2422		DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
2423static const size_t dispatch_mach_trailer_size =
2424		sizeof(dispatch_mach_trailer_t);
2425static mach_msg_size_t _dispatch_mach_recv_msg_buf_size;
2426static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
2427static mach_port_t _dispatch_mach_notify_port;
2428static struct kevent64_s _dispatch_mach_recv_kevent = {
2429	.filter = EVFILT_MACHPORT,
2430	.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2431	.fflags = DISPATCH_MACH_RCV_OPTIONS,
2432};
2433static dispatch_source_t _dispatch_mach_notify_source;
2434static const
2435struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
2436	.ke = {
2437		.filter = EVFILT_MACHPORT,
2438		.flags = EV_CLEAR,
2439		.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
2440	},
2441};
2442
2443static void
2444_dispatch_mach_recv_msg_buf_init(void)
2445{
2446	mach_vm_size_t vm_size = mach_vm_round_page(
2447			_dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
2448	_dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size;
2449	mach_vm_address_t vm_addr = vm_page_size;
2450	kern_return_t kr;
2451
2452	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
2453			VM_FLAGS_ANYWHERE))) {
2454		if (kr != KERN_NO_SPACE) {
2455			(void)dispatch_assume_zero(kr);
2456			DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
2457		}
2458		_dispatch_temporary_resource_shortage();
2459		vm_addr = vm_page_size;
2460	}
2461	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
2462	_dispatch_mach_recv_kevent.ext[1] = vm_size;
2463}
2464
2465static inline void*
2466_dispatch_get_mach_recv_msg_buf(void)
2467{
2468	return (void*)_dispatch_mach_recv_kevent.ext[0];
2469}
2470
2471static void
2472_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
2473{
2474	kern_return_t kr;
2475
2476	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2477			&_dispatch_mach_recv_portset);
2478	DISPATCH_VERIFY_MIG(kr);
2479	if (dispatch_assume_zero(kr)) {
2480		DISPATCH_CLIENT_CRASH(
2481				"mach_port_allocate() failed: cannot create port set");
2482	}
2483	dispatch_assert(_dispatch_get_mach_recv_msg_buf());
2484	dispatch_assert(dispatch_mach_trailer_size ==
2485			REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
2486			DISPATCH_MACH_RCV_TRAILER)));
2487	_dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
2488	_dispatch_kq_update(&_dispatch_mach_recv_kevent);
2489
2490	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2491			&_dispatch_mach_notify_port);
2492	DISPATCH_VERIFY_MIG(kr);
2493	if (dispatch_assume_zero(kr)) {
2494		DISPATCH_CLIENT_CRASH(
2495				"mach_port_allocate() failed: cannot create receive right");
2496	}
2497	_dispatch_mach_notify_source = dispatch_source_create(
2498			&_dispatch_source_type_mach_recv_direct,
2499			_dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
2500	static const struct dispatch_continuation_s dc = {
2501		.dc_func = (void*)_dispatch_mach_notify_source_invoke,
2502	};
2503	_dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] =
2504			(dispatch_continuation_t)&dc;
2505	dispatch_assert(_dispatch_mach_notify_source);
2506	dispatch_resume(_dispatch_mach_notify_source);
2507}
2508
2509static mach_port_t
2510_dispatch_get_mach_recv_portset(void)
2511{
2512	static dispatch_once_t pred;
2513	dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
2514	return _dispatch_mach_recv_portset;
2515}
2516
2517static void
2518_dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
2519{
2520	struct kevent64_s kev = {
2521		.filter = EVFILT_MACHPORT,
2522		.flags = EV_ADD,
2523	};
2524	kern_return_t kr;
2525
2526	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2527			&_dispatch_mach_portset);
2528	DISPATCH_VERIFY_MIG(kr);
2529	if (dispatch_assume_zero(kr)) {
2530		DISPATCH_CLIENT_CRASH(
2531				"mach_port_allocate() failed: cannot create port set");
2532	}
2533	kev.ident = _dispatch_mach_portset;
2534	_dispatch_kq_update(&kev);
2535}
2536
2537static mach_port_t
2538_dispatch_get_mach_portset(void)
2539{
2540	static dispatch_once_t pred;
2541	dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
2542	return _dispatch_mach_portset;
2543}
2544
2545static kern_return_t
2546_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
2547{
2548	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
2549	kern_return_t kr;
2550
2551	_dispatch_debug_machport(mp);
2552	kr = mach_port_move_member(mach_task_self(), mp, mps);
2553	if (slowpath(kr)) {
2554		DISPATCH_VERIFY_MIG(kr);
2555		switch (kr) {
2556		case KERN_INVALID_RIGHT:
2557			if (mps) {
2558				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
2559						"mach_port_move_member() failed ", kr);
2560				break;
2561			}
2562			//fall through
2563		case KERN_INVALID_NAME:
2564#if DISPATCH_DEBUG
2565			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
2566					"prematurely", mp);
2567#endif
2568			break;
2569		default:
2570			(void)dispatch_assume_zero(kr);
2571			break;
2572		}
2573	}
2574	return mps ? kr : 0;
2575}
2576
2577static void
2578_dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
2579{
2580#if (TARGET_IPHONE_SIMULATOR && \
2581		IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
2582		(!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
2583	// delete and re-add kevent to workaround <rdar://problem/13924256>
2584	if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
2585		struct kevent64_s kev = _dispatch_mach_recv_kevent;
2586		kev.flags = EV_DELETE;
2587		_dispatch_kq_update(&kev);
2588	}
2589#endif
2590	_dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
2591}
2592
2593static kern_return_t
2594_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
2595		uint32_t del_flags)
2596{
2597	kern_return_t kr = 0;
2598	dispatch_assert_zero(new_flags & del_flags);
2599	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
2600			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
2601		mach_port_t mps;
2602		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2603			mps = _dispatch_get_mach_recv_portset();
2604		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
2605				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
2606				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
2607			mps = _dispatch_get_mach_portset();
2608		} else {
2609			mps = MACH_PORT_NULL;
2610		}
2611		kr = _dispatch_mach_portset_update(dk, mps);
2612	}
2613	return kr;
2614}
2615
2616static kern_return_t
2617_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
2618		uint32_t del_flags)
2619{
2620	kern_return_t kr = 0;
2621	dispatch_assert_zero(new_flags & del_flags);
2622	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
2623			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
2624		// Requesting a (delayed) non-sync send-possible notification
2625		// registers for both immediate dead-name notification and delayed-arm
2626		// send-possible notification for the port.
2627		// The send-possible notification is armed when a mach_msg() with the
2628		// the MACH_SEND_NOTIFY to the port times out.
2629		// If send-possible is unavailable, fall back to immediate dead-name
2630		// registration rdar://problem/2527840&9008724
2631		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
2632				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
2633				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
2634	}
2635	return kr;
2636}
2637
2638static inline void
2639_dispatch_kevent_mach_portset(struct kevent64_s *ke)
2640{
2641	if (ke->ident == _dispatch_mach_recv_portset) {
2642		return _dispatch_kevent_mach_msg_drain(ke);
2643	} else if (ke->ident == _dispatch_mach_portset) {
2644		return _dispatch_kevent_machport_drain(ke);
2645	} else {
2646		return _dispatch_kevent_error(ke);
2647	}
2648}
2649
2650DISPATCH_NOINLINE
2651static void
2652_dispatch_kevent_machport_drain(struct kevent64_s *ke)
2653{
2654	mach_port_t name = (mach_port_name_t)ke->data;
2655	dispatch_kevent_t dk;
2656	struct kevent64_s kev;
2657
2658	_dispatch_debug_machport(name);
2659	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2660	if (!dispatch_assume(dk)) {
2661		return;
2662	}
2663	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
2664
2665	EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
2666			DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
2667	_dispatch_kevent_debug(&kev, __func__);
2668	_dispatch_kevent_merge(&kev);
2669}
2670
2671DISPATCH_NOINLINE
2672static void
2673_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
2674{
2675	mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
2676	mach_msg_size_t siz, msgsiz;
2677	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
2678
2679	_dispatch_kevent_mach_recv_reenable(ke);
2680	if (!dispatch_assume(hdr)) {
2681		DISPATCH_CRASH("EVFILT_MACHPORT with no message");
2682	}
2683	if (fastpath(!kr)) {
2684		return _dispatch_kevent_mach_msg_recv(hdr);
2685	} else if (kr != MACH_RCV_TOO_LARGE) {
2686		goto out;
2687	}
2688	if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
2689			dispatch_mach_trailer_size)) {
2690		DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
2691	}
2692	siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
2693	hdr = malloc(siz);
2694	if (ke->data) {
2695		if (!dispatch_assume(hdr)) {
2696			// Kernel will discard message too large to fit
2697			hdr = _dispatch_get_mach_recv_msg_buf();
2698			siz = _dispatch_mach_recv_msg_buf_size;
2699		}
2700		mach_port_t name = (mach_port_name_t)ke->data;
2701		const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
2702				MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
2703		kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
2704				MACH_PORT_NULL);
2705		if (fastpath(!kr)) {
2706			return _dispatch_kevent_mach_msg_recv(hdr);
2707		} else if (kr == MACH_RCV_TOO_LARGE) {
2708			_dispatch_log("BUG in libdispatch client: "
2709					"_dispatch_kevent_mach_msg_drain: dropped message too "
2710					"large to fit in memory: id = 0x%x, size = %lld",
2711					hdr->msgh_id, ke->ext[1]);
2712			kr = MACH_MSG_SUCCESS;
2713		}
2714	} else {
2715		// We don't know which port in the portset contains the large message,
2716		// so need to receive all messages pending on the portset to ensure the
2717		// large message is drained. <rdar://problem/13950432>
2718		bool received = false;
2719		for (;;) {
2720			if (!dispatch_assume(hdr)) {
2721				DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
2722			}
2723			const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
2724					MACH_RCV_TIMEOUT);
2725			kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
2726					MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
2727			if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
2728					hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
2729				DISPATCH_CRASH("Overlarge message");
2730			}
2731			if (fastpath(!kr)) {
2732				msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
2733				if (msgsiz < siz) {
2734					void *shrink = realloc(hdr, msgsiz);
2735					if (shrink) hdr = shrink;
2736				}
2737				_dispatch_kevent_mach_msg_recv(hdr);
2738				hdr = NULL;
2739				received = true;
2740			} else if (kr == MACH_RCV_TOO_LARGE) {
2741				siz = hdr->msgh_size + dispatch_mach_trailer_size;
2742			} else {
2743				if (kr == MACH_RCV_TIMED_OUT && received) {
2744					kr = MACH_MSG_SUCCESS;
2745				}
2746				break;
2747			}
2748			hdr = reallocf(hdr, siz);
2749		}
2750	}
2751	if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2752		free(hdr);
2753	}
2754out:
2755	if (slowpath(kr)) {
2756		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
2757				"message reception failed", kr);
2758	}
2759}
2760
2761static void
2762_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
2763{
2764	dispatch_source_refs_t dri;
2765	dispatch_kevent_t dk;
2766	mach_port_t name = hdr->msgh_local_port;
2767	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
2768
2769	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
2770			dispatch_mach_trailer_size)) {
2771		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2772				"received overlarge message");
2773		return _dispatch_kevent_mach_msg_destroy(hdr);
2774	}
2775	if (!dispatch_assume(name)) {
2776		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2777				"received message with MACH_PORT_NULL port");
2778		return _dispatch_kevent_mach_msg_destroy(hdr);
2779	}
2780	_dispatch_debug_machport(name);
2781	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2782	if (!dispatch_assume(dk)) {
2783		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2784				"received message with unknown kevent");
2785		return _dispatch_kevent_mach_msg_destroy(hdr);
2786	}
2787	_dispatch_kevent_debug(&dk->dk_kevent, __func__);
2788	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
2789		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2790		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2791			return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
2792		}
2793	}
2794	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2795			"received message with no listeners");
2796	return _dispatch_kevent_mach_msg_destroy(hdr);
2797}
2798
2799static void
2800_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
2801{
2802	if (hdr) {
2803		mach_msg_destroy(hdr);
2804		if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2805			free(hdr);
2806		}
2807	}
2808}
2809
2810static void
2811_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
2812		dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
2813{
2814	if (ds == _dispatch_mach_notify_source) {
2815		_dispatch_mach_notify_source_invoke(hdr);
2816		return _dispatch_kevent_mach_msg_destroy(hdr);
2817	}
2818	dispatch_mach_reply_refs_t dmr = NULL;
2819	if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
2820		dmr = (dispatch_mach_reply_refs_t)dr;
2821	}
2822	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz);
2823}
2824
2825DISPATCH_ALWAYS_INLINE
2826static inline void
2827_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
2828{
2829	dispatch_source_refs_t dri, dr_next;
2830	dispatch_kevent_t dk;
2831	struct kevent64_s kev;
2832	bool unreg;
2833
2834	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
2835	if (!dk) {
2836		return;
2837	}
2838
2839	// Update notification registration state.
2840	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
2841	EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
2842			flag, 0, (uintptr_t)dk, 0, 0);
2843	if (final) {
2844		// This can never happen again
2845		unreg = true;
2846	} else {
2847		// Re-register for notification before delivery
2848		unreg = _dispatch_kevent_resume(dk, flag, 0);
2849	}
2850	DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
2851	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
2852		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2853		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
2854			dispatch_mach_t dm = (dispatch_mach_t)dsi;
2855			_dispatch_mach_merge_kevent(dm, &kev);
2856			if (unreg && dm->dm_dkev) {
2857				_dispatch_mach_kevent_unregister(dm);
2858			}
2859		} else {
2860			_dispatch_source_merge_kevent(dsi, &kev);
2861			if (unreg) {
2862				_dispatch_source_kevent_unregister(dsi);
2863			}
2864		}
2865		if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
2866			// current merge is last in list (dk might have been freed)
2867			// or it re-armed the notification
2868			return;
2869		}
2870	}
2871}
2872
2873static kern_return_t
2874_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
2875		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
2876		mach_port_mscount_t notify_sync)
2877{
2878	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
2879	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
2880	kern_return_t kr, krr = 0;
2881
2882	// Update notification registration state.
2883	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
2884	dk->dk_kevent.data &= ~(del_flags & mask);
2885
2886	_dispatch_debug_machport(port);
2887	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
2888		// initialize _dispatch_mach_notify_port:
2889		(void)_dispatch_get_mach_recv_portset();
2890		_dispatch_debug("machport[0x%08x]: registering for send-possible "
2891				"notification", port);
2892		previous = MACH_PORT_NULL;
2893		krr = mach_port_request_notification(mach_task_self(), port,
2894				notify_msgid, notify_sync, _dispatch_mach_notify_port,
2895				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
2896		DISPATCH_VERIFY_MIG(krr);
2897
2898		switch(krr) {
2899		case KERN_INVALID_NAME:
2900		case KERN_INVALID_RIGHT:
2901			// Supress errors & clear registration state
2902			dk->dk_kevent.data &= ~mask;
2903			break;
2904		default:
2905			// Else, we dont expect any errors from mach. Log any errors
2906			if (dispatch_assume_zero(krr)) {
2907				// log the error & clear registration state
2908				dk->dk_kevent.data &= ~mask;
2909			} else if (dispatch_assume_zero(previous)) {
2910				// Another subsystem has beat libdispatch to requesting the
2911				// specified Mach notification on this port. We should
2912				// technically cache the previous port and message it when the
2913				// kernel messages our port. Or we can just say screw those
2914				// subsystems and deallocate the previous port.
2915				// They should adopt libdispatch :-P
2916				kr = mach_port_deallocate(mach_task_self(), previous);
2917				DISPATCH_VERIFY_MIG(kr);
2918				(void)dispatch_assume_zero(kr);
2919				previous = MACH_PORT_NULL;
2920			}
2921		}
2922	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
2923		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
2924				"notification", port);
2925		previous = MACH_PORT_NULL;
2926		kr = mach_port_request_notification(mach_task_self(), port,
2927				notify_msgid, notify_sync, MACH_PORT_NULL,
2928				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
2929		DISPATCH_VERIFY_MIG(kr);
2930
2931		switch (kr) {
2932		case KERN_INVALID_NAME:
2933		case KERN_INVALID_RIGHT:
2934		case KERN_INVALID_ARGUMENT:
2935			break;
2936		default:
2937			if (dispatch_assume_zero(kr)) {
2938				// log the error
2939			}
2940		}
2941	} else {
2942		return 0;
2943	}
2944	if (slowpath(previous)) {
2945		// the kernel has not consumed the send-once right yet
2946		(void)dispatch_assume_zero(
2947				_dispatch_send_consume_send_once_right(previous));
2948	}
2949	return krr;
2950}
2951
2952static void
2953_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
2954{
2955	(void)_dispatch_get_mach_recv_portset();
2956	_dispatch_debug("registering for calendar-change notification");
2957	kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(),
2958			HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
2959	DISPATCH_VERIFY_MIG(kr);
2960	(void)dispatch_assume_zero(kr);
2961}
2962
2963static void
2964_dispatch_mach_host_calendar_change_register(void)
2965{
2966	static dispatch_once_t pred;
2967	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
2968}
2969
2970static void
2971_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
2972{
2973	mig_reply_error_t reply;
2974	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
2975		__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
2976	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
2977	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
2978	if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
2979		// host_notify_reply.defs: host_calendar_changed
2980		_dispatch_debug("calendar-change notification");
2981		_dispatch_timers_calendar_change();
2982		_dispatch_mach_host_notify_update(NULL);
2983		success = TRUE;
2984		reply.RetCode = KERN_SUCCESS;
2985	}
2986	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
2987		(void)dispatch_assume_zero(reply.RetCode);
2988	}
2989}
2990
2991kern_return_t
2992_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
2993		mach_port_name_t name)
2994{
2995#if DISPATCH_DEBUG
2996	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
2997			"deleted prematurely", name);
2998#endif
2999
3000	_dispatch_debug_machport(name);
3001	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
3002
3003	return KERN_SUCCESS;
3004}
3005
3006kern_return_t
3007_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
3008		mach_port_name_t name)
3009{
3010	kern_return_t kr;
3011
3012	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
3013	_dispatch_debug_machport(name);
3014	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
3015
3016	// the act of receiving a dead name notification allocates a dead-name
3017	// right that must be deallocated
3018	kr = mach_port_deallocate(mach_task_self(), name);
3019	DISPATCH_VERIFY_MIG(kr);
3020	//(void)dispatch_assume_zero(kr);
3021
3022	return KERN_SUCCESS;
3023}
3024
3025kern_return_t
3026_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
3027		mach_port_name_t name)
3028{
3029	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
3030	_dispatch_debug_machport(name);
3031	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
3032
3033	return KERN_SUCCESS;
3034}
3035
3036#pragma mark -
3037#pragma mark dispatch_mach_t
3038
3039#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
3040#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
3041#define DISPATCH_MACH_OPTIONS_MASK 0xffff
3042
3043static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3044static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3045		mach_port_t local_port, mach_port_t remote_port);
3046static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
3047		dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
3048static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3049		dispatch_object_t dou);
3050static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3051		dispatch_mach_msg_t dmsg);
3052static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3053		pthread_priority_t pp);
3054
3055static dispatch_mach_t
3056_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3057		dispatch_mach_handler_function_t handler, bool handler_is_block)
3058{
3059	dispatch_mach_t dm;
3060	dispatch_mach_refs_t dr;
3061
3062	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3063			sizeof(struct dispatch_mach_s));
3064	_dispatch_queue_init((dispatch_queue_t)dm);
3065	dm->dq_label = label;
3066
3067	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3068	dm->do_ref_cnt++; // since channel is created suspended
3069	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
3070	dm->do_targetq = &_dispatch_mgr_q;
3071
3072	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3073	dr->dr_source_wref = _dispatch_ptr2wref(dm);
3074	dr->dm_handler_func = handler;
3075	dr->dm_handler_ctxt = context;
3076	dm->ds_refs = dr;
3077	dm->dm_handler_is_block = handler_is_block;
3078
3079	dm->dm_refs = _dispatch_calloc(1ul,
3080			sizeof(struct dispatch_mach_send_refs_s));
3081	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3082	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3083	TAILQ_INIT(&dm->dm_refs->dm_replies);
3084
3085	// First item on the channel sets the user-specified target queue
3086	dispatch_set_target_queue(dm, q);
3087	_dispatch_object_debug(dm, "%s", __func__);
3088	return dm;
3089}
3090
3091dispatch_mach_t
3092dispatch_mach_create(const char *label, dispatch_queue_t q,
3093		dispatch_mach_handler_t handler)
3094{
3095	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3096	return _dispatch_mach_create(label, q, bb,
3097			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3098}
3099
3100dispatch_mach_t
3101dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3102		dispatch_mach_handler_function_t handler)
3103{
3104	return _dispatch_mach_create(label, q, context, handler, false);
3105}
3106
3107void
3108_dispatch_mach_dispose(dispatch_mach_t dm)
3109{
3110	_dispatch_object_debug(dm, "%s", __func__);
3111	dispatch_mach_refs_t dr = dm->ds_refs;
3112	if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
3113		Block_release(dr->dm_handler_ctxt);
3114	}
3115	free(dr);
3116	free(dm->dm_refs);
3117	_dispatch_queue_destroy(dm);
3118}
3119
3120void
3121dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
3122		mach_port_t send, dispatch_mach_msg_t checkin)
3123{
3124	dispatch_mach_send_refs_t dr = dm->dm_refs;
3125	dispatch_kevent_t dk;
3126
3127	if (MACH_PORT_VALID(receive)) {
3128		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3129		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3130		dk->dk_kevent.ident = receive;
3131		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3132		dk->dk_kevent.udata = (uintptr_t)dk;
3133		TAILQ_INIT(&dk->dk_sources);
3134		dm->ds_dkev = dk;
3135		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
3136		_dispatch_retain(dm); // the reference the manager queue holds
3137	}
3138	dr->dm_send = send;
3139	if (MACH_PORT_VALID(send)) {
3140		if (checkin) {
3141			dispatch_retain(checkin);
3142			mach_msg_option_t options = _dispatch_mach_checkin_options();
3143			_dispatch_mach_msg_set_options(checkin, options);
3144			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3145		}
3146		dr->dm_checkin = checkin;
3147	}
3148	// monitor message reply ports
3149	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3150	if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
3151			DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
3152		DISPATCH_CLIENT_CRASH("Channel already connected");
3153	}
3154	_dispatch_object_debug(dm, "%s", __func__);
3155	return dispatch_resume(dm);
3156}
3157
3158DISPATCH_NOINLINE
3159static void
3160_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3161		dispatch_mach_reply_refs_t dmr, bool disconnected)
3162{
3163	dispatch_mach_msg_t dmsgr = NULL;
3164	if (disconnected) {
3165		dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
3166	}
3167	dispatch_kevent_t dk = dmr->dmr_dkev;
3168	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
3169	_dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
3170	TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
3171	if (dmr->dmr_voucher) _voucher_release(dmr->dmr_voucher);
3172	free(dmr);
3173	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3174}
3175
3176DISPATCH_NOINLINE
3177static void
3178_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
3179		dispatch_mach_msg_t dmsg)
3180{
3181	dispatch_kevent_t dk;
3182	dispatch_mach_reply_refs_t dmr;
3183
3184	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3185	dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3186	dk->dk_kevent.ident = reply;
3187	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3188	dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3189	dk->dk_kevent.udata = (uintptr_t)dk;
3190	TAILQ_INIT(&dk->dk_sources);
3191
3192	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
3193	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
3194	dmr->dmr_dkev = dk;
3195	if (dmsg->dmsg_voucher) {
3196		dmr->dmr_voucher =_voucher_retain(dmsg->dmsg_voucher);
3197	}
3198	dmr->dmr_priority = dmsg->dmsg_priority;
3199	// make reply context visible to leaks rdar://11777199
3200	dmr->dmr_ctxt = dmsg->do_ctxt;
3201
3202	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
3203			dmsg->do_ctxt);
3204	uint32_t flags;
3205	bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags);
3206	TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
3207			dr_list);
3208	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
3209	if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
3210		_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3211	}
3212}
3213
3214DISPATCH_NOINLINE
3215static void
3216_dispatch_mach_kevent_unregister(dispatch_mach_t dm)
3217{
3218	dispatch_kevent_t dk = dm->dm_dkev;
3219	dm->dm_dkev = NULL;
3220	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
3221			dr_list);
3222	dm->ds_pending_data_mask &= ~(unsigned long)
3223			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3224	_dispatch_kevent_unregister(dk,
3225			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3226}
3227
3228DISPATCH_NOINLINE
3229static void
3230_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
3231{
3232	dispatch_kevent_t dk;
3233
3234	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3235	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
3236	dk->dk_kevent.ident = send;
3237	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3238	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
3239	dk->dk_kevent.udata = (uintptr_t)dk;
3240	TAILQ_INIT(&dk->dk_sources);
3241
3242	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
3243
3244	uint32_t flags;
3245	bool do_resume = _dispatch_kevent_register(&dk, &flags);
3246	TAILQ_INSERT_TAIL(&dk->dk_sources,
3247			(dispatch_source_refs_t)dm->dm_refs, dr_list);
3248	dm->dm_dkev = dk;
3249	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
3250		_dispatch_mach_kevent_unregister(dm);
3251	}
3252}
3253
3254static inline void
3255_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3256		pthread_priority_t pp)
3257{
3258	return _dispatch_queue_push(dm._dq, dou, pp);
3259}
3260
3261static inline void
3262_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
3263{
3264	dou._do->do_suspend_cnt = (unsigned int)options;
3265}
3266
3267static inline mach_msg_option_t
3268_dispatch_mach_msg_get_options(dispatch_object_t dou)
3269{
3270	mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
3271	return options;
3272}
3273
3274static inline void
3275_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
3276		unsigned long reason)
3277{
3278	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
3279	dou._do->do_suspend_cnt =  (unsigned int)((err || !reason) ? err :
3280			 err_local|err_sub(0x3e0)|(mach_error_t)reason);
3281}
3282
3283static inline unsigned long
3284_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
3285{
3286	mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
3287	dou._do->do_suspend_cnt = 0;
3288	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
3289		*err_ptr = 0;
3290		return err_get_code(err);
3291	}
3292	*err_ptr = err;
3293	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
3294}
3295
3296static void
3297_dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
3298		mach_msg_header_t *hdr, mach_msg_size_t siz)
3299{
3300	_dispatch_debug_machport(hdr->msgh_remote_port);
3301	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
3302			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
3303	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3304		return _dispatch_kevent_mach_msg_destroy(hdr);
3305	}
3306	dispatch_mach_msg_t dmsg;
3307	voucher_t voucher;
3308	pthread_priority_t priority;
3309	void *ctxt = NULL;
3310	if (dmr) {
3311		_voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
3312		voucher = dmr->dmr_voucher;
3313		dmr->dmr_voucher = NULL; // transfer reference
3314		priority = dmr->dmr_priority;
3315		ctxt = dmr->dmr_ctxt;
3316		_dispatch_mach_reply_kevent_unregister(dm, dmr, false);
3317	} else {
3318		voucher = voucher_create_with_mach_msg(hdr);
3319		priority = _voucher_get_priority(voucher);
3320	}
3321	dispatch_mach_msg_destructor_t destructor;
3322	destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
3323			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
3324			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
3325	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
3326	dmsg->dmsg_voucher = voucher;
3327	dmsg->dmsg_priority = priority;
3328	dmsg->do_ctxt = ctxt;
3329	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
3330	_dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
3331	_dispatch_voucher_ktrace_dmsg_push(dmsg);
3332	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3333}
3334
3335static inline mach_port_t
3336_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
3337{
3338	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3339	mach_port_t remote = hdr->msgh_remote_port;
3340	return remote;
3341}
3342
3343static inline void
3344_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
3345		mach_port_t remote_port)
3346{
3347	mach_msg_header_t *hdr;
3348	dispatch_mach_msg_t dmsg;
3349	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3350			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3351	if (local_port) hdr->msgh_local_port = local_port;
3352	if (remote_port) hdr->msgh_remote_port = remote_port;
3353	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
3354	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3355}
3356
3357static inline dispatch_mach_msg_t
3358_dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
3359		dispatch_mach_reply_refs_t dmr)
3360{
3361	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3362	if (dmsg && !dmsg->dmsg_reply) return NULL;
3363	mach_msg_header_t *hdr;
3364	dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3365			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3366	if (dmsg) {
3367		hdr->msgh_local_port = dmsg->dmsg_reply;
3368		if (dmsg->dmsg_voucher) {
3369			dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher);
3370		}
3371		dmsgr->dmsg_priority = dmsg->dmsg_priority;
3372		dmsgr->do_ctxt = dmsg->do_ctxt;
3373	} else {
3374		hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident;
3375		dmsgr->dmsg_voucher = dmr->dmr_voucher;
3376		dmr->dmr_voucher = NULL;  // transfer reference
3377		dmsgr->dmsg_priority = dmr->dmr_priority;
3378		dmsgr->do_ctxt = dmr->dmr_ctxt;
3379	}
3380	_dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
3381	return dmsgr;
3382}
3383
3384DISPATCH_NOINLINE
3385static void
3386_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
3387{
3388	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3389	dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3390	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
3391	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3392	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3393}
3394
3395DISPATCH_NOINLINE
3396static dispatch_object_t
3397_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
3398{
3399	dispatch_mach_send_refs_t dr = dm->dm_refs;
3400	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
3401	voucher_t voucher = dmsg->dmsg_voucher;
3402	mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
3403	bool clear_voucher = false, kvoucher_move_send = false;
3404	dr->dm_needs_mgr = 0;
3405	if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
3406		// send initial checkin message
3407		if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
3408				&_dispatch_mgr_q)) {
3409			// send kevent must be uninstalled on the manager queue
3410			dr->dm_needs_mgr = 1;
3411			goto out;
3412		}
3413		dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
3414		if (slowpath(dr->dm_checkin)) {
3415			goto out;
3416		}
3417	}
3418	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3419	mach_msg_return_t kr = 0;
3420	mach_port_t reply = dmsg->dmsg_reply;
3421	mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
3422	if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
3423		opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
3424		if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
3425				MACH_MSG_TYPE_MOVE_SEND_ONCE) {
3426			if (dmsg != dr->dm_checkin) {
3427				msg->msgh_remote_port = dr->dm_send;
3428			}
3429			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
3430				if (slowpath(!dm->dm_dkev)) {
3431					_dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
3432				}
3433				if (fastpath(dm->dm_dkev)) {
3434					if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
3435						goto out;
3436					}
3437					opts |= MACH_SEND_NOTIFY;
3438				}
3439			}
3440			opts |= MACH_SEND_TIMEOUT;
3441			if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
3442				ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
3443						voucher, dmsg->dmsg_priority);
3444			}
3445			_dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
3446			if (ipc_kvoucher) {
3447				kvoucher_move_send = true;
3448				clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
3449						ipc_kvoucher, kvoucher_move_send);
3450			} else {
3451				clear_voucher = _voucher_mach_msg_set(msg, voucher);
3452			}
3453		}
3454		_voucher_activity_trace_msg(voucher, msg, send);
3455		_dispatch_debug_machport(msg->msgh_remote_port);
3456		if (reply) _dispatch_debug_machport(reply);
3457		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
3458				MACH_PORT_NULL);
3459		_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
3460				"opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
3461				"%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
3462				opts, msg_opts, msg->msgh_voucher_port, reply,
3463				mach_error_string(kr), kr);
3464		if (clear_voucher) {
3465			if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
3466				DISPATCH_CRASH("Voucher port corruption");
3467			}
3468			mach_voucher_t kv;
3469			kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
3470			if (kvoucher_move_send) ipc_kvoucher = kv;
3471		}
3472	}
3473	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
3474		if (opts & MACH_SEND_NOTIFY) {
3475			_dispatch_debug("machport[0x%08x]: send-possible notification "
3476					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
3477			DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
3478		} else {
3479			// send kevent must be installed on the manager queue
3480			dr->dm_needs_mgr = 1;
3481		}
3482		if (ipc_kvoucher) {
3483			_dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
3484			voucher_t ipc_voucher;
3485			ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
3486					voucher, dmsg->dmsg_priority, ipc_kvoucher);
3487			_dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
3488					ipc_voucher, dmsg, voucher);
3489			if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
3490			dmsg->dmsg_voucher = ipc_voucher;
3491		}
3492		goto out;
3493	} else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
3494		_voucher_dealloc_mach_voucher(ipc_kvoucher);
3495	}
3496	if (fastpath(!kr) && reply &&
3497			!(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) {
3498		if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
3499			// reply receive kevent must be installed on the manager queue
3500			dr->dm_needs_mgr = 1;
3501			_dispatch_mach_msg_set_options(dmsg, msg_opts |
3502					DISPATCH_MACH_REGISTER_FOR_REPLY);
3503			goto out;
3504		}
3505		_dispatch_mach_reply_kevent_register(dm, reply, dmsg);
3506	}
3507	if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
3508		_dispatch_mach_kevent_unregister(dm);
3509	}
3510	if (slowpath(kr)) {
3511		// Send failed, so reply was never connected <rdar://problem/14309159>
3512		dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3513	}
3514	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
3515	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3516	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3517	dmsg = NULL;
3518out:
3519	return (dispatch_object_t)dmsg;
3520}
3521
3522DISPATCH_ALWAYS_INLINE
3523static inline void
3524_dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou,
3525		bool wakeup)
3526{
3527	dispatch_mach_send_refs_t dr = dm->dm_refs;
3528	struct dispatch_object_s *prev, *dc = dou._do;
3529	dc->do_next = NULL;
3530
3531	prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
3532	if (fastpath(prev)) {
3533		prev->do_next = dc;
3534	} else {
3535		dr->dm_head = dc;
3536	}
3537	if (wakeup || !prev) {
3538		_dispatch_wakeup(dm);
3539	}
3540}
3541
3542DISPATCH_ALWAYS_INLINE
3543static inline void
3544_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
3545{
3546	return _dispatch_mach_send_push_wakeup(dm, dou, false);
3547}
3548
3549DISPATCH_NOINLINE
3550static void
3551_dispatch_mach_send_drain(dispatch_mach_t dm)
3552{
3553	dispatch_mach_send_refs_t dr = dm->dm_refs;
3554	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
3555	while (dr->dm_tail) {
3556		_dispatch_wait_until(dc = fastpath(dr->dm_head));
3557		do {
3558			next_dc = fastpath(dc->do_next);
3559			dr->dm_head = next_dc;
3560			if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
3561					relaxed)) {
3562				_dispatch_wait_until(next_dc = fastpath(dc->do_next));
3563				dr->dm_head = next_dc;
3564			}
3565			if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
3566				if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3567					// send barrier
3568					// leave send queue locked until barrier has completed
3569					return _dispatch_mach_push(dm, dc,
3570							((dispatch_continuation_t)dc)->dc_priority);
3571				}
3572#if DISPATCH_MACH_SEND_SYNC
3573				if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
3574					_dispatch_thread_semaphore_signal(
3575							(_dispatch_thread_semaphore_t)dc->do_ctxt);
3576					continue;
3577				}
3578#endif // DISPATCH_MACH_SEND_SYNC
3579				if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
3580					goto out;
3581				}
3582				continue;
3583			}
3584			_dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc);
3585			if (slowpath(dr->dm_disconnect_cnt) ||
3586					slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3587				_dispatch_mach_msg_not_sent(dm, dc);
3588				continue;
3589			}
3590			if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
3591				goto out;
3592			}
3593		} while ((dc = next_dc));
3594	}
3595out:
3596	// if this is not a complete drain, we must undo some things
3597	if (slowpath(dc)) {
3598		if (!next_dc &&
3599				!dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
3600			// wait for enqueue slow path to finish
3601			_dispatch_wait_until(next_dc = fastpath(dr->dm_head));
3602			dc->do_next = next_dc;
3603		}
3604		dr->dm_head = dc;
3605	}
3606	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3607	_dispatch_wakeup(dm);
3608}
3609
3610static inline void
3611_dispatch_mach_send(dispatch_mach_t dm)
3612{
3613	dispatch_mach_send_refs_t dr = dm->dm_refs;
3614	if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
3615			dm_sending, 0, 1, acquire))) {
3616		return;
3617	}
3618	_dispatch_object_debug(dm, "%s", __func__);
3619	_dispatch_mach_send_drain(dm);
3620}
3621
3622DISPATCH_NOINLINE
3623static void
3624_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
3625{
3626	if (!(ke->fflags & dm->ds_pending_data_mask)) {
3627		return;
3628	}
3629	_dispatch_mach_send(dm);
3630}
3631
3632static inline mach_msg_option_t
3633_dispatch_mach_checkin_options(void)
3634{
3635	mach_msg_option_t options = 0;
3636#if DISPATCH_USE_CHECKIN_NOIMPORTANCE
3637	options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
3638#endif
3639	return options;
3640}
3641
3642
3643static inline mach_msg_option_t
3644_dispatch_mach_send_options(void)
3645{
3646	mach_msg_option_t options = 0;
3647	return options;
3648}
3649
3650DISPATCH_NOINLINE
3651void
3652dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
3653		mach_msg_option_t options)
3654{
3655	dispatch_mach_send_refs_t dr = dm->dm_refs;
3656	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
3657		DISPATCH_CLIENT_CRASH("Message already enqueued");
3658	}
3659	dispatch_retain(dmsg);
3660	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
3661	options |= _dispatch_mach_send_options();
3662	_dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
3663	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3664	dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) ==
3665			MACH_MSG_TYPE_MAKE_SEND_ONCE &&
3666			MACH_PORT_VALID(msg->msgh_local_port) ? msg->msgh_local_port :
3667			MACH_PORT_NULL);
3668	bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
3669			MACH_MSG_TYPE_MOVE_SEND_ONCE);
3670	dmsg->dmsg_priority = _dispatch_priority_propagate();
3671	dmsg->dmsg_voucher = _voucher_copy();
3672	_dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
3673	if ((!is_reply && slowpath(dr->dm_tail)) ||
3674			slowpath(dr->dm_disconnect_cnt) ||
3675			slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
3676			slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
3677					acquire))) {
3678		_dispatch_voucher_ktrace_dmsg_push(dmsg);
3679		return _dispatch_mach_send_push(dm, dmsg);
3680	}
3681	if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
3682		(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3683		_dispatch_voucher_ktrace_dmsg_push(dmsg);
3684		return _dispatch_mach_send_push_wakeup(dm, dmsg, true);
3685	}
3686	if (!is_reply && slowpath(dr->dm_tail)) {
3687		return _dispatch_mach_send_drain(dm);
3688	}
3689	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3690	_dispatch_wakeup(dm);
3691}
3692
3693static void
3694_dispatch_mach_disconnect(dispatch_mach_t dm)
3695{
3696	dispatch_mach_send_refs_t dr = dm->dm_refs;
3697	if (dm->dm_dkev) {
3698		_dispatch_mach_kevent_unregister(dm);
3699	}
3700	if (MACH_PORT_VALID(dr->dm_send)) {
3701		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
3702	}
3703	dr->dm_send = MACH_PORT_NULL;
3704	if (dr->dm_checkin) {
3705		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
3706		dr->dm_checkin = NULL;
3707	}
3708	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3709		dispatch_mach_reply_refs_t dmr, tmp;
3710		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp){
3711			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3712		}
3713	}
3714}
3715
3716DISPATCH_NOINLINE
3717static bool
3718_dispatch_mach_cancel(dispatch_mach_t dm)
3719{
3720	dispatch_mach_send_refs_t dr = dm->dm_refs;
3721	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
3722		return false;
3723	}
3724	_dispatch_object_debug(dm, "%s", __func__);
3725	_dispatch_mach_disconnect(dm);
3726	if (dm->ds_dkev) {
3727		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
3728		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
3729		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3730	}
3731	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3732	return true;
3733}
3734
3735DISPATCH_NOINLINE
3736static bool
3737_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
3738{
3739	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3740		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
3741			// send/reply kevents must be uninstalled on the manager queue
3742			return false;
3743		}
3744	}
3745	_dispatch_mach_disconnect(dm);
3746	dispatch_mach_send_refs_t dr = dm->dm_refs;
3747	dr->dm_checkin = dou._dc->dc_data;
3748	dr->dm_send = (mach_port_t)dou._dc->dc_other;
3749	_dispatch_continuation_free(dou._dc);
3750	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
3751	_dispatch_object_debug(dm, "%s", __func__);
3752	return true;
3753}
3754
3755DISPATCH_NOINLINE
3756void
3757dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
3758		dispatch_mach_msg_t checkin)
3759{
3760	dispatch_mach_send_refs_t dr = dm->dm_refs;
3761	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
3762	if (MACH_PORT_VALID(send) && checkin) {
3763		dispatch_retain(checkin);
3764		mach_msg_option_t options = _dispatch_mach_checkin_options();
3765		_dispatch_mach_msg_set_options(checkin, options);
3766		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3767	} else {
3768		checkin = NULL;
3769		dr->dm_checkin_port = MACH_PORT_NULL;
3770	}
3771	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3772	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3773	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
3774	dc->dc_ctxt = dc;
3775	dc->dc_data = checkin;
3776	dc->dc_other = (void*)(uintptr_t)send;
3777	return _dispatch_mach_send_push(dm, dc);
3778}
3779
3780#if DISPATCH_MACH_SEND_SYNC
3781DISPATCH_NOINLINE
3782static void
3783_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
3784{
3785	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
3786	struct dispatch_object_s dc = {
3787		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
3788		.do_ctxt = (void*)sema,
3789	};
3790	_dispatch_mach_send_push(dm, &dc);
3791	_dispatch_thread_semaphore_wait(sema);
3792	_dispatch_put_thread_semaphore(sema);
3793}
3794#endif // DISPATCH_MACH_SEND_SYNC
3795
3796DISPATCH_NOINLINE
3797mach_port_t
3798dispatch_mach_get_checkin_port(dispatch_mach_t dm)
3799{
3800	dispatch_mach_send_refs_t dr = dm->dm_refs;
3801	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3802		return MACH_PORT_DEAD;
3803	}
3804	return dr->dm_checkin_port;
3805}
3806
3807DISPATCH_NOINLINE
3808static void
3809_dispatch_mach_connect_invoke(dispatch_mach_t dm)
3810{
3811	dispatch_mach_refs_t dr = dm->ds_refs;
3812	_dispatch_client_callout4(dr->dm_handler_ctxt,
3813			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
3814	dm->dm_connect_handler_called = 1;
3815}
3816
3817DISPATCH_NOINLINE
3818void
3819_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
3820{
3821	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3822	dispatch_mach_refs_t dr = dm->ds_refs;
3823	mach_error_t err;
3824	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
3825
3826	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
3827	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3828	_dispatch_voucher_ktrace_dmsg_pop(dmsg);
3829	_dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
3830	_dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority,
3831			dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE);
3832	dmsg->dmsg_voucher = NULL;
3833	if (slowpath(!dm->dm_connect_handler_called)) {
3834		_dispatch_mach_connect_invoke(dm);
3835	}
3836	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
3837			dr->dm_handler_func);
3838	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3839	_dispatch_introspection_queue_item_complete(dmsg);
3840	dispatch_release(dmsg);
3841}
3842
3843DISPATCH_NOINLINE
3844void
3845_dispatch_mach_barrier_invoke(void *ctxt)
3846{
3847	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3848	dispatch_mach_refs_t dr = dm->ds_refs;
3849	struct dispatch_continuation_s *dc = ctxt;
3850	void *context = dc->dc_data;
3851	dispatch_function_t barrier = dc->dc_other;
3852	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);
3853
3854	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3855	if (slowpath(!dm->dm_connect_handler_called)) {
3856		_dispatch_mach_connect_invoke(dm);
3857	}
3858	_dispatch_client_callout(context, barrier);
3859	_dispatch_client_callout4(dr->dm_handler_ctxt,
3860			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
3861	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3862	if (send_barrier) {
3863		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
3864	}
3865}
3866
3867DISPATCH_NOINLINE
3868void
3869dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
3870		dispatch_function_t barrier)
3871{
3872	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3873	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
3874	dc->dc_func = _dispatch_mach_barrier_invoke;
3875	dc->dc_ctxt = dc;
3876	dc->dc_data = context;
3877	dc->dc_other = barrier;
3878	_dispatch_continuation_voucher_set(dc, 0);
3879	_dispatch_continuation_priority_set(dc, 0, 0);
3880
3881	dispatch_mach_send_refs_t dr = dm->dm_refs;
3882	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
3883			dm_sending, 0, 1, acquire))) {
3884		return _dispatch_mach_send_push(dm, dc);
3885	}
3886	// leave send queue locked until barrier has completed
3887	return _dispatch_mach_push(dm, dc, dc->dc_priority);
3888}
3889
3890DISPATCH_NOINLINE
3891void
3892dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
3893		dispatch_function_t barrier)
3894{
3895	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3896	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3897	dc->dc_func = _dispatch_mach_barrier_invoke;
3898	dc->dc_ctxt = dc;
3899	dc->dc_data = context;
3900	dc->dc_other = barrier;
3901	_dispatch_continuation_voucher_set(dc, 0);
3902	_dispatch_continuation_priority_set(dc, 0, 0);
3903
3904	return _dispatch_mach_push(dm, dc, dc->dc_priority);
3905}
3906
3907DISPATCH_NOINLINE
3908void
3909dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3910{
3911	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
3912			_dispatch_call_block_and_release);
3913}
3914
3915DISPATCH_NOINLINE
3916void
3917dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3918{
3919	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
3920			_dispatch_call_block_and_release);
3921}
3922
3923DISPATCH_NOINLINE
3924static void
3925_dispatch_mach_cancel_invoke(dispatch_mach_t dm)
3926{
3927	dispatch_mach_refs_t dr = dm->ds_refs;
3928	if (slowpath(!dm->dm_connect_handler_called)) {
3929		_dispatch_mach_connect_invoke(dm);
3930	}
3931	_dispatch_client_callout4(dr->dm_handler_ctxt,
3932			DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
3933	dm->dm_cancel_handler_called = 1;
3934	_dispatch_release(dm); // the retain is done at creation time
3935}
3936
3937DISPATCH_NOINLINE
3938void
3939dispatch_mach_cancel(dispatch_mach_t dm)
3940{
3941	dispatch_source_cancel((dispatch_source_t)dm);
3942}
3943
3944DISPATCH_ALWAYS_INLINE
3945static inline dispatch_queue_t
3946_dispatch_mach_invoke2(dispatch_object_t dou,
3947		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
3948{
3949	dispatch_mach_t dm = dou._dm;
3950
3951	// This function performs all mach channel actions. Each action is
3952	// responsible for verifying that it takes place on the appropriate queue.
3953	// If the current queue is not the correct queue for this action, the
3954	// correct queue will be returned and the invoke will be re-driven on that
3955	// queue.
3956
3957	// The order of tests here in invoke and in probe should be consistent.
3958
3959	dispatch_queue_t dq = _dispatch_queue_get_current();
3960	dispatch_mach_send_refs_t dr = dm->dm_refs;
3961
3962	if (slowpath(!dm->ds_is_installed)) {
3963		// The channel needs to be installed on the manager queue.
3964		if (dq != &_dispatch_mgr_q) {
3965			return &_dispatch_mgr_q;
3966		}
3967		if (dm->ds_dkev) {
3968			_dispatch_source_kevent_register((dispatch_source_t)dm);
3969		}
3970		dm->ds_is_installed = true;
3971		_dispatch_mach_send(dm);
3972		// Apply initial target queue change
3973		_dispatch_queue_drain(dou);
3974		if (dm->dq_items_tail) {
3975			return dm->do_targetq;
3976		}
3977	} else if (dm->dq_items_tail) {
3978		// The channel has pending messages to deliver to the target queue.
3979		if (dq != dm->do_targetq) {
3980			return dm->do_targetq;
3981		}
3982		dispatch_queue_t tq = dm->do_targetq;
3983		if (slowpath(_dispatch_queue_drain(dou))) {
3984			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
3985		}
3986		if (slowpath(tq != dm->do_targetq)) {
3987			// An item on the channel changed the target queue
3988			return dm->do_targetq;
3989		}
3990	} else if (dr->dm_sending) {
3991		// Sending and uninstallation below require the send lock, the channel
3992		// will be woken up when the lock is dropped <rdar://15132939&15203957>
3993		return NULL;
3994	} else if (dr->dm_tail) {
3995		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
3996				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
3997			// Send/reply kevents need to be installed or uninstalled
3998			if (dq != &_dispatch_mgr_q) {
3999				return &_dispatch_mgr_q;
4000			}
4001		}
4002		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
4003				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
4004			// The channel has pending messages to send.
4005			_dispatch_mach_send(dm);
4006		}
4007	} else if (dm->ds_atomic_flags & DSF_CANCELED){
4008		// The channel has been cancelled and needs to be uninstalled from the
4009		// manager queue. After uninstallation, the cancellation handler needs
4010		// to be delivered to the target queue.
4011		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
4012				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
4013			if (dq != &_dispatch_mgr_q) {
4014				return &_dispatch_mgr_q;
4015			}
4016			if (!_dispatch_mach_cancel(dm)) {
4017				return NULL;
4018			}
4019		}
4020		if (!dm->dm_cancel_handler_called) {
4021			if (dq != dm->do_targetq) {
4022				return dm->do_targetq;
4023			}
4024			_dispatch_mach_cancel_invoke(dm);
4025		}
4026	}
4027	return NULL;
4028}
4029
4030DISPATCH_NOINLINE
4031void
4032_dispatch_mach_invoke(dispatch_mach_t dm)
4033{
4034	_dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
4035}
4036
4037unsigned long
4038_dispatch_mach_probe(dispatch_mach_t dm)
4039{
4040	// This function determines whether the mach channel needs to be invoked.
4041	// The order of tests here in probe and in invoke should be consistent.
4042
4043	dispatch_mach_send_refs_t dr = dm->dm_refs;
4044
4045	if (slowpath(!dm->ds_is_installed)) {
4046		// The channel needs to be installed on the manager queue.
4047		return true;
4048	} else if (_dispatch_queue_class_probe(dm)) {
4049		// The source has pending messages to deliver to the target queue.
4050		return true;
4051	} else if (dr->dm_sending) {
4052		// Sending and uninstallation below require the send lock, the channel
4053		// will be woken up when the lock is dropped <rdar://15132939&15203957>
4054		return false;
4055	} else if (dr->dm_tail &&
4056			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
4057			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
4058		// The channel has pending messages to send.
4059		return true;
4060	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
4061		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
4062				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
4063				!dm->dm_cancel_handler_called) {
4064			// The channel needs to be uninstalled from the manager queue, or
4065			// the cancellation handler needs to be delivered to the target
4066			// queue.
4067			return true;
4068		}
4069	}
4070	// Nothing to do.
4071	return false;
4072}
4073
4074#pragma mark -
4075#pragma mark dispatch_mach_msg_t
4076
4077dispatch_mach_msg_t
4078dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
4079		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
4080{
4081	if (slowpath(size < sizeof(mach_msg_header_t)) ||
4082			slowpath(destructor && !msg)) {
4083		DISPATCH_CLIENT_CRASH("Empty message");
4084	}
4085	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
4086			sizeof(struct dispatch_mach_msg_s) +
4087			(destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
4088	if (destructor) {
4089		dmsg->dmsg_msg = msg;
4090	} else if (msg) {
4091		memcpy(dmsg->dmsg_buf, msg, size);
4092	}
4093	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
4094	dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
4095			false);
4096	dmsg->dmsg_destructor = destructor;
4097	dmsg->dmsg_size = size;
4098	if (msg_ptr) {
4099		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
4100	}
4101	return dmsg;
4102}
4103
4104void
4105_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
4106{
4107	if (dmsg->dmsg_voucher) {
4108		_voucher_release(dmsg->dmsg_voucher);
4109		dmsg->dmsg_voucher = NULL;
4110	}
4111	switch (dmsg->dmsg_destructor) {
4112	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
4113		break;
4114	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
4115		free(dmsg->dmsg_msg);
4116		break;
4117	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
4118		mach_vm_size_t vm_size = dmsg->dmsg_size;
4119		mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
4120		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
4121				vm_addr, vm_size));
4122		break;
4123	}}
4124}
4125
4126static inline mach_msg_header_t*
4127_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
4128{
4129	return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
4130			(mach_msg_header_t*)dmsg->dmsg_buf;
4131}
4132
4133mach_msg_header_t*
4134dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
4135{
4136	if (size_ptr) {
4137		*size_ptr = dmsg->dmsg_size;
4138	}
4139	return _dispatch_mach_msg_get_msg(dmsg);
4140}
4141
4142size_t
4143_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
4144{
4145	size_t offset = 0;
4146	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4147			dx_kind(dmsg), dmsg);
4148	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
4149			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
4150	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
4151			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf);
4152	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
4153	if (hdr->msgh_id) {
4154		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
4155				hdr->msgh_id);
4156	}
4157	if (hdr->msgh_size) {
4158		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
4159				hdr->msgh_size);
4160	}
4161	if (hdr->msgh_bits) {
4162		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
4163				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
4164				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
4165		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
4166			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
4167					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
4168		}
4169		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
4170	}
4171	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
4172		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
4173				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
4174	} else if (hdr->msgh_local_port) {
4175		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
4176				hdr->msgh_local_port);
4177	} else if (hdr->msgh_remote_port) {
4178		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
4179				hdr->msgh_remote_port);
4180	} else {
4181		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
4182	}
4183	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
4184	return offset;
4185}
4186
4187#pragma mark -
4188#pragma mark dispatch_mig_server
4189
4190mach_msg_return_t
4191dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
4192		dispatch_mig_callback_t callback)
4193{
4194	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
4195		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
4196		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
4197	mach_msg_options_t tmp_options;
4198	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
4199	mach_msg_return_t kr = 0;
4200	uint64_t assertion_token = 0;
4201	unsigned int cnt = 1000; // do not stall out serial queues
4202	boolean_t demux_success;
4203	bool received = false;
4204	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
4205
4206	// XXX FIXME -- allocate these elsewhere
4207	bufRequest = alloca(rcv_size);
4208	bufReply = alloca(rcv_size);
4209	bufReply->Head.msgh_size = 0;
4210	bufRequest->RetCode = 0;
4211
4212#if DISPATCH_DEBUG
4213	options |= MACH_RCV_LARGE; // rdar://problem/8422992
4214#endif
4215	tmp_options = options;
4216	// XXX FIXME -- change this to not starve out the target queue
4217	for (;;) {
4218		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
4219			options &= ~MACH_RCV_MSG;
4220			tmp_options &= ~MACH_RCV_MSG;
4221
4222			if (!(tmp_options & MACH_SEND_MSG)) {
4223				goto out;
4224			}
4225		}
4226		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
4227				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
4228
4229		tmp_options = options;
4230
4231		if (slowpath(kr)) {
4232			switch (kr) {
4233			case MACH_SEND_INVALID_DEST:
4234			case MACH_SEND_TIMED_OUT:
4235				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
4236					mach_msg_destroy(&bufReply->Head);
4237				}
4238				break;
4239			case MACH_RCV_TIMED_OUT:
4240				// Don't return an error if a message was sent this time or
4241				// a message was successfully received previously
4242				// rdar://problems/7363620&7791738
4243				if(bufReply->Head.msgh_remote_port || received) {
4244					kr = MACH_MSG_SUCCESS;
4245				}
4246				break;
4247			case MACH_RCV_INVALID_NAME:
4248				break;
4249#if DISPATCH_DEBUG
4250			case MACH_RCV_TOO_LARGE:
4251				// receive messages that are too large and log their id and size
4252				// rdar://problem/8422992
4253				tmp_options &= ~MACH_RCV_LARGE;
4254				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
4255				void *large_buf = malloc(large_size);
4256				if (large_buf) {
4257					rcv_size = large_size;
4258					bufReply = large_buf;
4259				}
4260				if (!mach_msg(&bufReply->Head, tmp_options, 0,
4261						(mach_msg_size_t)rcv_size,
4262						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
4263					_dispatch_log("BUG in libdispatch client: "
4264							"dispatch_mig_server received message larger than "
4265							"requested size %zd: id = 0x%x, size = %d",
4266							maxmsgsz, bufReply->Head.msgh_id,
4267							bufReply->Head.msgh_size);
4268				}
4269				if (large_buf) {
4270					free(large_buf);
4271				}
4272				// fall through
4273#endif
4274			default:
4275				_dispatch_bug_mach_client(
4276						"dispatch_mig_server: mach_msg() failed", kr);
4277				break;
4278			}
4279			goto out;
4280		}
4281
4282		if (!(tmp_options & MACH_RCV_MSG)) {
4283			goto out;
4284		}
4285
4286		if (assertion_token) {
4287#if DISPATCH_USE_IMPORTANCE_ASSERTION
4288			int r = proc_importance_assertion_complete(assertion_token);
4289			(void)dispatch_assume_zero(r);
4290#endif
4291			assertion_token = 0;
4292		}
4293		received = true;
4294
4295		bufTemp = bufRequest;
4296		bufRequest = bufReply;
4297		bufReply = bufTemp;
4298
4299#if DISPATCH_USE_IMPORTANCE_ASSERTION
4300		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
4301				NULL, &assertion_token);
4302		if (r && slowpath(r != EIO)) {
4303			(void)dispatch_assume_zero(r);
4304		}
4305#endif
4306		_voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
4307		demux_success = callback(&bufRequest->Head, &bufReply->Head);
4308
4309		if (!demux_success) {
4310			// destroy the request - but not the reply port
4311			bufRequest->Head.msgh_remote_port = 0;
4312			mach_msg_destroy(&bufRequest->Head);
4313		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
4314			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
4315			// is present
4316			if (slowpath(bufReply->RetCode)) {
4317				if (bufReply->RetCode == MIG_NO_REPLY) {
4318					continue;
4319				}
4320
4321				// destroy the request - but not the reply port
4322				bufRequest->Head.msgh_remote_port = 0;
4323				mach_msg_destroy(&bufRequest->Head);
4324			}
4325		}
4326
4327		if (bufReply->Head.msgh_remote_port) {
4328			tmp_options |= MACH_SEND_MSG;
4329			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
4330					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
4331				tmp_options |= MACH_SEND_TIMEOUT;
4332			}
4333		}
4334	}
4335
4336out:
4337	if (assertion_token) {
4338#if DISPATCH_USE_IMPORTANCE_ASSERTION
4339		int r = proc_importance_assertion_complete(assertion_token);
4340		(void)dispatch_assume_zero(r);
4341#endif
4342	}
4343
4344	return kr;
4345}
4346
4347#endif /* HAVE_MACH */
4348
4349#pragma mark -
4350#pragma mark dispatch_source_debug
4351
4352DISPATCH_NOINLINE
4353static const char *
4354_evfiltstr(short filt)
4355{
4356	switch (filt) {
4357#define _evfilt2(f) case (f): return #f
4358	_evfilt2(EVFILT_READ);
4359	_evfilt2(EVFILT_WRITE);
4360	_evfilt2(EVFILT_AIO);
4361	_evfilt2(EVFILT_VNODE);
4362	_evfilt2(EVFILT_PROC);
4363	_evfilt2(EVFILT_SIGNAL);
4364	_evfilt2(EVFILT_TIMER);
4365#ifdef EVFILT_VM
4366	_evfilt2(EVFILT_VM);
4367#endif
4368#ifdef EVFILT_MEMORYSTATUS
4369	_evfilt2(EVFILT_MEMORYSTATUS);
4370#endif
4371#if HAVE_MACH
4372	_evfilt2(EVFILT_MACHPORT);
4373	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
4374#endif
4375	_evfilt2(EVFILT_FS);
4376	_evfilt2(EVFILT_USER);
4377
4378	_evfilt2(DISPATCH_EVFILT_TIMER);
4379	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
4380	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
4381	default:
4382		return "EVFILT_missing";
4383	}
4384}
4385
4386static size_t
4387_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4388{
4389	dispatch_queue_t target = ds->do_targetq;
4390	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
4391			"pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
4392			target && target->dq_label ? target->dq_label : "", target,
4393			ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask);
4394}
4395
4396static size_t
4397_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4398{
4399	dispatch_source_refs_t dr = ds->ds_refs;
4400	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
4401			" last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
4402			ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
4403			ds_timer(dr).interval, ds_timer(dr).flags);
4404}
4405
4406size_t
4407_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
4408{
4409	size_t offset = 0;
4410	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4411			dx_kind(ds), ds);
4412	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
4413	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
4414	if (ds->ds_is_timer) {
4415		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
4416	}
4417	offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }",
4418			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
4419	return offset;
4420}
4421
4422static size_t
4423_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
4424{
4425	dispatch_queue_t target = dm->do_targetq;
4426	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
4427			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
4428			"sending = %d, disconnected = %d, canceled = %d ",
4429			target && target->dq_label ? target->dq_label : "", target,
4430			dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
4431			dm->dm_refs->dm_send,
4432			dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
4433			dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
4434			" (armed)" : "", dm->dm_refs->dm_checkin_port,
4435			dm->dm_refs->dm_checkin ? " (pending)" : "",
4436			dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
4437			(bool)(dm->ds_atomic_flags & DSF_CANCELED));
4438}
4439size_t
4440_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
4441{
4442	size_t offset = 0;
4443	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4444			dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
4445			dx_kind(dm), dm);
4446	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
4447	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
4448	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
4449	return offset;
4450}
4451
4452#if DISPATCH_DEBUG
4453static void
4454_dispatch_kevent_debug(struct kevent64_s* kev, const char* str)
4455{
4456	_dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, "
4457			"fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, "
4458			"ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter),
4459			kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0],
4460			kev->ext[1], str);
4461}
4462
4463static void
4464_dispatch_kevent_debugger2(void *context)
4465{
4466	struct sockaddr sa;
4467	socklen_t sa_len = sizeof(sa);
4468	int c, fd = (int)(long)context;
4469	unsigned int i;
4470	dispatch_kevent_t dk;
4471	dispatch_source_t ds;
4472	dispatch_source_refs_t dr;
4473	FILE *debug_stream;
4474
4475	c = accept(fd, &sa, &sa_len);
4476	if (c == -1) {
4477		if (errno != EAGAIN) {
4478			(void)dispatch_assume_zero(errno);
4479		}
4480		return;
4481	}
4482#if 0
4483	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
4484	if (r == -1) {
4485		(void)dispatch_assume_zero(errno);
4486	}
4487#endif
4488	debug_stream = fdopen(c, "a");
4489	if (!dispatch_assume(debug_stream)) {
4490		close(c);
4491		return;
4492	}
4493
4494	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
4495	fprintf(debug_stream, "Content-type: text/html\r\n");
4496	fprintf(debug_stream, "Pragma: nocache\r\n");
4497	fprintf(debug_stream, "\r\n");
4498	fprintf(debug_stream, "<html>\n");
4499	fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
4500	fprintf(debug_stream, "<body>\n<ul>\n");
4501
4502	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
4503	//		"<td>DK</td><td>DK</td><td>DK</td></tr>\n");
4504
4505	for (i = 0; i < DSL_HASH_SIZE; i++) {
4506		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
4507			continue;
4508		}
4509		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
4510			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
4511					"0x%hx fflags 0x%x data 0x%lx udata %p\n",
4512					dk, (unsigned long)dk->dk_kevent.ident,
4513					_evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
4514					dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
4515					(void*)dk->dk_kevent.udata);
4516			fprintf(debug_stream, "\t\t<ul>\n");
4517			TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
4518				ds = _dispatch_source_from_refs(dr);
4519				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
4520						"0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
4521						ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
4522						ds->ds_pending_data, ds->ds_pending_data_mask,
4523						ds->ds_atomic_flags);
4524				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
4525					dispatch_queue_t dq = ds->do_targetq;
4526					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
4527							"0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
4528							dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
4529				}
4530			}
4531			fprintf(debug_stream, "\t\t</ul>\n");
4532			fprintf(debug_stream, "\t</li>\n");
4533		}
4534	}
4535	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
4536	fflush(debug_stream);
4537	fclose(debug_stream);
4538}
4539
4540static void
4541_dispatch_kevent_debugger2_cancel(void *context)
4542{
4543	int ret, fd = (int)(long)context;
4544
4545	ret = close(fd);
4546	if (ret != -1) {
4547		(void)dispatch_assume_zero(errno);
4548	}
4549}
4550
4551static void
4552_dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
4553{
4554	union {
4555		struct sockaddr_in sa_in;
4556		struct sockaddr sa;
4557	} sa_u = {
4558		.sa_in = {
4559			.sin_family = AF_INET,
4560			.sin_addr = { htonl(INADDR_LOOPBACK), },
4561		},
4562	};
4563	dispatch_source_t ds;
4564	const char *valstr;
4565	int val, r, fd, sock_opt = 1;
4566	socklen_t slen = sizeof(sa_u);
4567
4568	if (issetugid()) {
4569		return;
4570	}
4571	valstr = getenv("LIBDISPATCH_DEBUGGER");
4572	if (!valstr) {
4573		return;
4574	}
4575	val = atoi(valstr);
4576	if (val == 2) {
4577		sa_u.sa_in.sin_addr.s_addr = 0;
4578	}
4579	fd = socket(PF_INET, SOCK_STREAM, 0);
4580	if (fd == -1) {
4581		(void)dispatch_assume_zero(errno);
4582		return;
4583	}
4584	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
4585			(socklen_t) sizeof sock_opt);
4586	if (r == -1) {
4587		(void)dispatch_assume_zero(errno);
4588		goto out_bad;
4589	}
4590#if 0
4591	r = fcntl(fd, F_SETFL, O_NONBLOCK);
4592	if (r == -1) {
4593		(void)dispatch_assume_zero(errno);
4594		goto out_bad;
4595	}
4596#endif
4597	r = bind(fd, &sa_u.sa, sizeof(sa_u));
4598	if (r == -1) {
4599		(void)dispatch_assume_zero(errno);
4600		goto out_bad;
4601	}
4602	r = listen(fd, SOMAXCONN);
4603	if (r == -1) {
4604		(void)dispatch_assume_zero(errno);
4605		goto out_bad;
4606	}
4607	r = getsockname(fd, &sa_u.sa, &slen);
4608	if (r == -1) {
4609		(void)dispatch_assume_zero(errno);
4610		goto out_bad;
4611	}
4612
4613	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
4614			&_dispatch_mgr_q);
4615	if (dispatch_assume(ds)) {
4616		_dispatch_log("LIBDISPATCH: debug port: %hu",
4617				(in_port_t)ntohs(sa_u.sa_in.sin_port));
4618
4619		/* ownership of fd transfers to ds */
4620		dispatch_set_context(ds, (void *)(long)fd);
4621		dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
4622		dispatch_source_set_cancel_handler_f(ds,
4623				_dispatch_kevent_debugger2_cancel);
4624		dispatch_resume(ds);
4625
4626		return;
4627	}
4628out_bad:
4629	close(fd);
4630}
4631
4632#if HAVE_MACH
4633
4634#ifndef MACH_PORT_TYPE_SPREQUEST
4635#define MACH_PORT_TYPE_SPREQUEST 0x40000000
4636#endif
4637
4638DISPATCH_NOINLINE
4639void
4640dispatch_debug_machport(mach_port_t name, const char* str)
4641{
4642	mach_port_type_t type;
4643	mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
4644	unsigned int dnreqs = 0, dnrsiz;
4645	kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
4646	if (kr) {
4647		_dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
4648				kr, mach_error_string(kr), str);
4649		return;
4650	}
4651	if (type & MACH_PORT_TYPE_SEND) {
4652		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4653				MACH_PORT_RIGHT_SEND, &ns));
4654	}
4655	if (type & MACH_PORT_TYPE_SEND_ONCE) {
4656		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4657				MACH_PORT_RIGHT_SEND_ONCE, &nso));
4658	}
4659	if (type & MACH_PORT_TYPE_DEAD_NAME) {
4660		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4661				MACH_PORT_RIGHT_DEAD_NAME, &nd));
4662	}
4663	if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
4664		kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
4665		if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
4666	}
4667	if (type & MACH_PORT_TYPE_RECEIVE) {
4668		mach_port_status_t status = { .mps_pset = 0, };
4669		mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
4670		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4671				MACH_PORT_RIGHT_RECEIVE, &nr));
4672		(void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
4673				name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
4674		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4675				"dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
4676				"sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
4677				"seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
4678				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
4679				status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
4680				status.mps_srights ? "Y":"N", status.mps_sorights,
4681				status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
4682				status.mps_seqno, str);
4683	} else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
4684			MACH_PORT_TYPE_DEAD_NAME)) {
4685		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4686				"dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
4687				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
4688	} else {
4689		_dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
4690				str);
4691	}
4692}
4693
4694#endif // HAVE_MACH
4695
4696#endif // DISPATCH_DEBUG
4697