1/*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21#include "internal.h"
22
23#ifndef DISPATCH_IO_DEBUG
24#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25#endif
26
27#if DISPATCH_IO_DEBUG
28#define _dispatch_fd_debug(msg, fd, args...) \
29	_dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
30#else
31#define _dispatch_fd_debug(msg, fd, args...)
32#endif
33
34#if USE_OBJC
35#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
36#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
37#else
38#define _dispatch_io_data_retain(x) dispatch_retain(x)
39#define _dispatch_io_data_release(x) dispatch_release(x)
40#endif
41
42typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
43
44DISPATCH_EXPORT DISPATCH_NOTHROW
45void _dispatch_iocntl(uint32_t param, uint64_t value);
46
47static dispatch_operation_t _dispatch_operation_create(
48		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
49		size_t length, dispatch_data_t data, dispatch_queue_t queue,
50		dispatch_io_handler_t handler);
51static void _dispatch_operation_enqueue(dispatch_operation_t op,
52		dispatch_op_direction_t direction, dispatch_data_t data);
53static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
54		dispatch_operation_t op);
55static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
56static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
57static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
58		dispatch_fd_entry_init_callback_t completion_callback);
59static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
60		uintptr_t hash);
61static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
62		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
63static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
64		dispatch_io_t channel);
65static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
66		dispatch_io_t channel);
67static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
68		dispatch_queue_t tq);
69static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
70		dispatch_op_direction_t direction);
71static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
72static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
73		dispatch_operation_t operation, dispatch_data_t data);
74static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
75		dispatch_operation_t operation, dispatch_data_t data);
76static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
77		dispatch_io_t channel);
78static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
79		dispatch_io_t channel);
80static void _dispatch_stream_source_handler(void *ctx);
81static void _dispatch_stream_queue_handler(void *ctx);
82static void _dispatch_stream_handler(void *ctx);
83static void _dispatch_disk_handler(void *ctx);
84static void _dispatch_disk_perform(void *ctxt);
85static void _dispatch_operation_advise(dispatch_operation_t op,
86		size_t chunk_size);
87static int _dispatch_operation_perform(dispatch_operation_t op);
88static void _dispatch_operation_deliver_data(dispatch_operation_t op,
89		dispatch_op_flags_t flags);
90
91// Macros to wrap syscalls which return -1 on error, and retry on EINTR
92#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
93		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
94		case EINTR: continue; \
95		__VA_ARGS__ \
96		} \
97		break; \
98	} while (1)
99#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
100		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
101		case 0: break; \
102		__VA_ARGS__ \
103		); \
104	} while (0)
105#define _dispatch_io_syscall(__syscall) do { int __err; \
106		_dispatch_io_syscall_switch(__err, __syscall); \
107	} while (0)
108
109enum {
110	DISPATCH_OP_COMPLETE = 1,
111	DISPATCH_OP_DELIVER,
112	DISPATCH_OP_DELIVER_AND_COMPLETE,
113	DISPATCH_OP_COMPLETE_RESUME,
114	DISPATCH_OP_RESUME,
115	DISPATCH_OP_ERR,
116	DISPATCH_OP_FD_ERR,
117};
118
119#define _dispatch_io_Block_copy(x) \
120		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
121
122#pragma mark -
123#pragma mark dispatch_io_hashtables
124
125#if TARGET_OS_EMBEDDED
126#define DIO_HASH_SIZE  64u // must be a power of two
127#else
128#define DIO_HASH_SIZE 256u // must be a power of two
129#endif
130#define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
131
132// Global hashtable of dev_t -> disk_s mappings
133DISPATCH_CACHELINE_ALIGN
134static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
135// Global hashtable of fd -> fd_entry_s mappings
136DISPATCH_CACHELINE_ALIGN
137static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
138
139static dispatch_once_t  _dispatch_io_devs_lockq_pred;
140static dispatch_queue_t _dispatch_io_devs_lockq;
141static dispatch_queue_t _dispatch_io_fds_lockq;
142
143static void
144_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
145{
146	_dispatch_io_fds_lockq = dispatch_queue_create(
147			"com.apple.libdispatch-io.fd_lockq", NULL);
148	unsigned int i;
149	for (i = 0; i < DIO_HASH_SIZE; i++) {
150		TAILQ_INIT(&_dispatch_io_fds[i]);
151	}
152}
153
154static void
155_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
156{
157	_dispatch_io_devs_lockq = dispatch_queue_create(
158			"com.apple.libdispatch-io.dev_lockq", NULL);
159	unsigned int i;
160	for (i = 0; i < DIO_HASH_SIZE; i++) {
161		TAILQ_INIT(&_dispatch_io_devs[i]);
162	}
163}
164
165#pragma mark -
166#pragma mark dispatch_io_defaults
167
168enum {
169	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
170	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
171	DISPATCH_IOCNTL_INITIAL_DELIVERY,
172	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
173};
174
175static struct dispatch_io_defaults_s {
176	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
177	bool initial_delivery;
178} dispatch_io_defaults = {
179	.chunk_pages = DIO_MAX_CHUNK_PAGES,
180	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
181	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
182};
183
184#define _dispatch_iocntl_set_default(p, v) do { \
185		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
186	} while (0)
187
188void
189_dispatch_iocntl(uint32_t param, uint64_t value)
190{
191	switch (param) {
192	case DISPATCH_IOCNTL_CHUNK_PAGES:
193		_dispatch_iocntl_set_default(chunk_pages, value);
194		break;
195	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
196		_dispatch_iocntl_set_default(low_water_chunks, value);
197		break;
198	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
199		_dispatch_iocntl_set_default(initial_delivery, value);
200	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
201		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
202		break;
203	}
204}
205
206#pragma mark -
207#pragma mark dispatch_io_t
208
209static dispatch_io_t
210_dispatch_io_create(dispatch_io_type_t type)
211{
212	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
213			sizeof(struct dispatch_io_s));
214	channel->do_next = DISPATCH_OBJECT_LISTLESS;
215	channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
216			true);
217	channel->params.type = type;
218	channel->params.high = SIZE_MAX;
219	channel->params.low = dispatch_io_defaults.low_water_chunks *
220			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
221	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
222			NULL);
223	return channel;
224}
225
226static void
227_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
228		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
229{
230	// Enqueue the cleanup handler on the suspended close queue
231	if (cleanup_handler) {
232		_dispatch_retain(queue);
233		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
234			dispatch_async(queue, ^{
235				_dispatch_fd_debug("cleanup handler invoke", -1);
236				cleanup_handler(err);
237			});
238			_dispatch_release(queue);
239		});
240	}
241	if (fd_entry) {
242		channel->fd_entry = fd_entry;
243		dispatch_retain(fd_entry->barrier_queue);
244		dispatch_retain(fd_entry->barrier_group);
245		channel->barrier_queue = fd_entry->barrier_queue;
246		channel->barrier_group = fd_entry->barrier_group;
247	} else {
248		// Still need to create a barrier queue, since all operations go
249		// through it
250		channel->barrier_queue = dispatch_queue_create(
251				"com.apple.libdispatch-io.barrierq", NULL);
252		channel->barrier_group = dispatch_group_create();
253	}
254}
255
256void
257_dispatch_io_dispose(dispatch_io_t channel)
258{
259	_dispatch_object_debug(channel, "%s", __func__);
260	if (channel->fd_entry &&
261			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
262		if (channel->fd_entry->path_data) {
263			// This modification is safe since path_data->channel is checked
264			// only on close_queue (which is still suspended at this point)
265			channel->fd_entry->path_data->channel = NULL;
266		}
267		// Cleanup handlers will only run when all channels related to this
268		// fd are complete
269		_dispatch_fd_entry_release(channel->fd_entry);
270	}
271	if (channel->queue) {
272		dispatch_release(channel->queue);
273	}
274	if (channel->barrier_queue) {
275		dispatch_release(channel->barrier_queue);
276	}
277	if (channel->barrier_group) {
278		dispatch_release(channel->barrier_group);
279	}
280}
281
282static int
283_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
284{
285	int err = 0;
286	if (S_ISDIR(mode)) {
287		err = EISDIR;
288	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
289			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
290		err = ESPIPE;
291	}
292	return err;
293}
294
295static int
296_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
297		bool ignore_closed)
298{
299	// On _any_ queue
300	int err;
301	if (op) {
302		channel = op->channel;
303	}
304	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
305		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
306			err = ECANCELED;
307		} else {
308			err = 0;
309		}
310	} else {
311		err = op ? op->fd_entry->err : channel->err;
312	}
313	return err;
314}
315
316#pragma mark -
317#pragma mark dispatch_io_channels
318
319dispatch_io_t
320dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
321		dispatch_queue_t queue, void (^cleanup_handler)(int))
322{
323	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
324		return NULL;
325	}
326	_dispatch_fd_debug("io create", fd);
327	dispatch_io_t channel = _dispatch_io_create(type);
328	channel->fd = fd;
329	channel->fd_actual = fd;
330	dispatch_suspend(channel->queue);
331	_dispatch_retain(queue);
332	_dispatch_retain(channel);
333	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
334		// On barrier queue
335		int err = fd_entry->err;
336		if (!err) {
337			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
338		}
339		if (!err && type == DISPATCH_IO_RANDOM) {
340			off_t f_ptr;
341			_dispatch_io_syscall_switch_noerr(err,
342				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
343				case 0: channel->f_ptr = f_ptr; break;
344				default: (void)dispatch_assume_zero(err); break;
345			);
346		}
347		channel->err = err;
348		_dispatch_fd_entry_retain(fd_entry);
349		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
350		dispatch_resume(channel->queue);
351		_dispatch_object_debug(channel, "%s", __func__);
352		_dispatch_release(channel);
353		_dispatch_release(queue);
354	});
355	_dispatch_object_debug(channel, "%s", __func__);
356	return channel;
357}
358
359dispatch_io_t
360dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
361		dispatch_queue_t queue, void *context,
362		void (*cleanup_handler)(void *context, int error))
363{
364	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
365			^(int error){ cleanup_handler(context, error); });
366}
367
368dispatch_io_t
369dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
370		int oflag, mode_t mode, dispatch_queue_t queue,
371		void (^cleanup_handler)(int error))
372{
373	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
374			!(path && *path == '/')) {
375		return NULL;
376	}
377	size_t pathlen = strlen(path);
378	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
379	if (!path_data) {
380		return NULL;
381	}
382	_dispatch_fd_debug("io create with path %s", -1, path);
383	dispatch_io_t channel = _dispatch_io_create(type);
384	channel->fd = -1;
385	channel->fd_actual = -1;
386	path_data->channel = channel;
387	path_data->oflag = oflag;
388	path_data->mode = mode;
389	path_data->pathlen = pathlen;
390	memcpy(path_data->path, path, pathlen + 1);
391	_dispatch_retain(queue);
392	_dispatch_retain(channel);
393	dispatch_async(channel->queue, ^{
394		int err = 0;
395		struct stat st;
396		_dispatch_io_syscall_switch_noerr(err,
397			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
398					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
399					lstat(path_data->path, &st) : stat(path_data->path, &st),
400			case 0:
401				err = _dispatch_io_validate_type(channel, st.st_mode);
402				break;
403			default:
404				if ((path_data->oflag & O_CREAT) &&
405						(*(path_data->path + path_data->pathlen - 1) != '/')) {
406					// Check parent directory
407					char *c = strrchr(path_data->path, '/');
408					dispatch_assert(c);
409					*c = 0;
410					int perr;
411					_dispatch_io_syscall_switch_noerr(perr,
412						stat(path_data->path, &st),
413						case 0:
414							// Since the parent directory exists, open() will
415							// create a regular file after the fd_entry has
416							// been filled in
417							st.st_mode = S_IFREG;
418							err = 0;
419							break;
420					);
421					*c = '/';
422				}
423				break;
424		);
425		channel->err = err;
426		if (err) {
427			free(path_data);
428			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
429			_dispatch_release(channel);
430			_dispatch_release(queue);
431			return;
432		}
433		dispatch_suspend(channel->queue);
434		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
435				_dispatch_io_devs_lockq_init);
436		dispatch_async(_dispatch_io_devs_lockq, ^{
437			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
438					path_data, st.st_dev, st.st_mode);
439			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
440			dispatch_resume(channel->queue);
441			_dispatch_object_debug(channel, "%s", __func__);
442			_dispatch_release(channel);
443			_dispatch_release(queue);
444		});
445	});
446	_dispatch_object_debug(channel, "%s", __func__);
447	return channel;
448}
449
450dispatch_io_t
451dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
452		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
453		void (*cleanup_handler)(void *context, int error))
454{
455	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
456			!cleanup_handler ? NULL :
457			^(int error){ cleanup_handler(context, error); });
458}
459
460dispatch_io_t
461dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
462		dispatch_queue_t queue, void (^cleanup_handler)(int error))
463{
464	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
465		return NULL;
466	}
467	_dispatch_fd_debug("io create with io %p", -1, in_channel);
468	dispatch_io_t channel = _dispatch_io_create(type);
469	dispatch_suspend(channel->queue);
470	_dispatch_retain(queue);
471	_dispatch_retain(channel);
472	_dispatch_retain(in_channel);
473	dispatch_async(in_channel->queue, ^{
474		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
475		if (err0) {
476			channel->err = err0;
477			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
478			dispatch_resume(channel->queue);
479			_dispatch_release(channel);
480			_dispatch_release(in_channel);
481			_dispatch_release(queue);
482			return;
483		}
484		dispatch_async(in_channel->barrier_queue, ^{
485			int err = _dispatch_io_get_error(NULL, in_channel, false);
486			// If there is no error, the fd_entry for the in_channel is valid.
487			// Since we are running on in_channel's queue, the fd_entry has been
488			// fully resolved and will stay valid for the duration of this block
489			if (!err) {
490				err = in_channel->err;
491				if (!err) {
492					err = in_channel->fd_entry->err;
493				}
494			}
495			if (!err) {
496				err = _dispatch_io_validate_type(channel,
497						in_channel->fd_entry->stat.mode);
498			}
499			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
500				off_t f_ptr;
501				_dispatch_io_syscall_switch_noerr(err,
502					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
503					case 0: channel->f_ptr = f_ptr; break;
504					default: (void)dispatch_assume_zero(err); break;
505				);
506			}
507			channel->err = err;
508			if (err) {
509				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
510				dispatch_resume(channel->queue);
511				_dispatch_release(channel);
512				_dispatch_release(in_channel);
513				_dispatch_release(queue);
514				return;
515			}
516			if (in_channel->fd == -1) {
517				// in_channel was created from path
518				channel->fd = -1;
519				channel->fd_actual = -1;
520				mode_t mode = in_channel->fd_entry->stat.mode;
521				dev_t dev = in_channel->fd_entry->stat.dev;
522				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
523						in_channel->fd_entry->path_data->pathlen + 1;
524				dispatch_io_path_data_t path_data = malloc(path_data_len);
525				memcpy(path_data, in_channel->fd_entry->path_data,
526						path_data_len);
527				path_data->channel = channel;
528				// lockq_io_devs is known to already exist
529				dispatch_async(_dispatch_io_devs_lockq, ^{
530					dispatch_fd_entry_t fd_entry;
531					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
532							dev, mode);
533					_dispatch_io_init(channel, fd_entry, queue, 0,
534							cleanup_handler);
535					dispatch_resume(channel->queue);
536					_dispatch_release(channel);
537					_dispatch_release(queue);
538				});
539			} else {
540				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
541				channel->fd = in_channel->fd;
542				channel->fd_actual = in_channel->fd_actual;
543				_dispatch_fd_entry_retain(fd_entry);
544				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
545				dispatch_resume(channel->queue);
546				_dispatch_release(channel);
547				_dispatch_release(queue);
548			}
549			_dispatch_release(in_channel);
550			_dispatch_object_debug(channel, "%s", __func__);
551		});
552	});
553	_dispatch_object_debug(channel, "%s", __func__);
554	return channel;
555}
556
557dispatch_io_t
558dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
559		dispatch_queue_t queue, void *context,
560		void (*cleanup_handler)(void *context, int error))
561{
562	return dispatch_io_create_with_io(type, in_channel, queue,
563			!cleanup_handler ? NULL :
564			^(int error){ cleanup_handler(context, error); });
565}
566
567#pragma mark -
568#pragma mark dispatch_io_accessors
569
570void
571dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
572{
573	_dispatch_retain(channel);
574	dispatch_async(channel->queue, ^{
575		_dispatch_fd_debug("io set high water", channel->fd);
576		if (channel->params.low > high_water) {
577			channel->params.low = high_water;
578		}
579		channel->params.high = high_water ? high_water : 1;
580		_dispatch_release(channel);
581	});
582}
583
584void
585dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
586{
587	_dispatch_retain(channel);
588	dispatch_async(channel->queue, ^{
589		_dispatch_fd_debug("io set low water", channel->fd);
590		if (channel->params.high < low_water) {
591			channel->params.high = low_water ? low_water : 1;
592		}
593		channel->params.low = low_water;
594		_dispatch_release(channel);
595	});
596}
597
598void
599dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
600		unsigned long flags)
601{
602	_dispatch_retain(channel);
603	dispatch_async(channel->queue, ^{
604		_dispatch_fd_debug("io set interval", channel->fd);
605		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
606		channel->params.interval_flags = flags;
607		_dispatch_release(channel);
608	});
609}
610
611void
612_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
613{
614	_dispatch_retain(dq);
615	_dispatch_retain(channel);
616	dispatch_async(channel->queue, ^{
617		dispatch_queue_t prev_dq = channel->do_targetq;
618		channel->do_targetq = dq;
619		_dispatch_release(prev_dq);
620		_dispatch_object_debug(channel, "%s", __func__);
621		_dispatch_release(channel);
622	});
623}
624
625dispatch_fd_t
626dispatch_io_get_descriptor(dispatch_io_t channel)
627{
628	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
629		return -1;
630	}
631	dispatch_fd_t fd = channel->fd_actual;
632	if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
633			!_dispatch_io_get_error(NULL, channel, false)) {
634		dispatch_fd_entry_t fd_entry = channel->fd_entry;
635		(void)_dispatch_fd_entry_open(fd_entry, channel);
636	}
637	return channel->fd_actual;
638}
639
640#pragma mark -
641#pragma mark dispatch_io_operations
642
643static void
644_dispatch_io_stop(dispatch_io_t channel)
645{
646	_dispatch_fd_debug("io stop", channel->fd);
647	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
648	_dispatch_retain(channel);
649	dispatch_async(channel->queue, ^{
650		dispatch_async(channel->barrier_queue, ^{
651			_dispatch_object_debug(channel, "%s", __func__);
652			dispatch_fd_entry_t fd_entry = channel->fd_entry;
653			if (fd_entry) {
654				_dispatch_fd_debug("io stop cleanup", channel->fd);
655				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
656				if (!(channel->atomic_flags & DIO_CLOSED)) {
657					channel->fd_entry = NULL;
658					_dispatch_fd_entry_release(fd_entry);
659				}
660			} else if (channel->fd != -1) {
661				// Stop after close, need to check if fd_entry still exists
662				_dispatch_retain(channel);
663				dispatch_async(_dispatch_io_fds_lockq, ^{
664					_dispatch_object_debug(channel, "%s", __func__);
665					_dispatch_fd_debug("io stop after close cleanup",
666							channel->fd);
667					dispatch_fd_entry_t fdi;
668					uintptr_t hash = DIO_HASH(channel->fd);
669					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
670						if (fdi->fd == channel->fd) {
671							_dispatch_fd_entry_cleanup_operations(fdi, channel);
672							break;
673						}
674					}
675					_dispatch_release(channel);
676				});
677			}
678			_dispatch_release(channel);
679		});
680	});
681}
682
683void
684dispatch_io_close(dispatch_io_t channel, unsigned long flags)
685{
686	if (flags & DISPATCH_IO_STOP) {
687		// Don't stop an already stopped channel
688		if (channel->atomic_flags & DIO_STOPPED) {
689			return;
690		}
691		return _dispatch_io_stop(channel);
692	}
693	// Don't close an already closed or stopped channel
694	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
695		return;
696	}
697	_dispatch_retain(channel);
698	dispatch_async(channel->queue, ^{
699		dispatch_async(channel->barrier_queue, ^{
700			_dispatch_object_debug(channel, "%s", __func__);
701			_dispatch_fd_debug("io close", channel->fd);
702			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
703				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
704						relaxed);
705				dispatch_fd_entry_t fd_entry = channel->fd_entry;
706				if (fd_entry) {
707					if (!fd_entry->path_data) {
708						channel->fd_entry = NULL;
709					}
710					_dispatch_fd_entry_release(fd_entry);
711				}
712			}
713			_dispatch_release(channel);
714		});
715	});
716}
717
718void
719dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
720{
721	_dispatch_retain(channel);
722	dispatch_async(channel->queue, ^{
723		dispatch_queue_t io_q = channel->do_targetq;
724		dispatch_queue_t barrier_queue = channel->barrier_queue;
725		dispatch_group_t barrier_group = channel->barrier_group;
726		dispatch_async(barrier_queue, ^{
727			dispatch_suspend(barrier_queue);
728			dispatch_group_notify(barrier_group, io_q, ^{
729				_dispatch_object_debug(channel, "%s", __func__);
730				_dispatch_thread_setspecific(dispatch_io_key, channel);
731				barrier();
732				_dispatch_thread_setspecific(dispatch_io_key, NULL);
733				dispatch_resume(barrier_queue);
734				_dispatch_release(channel);
735			});
736		});
737	});
738}
739
740void
741dispatch_io_barrier_f(dispatch_io_t channel, void *context,
742		dispatch_function_t barrier)
743{
744	return dispatch_io_barrier(channel, ^{ barrier(context); });
745}
746
747void
748dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
749		dispatch_queue_t queue, dispatch_io_handler_t handler)
750{
751	_dispatch_retain(channel);
752	_dispatch_retain(queue);
753	dispatch_async(channel->queue, ^{
754		dispatch_operation_t op;
755		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
756				length, dispatch_data_empty, queue, handler);
757		if (op) {
758			dispatch_queue_t barrier_q = channel->barrier_queue;
759			dispatch_async(barrier_q, ^{
760				_dispatch_operation_enqueue(op, DOP_DIR_READ,
761						dispatch_data_empty);
762			});
763		}
764		_dispatch_release(channel);
765		_dispatch_release(queue);
766	});
767}
768
769void
770dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
771		dispatch_queue_t queue, void *context,
772		dispatch_io_handler_function_t handler)
773{
774	return dispatch_io_read(channel, offset, length, queue,
775			^(bool done, dispatch_data_t d, int error){
776		handler(context, done, d, error);
777	});
778}
779
780void
781dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
782		dispatch_queue_t queue, dispatch_io_handler_t handler)
783{
784	_dispatch_io_data_retain(data);
785	_dispatch_retain(channel);
786	_dispatch_retain(queue);
787	dispatch_async(channel->queue, ^{
788		dispatch_operation_t op;
789		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
790				dispatch_data_get_size(data), data, queue, handler);
791		if (op) {
792			dispatch_queue_t barrier_q = channel->barrier_queue;
793			dispatch_async(barrier_q, ^{
794				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
795				_dispatch_io_data_release(data);
796			});
797		} else {
798			_dispatch_io_data_release(data);
799		}
800		_dispatch_release(channel);
801		_dispatch_release(queue);
802	});
803}
804
805void
806dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
807		dispatch_queue_t queue, void *context,
808		dispatch_io_handler_function_t handler)
809{
810	return dispatch_io_write(channel, offset, data, queue,
811			^(bool done, dispatch_data_t d, int error){
812		handler(context, done, d, error);
813	});
814}
815
816void
817dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
818		void (^handler)(dispatch_data_t, int))
819{
820	_dispatch_retain(queue);
821	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
822		// On barrier queue
823		if (fd_entry->err) {
824			int err = fd_entry->err;
825			dispatch_async(queue, ^{
826				_dispatch_fd_debug("convenience handler invoke", fd);
827				handler(dispatch_data_empty, err);
828			});
829			_dispatch_release(queue);
830			return;
831		}
832		// Safe to access fd_entry on barrier queue
833		dispatch_io_t channel = fd_entry->convenience_channel;
834		if (!channel) {
835			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
836			channel->fd = fd;
837			channel->fd_actual = fd;
838			channel->fd_entry = fd_entry;
839			dispatch_retain(fd_entry->barrier_queue);
840			dispatch_retain(fd_entry->barrier_group);
841			channel->barrier_queue = fd_entry->barrier_queue;
842			channel->barrier_group = fd_entry->barrier_group;
843			fd_entry->convenience_channel = channel;
844		}
845		__block dispatch_data_t deliver_data = dispatch_data_empty;
846		__block int err = 0;
847		dispatch_async(fd_entry->close_queue, ^{
848			dispatch_async(queue, ^{
849				_dispatch_fd_debug("convenience handler invoke", fd);
850				handler(deliver_data, err);
851				_dispatch_io_data_release(deliver_data);
852			});
853			_dispatch_release(queue);
854		});
855		dispatch_operation_t op =
856			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
857					length, dispatch_data_empty,
858					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
859					^(bool done, dispatch_data_t data, int error) {
860				if (data) {
861					data = dispatch_data_create_concat(deliver_data, data);
862					_dispatch_io_data_release(deliver_data);
863					deliver_data = data;
864				}
865				if (done) {
866					err = error;
867				}
868			});
869		if (op) {
870			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
871		}
872	});
873}
874
875void
876dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
877		void *context, void (*handler)(void *, dispatch_data_t, int))
878{
879	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
880		handler(context, d, error);
881	});
882}
883
884void
885dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
886		void (^handler)(dispatch_data_t, int))
887{
888	_dispatch_io_data_retain(data);
889	_dispatch_retain(queue);
890	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
891		// On barrier queue
892		if (fd_entry->err) {
893			int err = fd_entry->err;
894			dispatch_async(queue, ^{
895				_dispatch_fd_debug("convenience handler invoke", fd);
896				handler(NULL, err);
897			});
898			_dispatch_release(queue);
899			return;
900		}
901		// Safe to access fd_entry on barrier queue
902		dispatch_io_t channel = fd_entry->convenience_channel;
903		if (!channel) {
904			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
905			channel->fd = fd;
906			channel->fd_actual = fd;
907			channel->fd_entry = fd_entry;
908			dispatch_retain(fd_entry->barrier_queue);
909			dispatch_retain(fd_entry->barrier_group);
910			channel->barrier_queue = fd_entry->barrier_queue;
911			channel->barrier_group = fd_entry->barrier_group;
912			fd_entry->convenience_channel = channel;
913		}
914		__block dispatch_data_t deliver_data = NULL;
915		__block int err = 0;
916		dispatch_async(fd_entry->close_queue, ^{
917			dispatch_async(queue, ^{
918				_dispatch_fd_debug("convenience handler invoke", fd);
919				handler(deliver_data, err);
920				if (deliver_data) {
921					_dispatch_io_data_release(deliver_data);
922				}
923			});
924			_dispatch_release(queue);
925		});
926		dispatch_operation_t op =
927			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
928					dispatch_data_get_size(data), data,
929					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
930					^(bool done, dispatch_data_t d, int error) {
931				if (done) {
932					if (d) {
933						_dispatch_io_data_retain(d);
934						deliver_data = d;
935					}
936					err = error;
937				}
938			});
939		if (op) {
940			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
941		}
942		_dispatch_io_data_release(data);
943	});
944}
945
946void
947dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
948		void *context, void (*handler)(void *, dispatch_data_t, int))
949{
950	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
951		handler(context, d, error);
952	});
953}
954
955#pragma mark -
956#pragma mark dispatch_operation_t
957
958static dispatch_operation_t
959_dispatch_operation_create(dispatch_op_direction_t direction,
960		dispatch_io_t channel, off_t offset, size_t length,
961		dispatch_data_t data, dispatch_queue_t queue,
962		dispatch_io_handler_t handler)
963{
964	// On channel queue
965	dispatch_assert(direction < DOP_DIR_MAX);
966	_dispatch_fd_debug("operation create", channel->fd);
967#if DISPATCH_IO_DEBUG
968	int fd = channel->fd;
969#endif
970	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
971	// that can only be NULL if atomic_flags are set rdar://problem/8362514
972	int err = _dispatch_io_get_error(NULL, channel, false);
973	if (err || !length) {
974		_dispatch_io_data_retain(data);
975		_dispatch_retain(queue);
976		dispatch_async(channel->barrier_queue, ^{
977			dispatch_async(queue, ^{
978				dispatch_data_t d = data;
979				if (direction == DOP_DIR_READ && err) {
980					d = NULL;
981				} else if (direction == DOP_DIR_WRITE && !err) {
982					d = NULL;
983				}
984				_dispatch_fd_debug("IO handler invoke", fd);
985				handler(true, d, err);
986				_dispatch_io_data_release(data);
987			});
988			_dispatch_release(queue);
989		});
990		return NULL;
991	}
992	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
993			sizeof(struct dispatch_operation_s));
994	op->do_next = DISPATCH_OBJECT_LISTLESS;
995	op->do_xref_cnt = -1; // operation object is not exposed externally
996	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
997	op->op_q->do_targetq = queue;
998	_dispatch_retain(queue);
999	op->active = false;
1000	op->direction = direction;
1001	op->offset = offset + channel->f_ptr;
1002	op->length = length;
1003	op->handler = _dispatch_io_Block_copy(handler);
1004	_dispatch_retain(channel);
1005	op->channel = channel;
1006	op->params = channel->params;
1007	// Take a snapshot of the priority of the channel queue. The actual I/O
1008	// for this operation will be performed at this priority
1009	dispatch_queue_t targetq = op->channel->do_targetq;
1010	while (fastpath(targetq->do_targetq)) {
1011		targetq = targetq->do_targetq;
1012	}
1013	op->do_targetq = targetq;
1014	_dispatch_object_debug(op, "%s", __func__);
1015	return op;
1016}
1017
1018void
1019_dispatch_operation_dispose(dispatch_operation_t op)
1020{
1021	_dispatch_object_debug(op, "%s", __func__);
1022	// Deliver the data if there's any
1023	if (op->fd_entry) {
1024		_dispatch_operation_deliver_data(op, DOP_DONE);
1025		dispatch_group_leave(op->fd_entry->barrier_group);
1026		_dispatch_fd_entry_release(op->fd_entry);
1027	}
1028	if (op->channel) {
1029		_dispatch_release(op->channel);
1030	}
1031	if (op->timer) {
1032		dispatch_release(op->timer);
1033	}
1034	// For write operations, op->buf is owned by op->buf_data
1035	if (op->buf && op->direction == DOP_DIR_READ) {
1036		free(op->buf);
1037	}
1038	if (op->buf_data) {
1039		_dispatch_io_data_release(op->buf_data);
1040	}
1041	if (op->data) {
1042		_dispatch_io_data_release(op->data);
1043	}
1044	if (op->op_q) {
1045		dispatch_release(op->op_q);
1046	}
1047	Block_release(op->handler);
1048}
1049
1050static void
1051_dispatch_operation_enqueue(dispatch_operation_t op,
1052		dispatch_op_direction_t direction, dispatch_data_t data)
1053{
1054	// Called from the barrier queue
1055	_dispatch_io_data_retain(data);
1056	// If channel is closed or stopped, then call the handler immediately
1057	int err = _dispatch_io_get_error(NULL, op->channel, false);
1058	if (err) {
1059		dispatch_io_handler_t handler = op->handler;
1060		dispatch_async(op->op_q, ^{
1061			dispatch_data_t d = data;
1062			if (direction == DOP_DIR_READ && err) {
1063				d = NULL;
1064			} else if (direction == DOP_DIR_WRITE && !err) {
1065				d = NULL;
1066			}
1067			handler(true, d, err);
1068			_dispatch_io_data_release(data);
1069		});
1070		_dispatch_release(op);
1071		return;
1072	}
1073	// Finish operation init
1074	op->fd_entry = op->channel->fd_entry;
1075	_dispatch_fd_entry_retain(op->fd_entry);
1076	dispatch_group_enter(op->fd_entry->barrier_group);
1077	dispatch_disk_t disk = op->fd_entry->disk;
1078	if (!disk) {
1079		dispatch_stream_t stream = op->fd_entry->streams[direction];
1080		dispatch_async(stream->dq, ^{
1081			_dispatch_stream_enqueue_operation(stream, op, data);
1082			_dispatch_io_data_release(data);
1083		});
1084	} else {
1085		dispatch_async(disk->pick_queue, ^{
1086			_dispatch_disk_enqueue_operation(disk, op, data);
1087			_dispatch_io_data_release(data);
1088		});
1089	}
1090}
1091
1092static bool
1093_dispatch_operation_should_enqueue(dispatch_operation_t op,
1094		dispatch_queue_t tq, dispatch_data_t data)
1095{
1096	// On stream queue or disk queue
1097	_dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1098	_dispatch_io_data_retain(data);
1099	op->data = data;
1100	int err = _dispatch_io_get_error(op, NULL, true);
1101	if (err) {
1102		op->err = err;
1103		// Final release
1104		_dispatch_release(op);
1105		return false;
1106	}
1107	if (op->params.interval) {
1108		dispatch_resume(_dispatch_operation_timer(tq, op));
1109	}
1110	return true;
1111}
1112
1113static dispatch_source_t
1114_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1115{
1116	// On stream queue or pick queue
1117	if (op->timer) {
1118		return op->timer;
1119	}
1120	dispatch_source_t timer = dispatch_source_create(
1121			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1122	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1123			(int64_t)op->params.interval), op->params.interval, 0);
1124	dispatch_source_set_event_handler(timer, ^{
1125		// On stream queue or pick queue
1126		if (dispatch_source_testcancel(timer)) {
1127			// Do nothing. The operation has already completed
1128			return;
1129		}
1130		dispatch_op_flags_t flags = DOP_DEFAULT;
1131		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1132			// Deliver even if there is less data than the low-water mark
1133			flags |= DOP_DELIVER;
1134		}
1135		// If the operation is active, dont deliver data
1136		if ((op->active) && (flags & DOP_DELIVER)) {
1137			op->flags = flags;
1138		} else {
1139			_dispatch_operation_deliver_data(op, flags);
1140		}
1141	});
1142	op->timer = timer;
1143	return op->timer;
1144}
1145
1146#pragma mark -
1147#pragma mark dispatch_fd_entry_t
1148
1149#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1150static void
1151_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1152{
1153	guardid_t guard = fd_entry;
1154	const unsigned int guard_flags = GUARD_CLOSE;
1155	int err, fd_flags = 0;
1156	_dispatch_io_syscall_switch_noerr(err,
1157		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1158				&fd_flags),
1159		case 0:
1160			fd_entry->guard_flags = guard_flags;
1161			fd_entry->orig_fd_flags = fd_flags;
1162			break;
1163		case EPERM: break;
1164		default: (void)dispatch_assume_zero(err); break;
1165	);
1166}
1167
1168static void
1169_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1170{
1171	if (!fd_entry->guard_flags) {
1172		return;
1173	}
1174	guardid_t guard = fd_entry;
1175	int err, fd_flags = fd_entry->orig_fd_flags;
1176	_dispatch_io_syscall_switch(err,
1177		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1178				&fd_flags),
1179		default: (void)dispatch_assume_zero(err); break;
1180	);
1181}
1182#else
1183static inline void
1184_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1185static inline void
1186_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1187#endif // DISPATCH_USE_GUARDED_FD
1188
1189static inline int
1190_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1191		int oflag, mode_t mode) {
1192#if DISPATCH_USE_GUARDED_FD
1193	guardid_t guard = (uintptr_t)fd_entry;
1194	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1195			GUARD_SOCKET_IPC | GUARD_FILEPORT;
1196	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1197			mode);
1198	if (fd != -1) {
1199		fd_entry->guard_flags = guard_flags;
1200		return fd;
1201	}
1202	errno = 0;
1203#endif
1204	return open(path, oflag, mode);
1205	(void)fd_entry;
1206}
1207
1208static inline int
1209_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1210#if DISPATCH_USE_GUARDED_FD
1211	if (fd_entry->guard_flags) {
1212		guardid_t guard = (uintptr_t)fd_entry;
1213		return guarded_close_np(fd, &guard);
1214	} else
1215#endif
1216	{
1217		return close(fd);
1218	}
1219	(void)fd_entry;
1220}
1221
1222static inline void
1223_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1224	dispatch_suspend(fd_entry->close_queue);
1225}
1226
1227static inline void
1228_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1229	dispatch_resume(fd_entry->close_queue);
1230}
1231
1232static void
1233_dispatch_fd_entry_init_async(dispatch_fd_t fd,
1234		dispatch_fd_entry_init_callback_t completion_callback)
1235{
1236	static dispatch_once_t _dispatch_io_fds_lockq_pred;
1237	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1238			_dispatch_io_fds_lockq_init);
1239	dispatch_async(_dispatch_io_fds_lockq, ^{
1240		_dispatch_fd_debug("fd entry init", fd);
1241		dispatch_fd_entry_t fd_entry = NULL;
1242		// Check to see if there is an existing entry for the given fd
1243		uintptr_t hash = DIO_HASH(fd);
1244		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1245			if (fd_entry->fd == fd) {
1246				// Retain the fd_entry to ensure it cannot go away until the
1247				// stat() has completed
1248				_dispatch_fd_entry_retain(fd_entry);
1249				break;
1250			}
1251		}
1252		if (!fd_entry) {
1253			// If we did not find an existing entry, create one
1254			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1255		}
1256		dispatch_async(fd_entry->barrier_queue, ^{
1257			_dispatch_fd_debug("fd entry init completion", fd);
1258			completion_callback(fd_entry);
1259			// stat() is complete, release reference to fd_entry
1260			_dispatch_fd_entry_release(fd_entry);
1261		});
1262	});
1263}
1264
1265static dispatch_fd_entry_t
1266_dispatch_fd_entry_create(dispatch_queue_t q)
1267{
1268	dispatch_fd_entry_t fd_entry;
1269	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1270	fd_entry->close_queue = dispatch_queue_create(
1271			"com.apple.libdispatch-io.closeq", NULL);
1272	// Use target queue to ensure that no concurrent lookups are going on when
1273	// the close queue is running
1274	fd_entry->close_queue->do_targetq = q;
1275	_dispatch_retain(q);
1276	// Suspend the cleanup queue until closing
1277	_dispatch_fd_entry_retain(fd_entry);
1278	return fd_entry;
1279}
1280
1281static dispatch_fd_entry_t
1282_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1283{
1284	// On fds lock queue
1285	_dispatch_fd_debug("fd entry create", fd);
1286	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1287			_dispatch_io_fds_lockq);
1288	fd_entry->fd = fd;
1289	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1290	fd_entry->barrier_queue = dispatch_queue_create(
1291			"com.apple.libdispatch-io.barrierq", NULL);
1292	fd_entry->barrier_group = dispatch_group_create();
1293	dispatch_async(fd_entry->barrier_queue, ^{
1294		_dispatch_fd_debug("fd entry stat", fd);
1295		int err, orig_flags, orig_nosigpipe = -1;
1296		struct stat st;
1297		_dispatch_io_syscall_switch(err,
1298			fstat(fd, &st),
1299			default: fd_entry->err = err; return;
1300		);
1301		fd_entry->stat.dev = st.st_dev;
1302		fd_entry->stat.mode = st.st_mode;
1303		_dispatch_fd_entry_guard(fd_entry);
1304		_dispatch_io_syscall_switch(err,
1305			orig_flags = fcntl(fd, F_GETFL),
1306			default: (void)dispatch_assume_zero(err); break;
1307		);
1308#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1309		if (S_ISFIFO(st.st_mode)) {
1310			_dispatch_io_syscall_switch(err,
1311				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1312				default: (void)dispatch_assume_zero(err); break;
1313			);
1314			if (orig_nosigpipe != -1) {
1315				_dispatch_io_syscall_switch(err,
1316					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1317					default:
1318						orig_nosigpipe = -1;
1319						(void)dispatch_assume_zero(err);
1320						break;
1321				);
1322			}
1323		}
1324#endif
1325		if (S_ISREG(st.st_mode)) {
1326			if (orig_flags != -1) {
1327				_dispatch_io_syscall_switch(err,
1328					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1329					default:
1330						orig_flags = -1;
1331						(void)dispatch_assume_zero(err);
1332						break;
1333				);
1334			}
1335			int32_t dev = major(st.st_dev);
1336			// We have to get the disk on the global dev queue. The
1337			// barrier queue cannot continue until that is complete
1338			dispatch_suspend(fd_entry->barrier_queue);
1339			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1340					_dispatch_io_devs_lockq_init);
1341			dispatch_async(_dispatch_io_devs_lockq, ^{
1342				_dispatch_disk_init(fd_entry, dev);
1343				dispatch_resume(fd_entry->barrier_queue);
1344			});
1345		} else {
1346			if (orig_flags != -1) {
1347				_dispatch_io_syscall_switch(err,
1348					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1349					default:
1350						orig_flags = -1;
1351						(void)dispatch_assume_zero(err);
1352						break;
1353				);
1354			}
1355			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1356					_DISPATCH_QOS_CLASS_DEFAULT, false));
1357		}
1358		fd_entry->orig_flags = orig_flags;
1359		fd_entry->orig_nosigpipe = orig_nosigpipe;
1360	});
1361	// This is the first item run when the close queue is resumed, indicating
1362	// that all channels associated with this entry have been closed and that
1363	// all operations associated with this entry have been freed
1364	dispatch_async(fd_entry->close_queue, ^{
1365		if (!fd_entry->disk) {
1366			_dispatch_fd_debug("close queue fd_entry cleanup", fd);
1367			dispatch_op_direction_t dir;
1368			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1369				_dispatch_stream_dispose(fd_entry, dir);
1370			}
1371		} else {
1372			dispatch_disk_t disk = fd_entry->disk;
1373			dispatch_async(_dispatch_io_devs_lockq, ^{
1374				_dispatch_release(disk);
1375			});
1376		}
1377		// Remove this entry from the global fd list
1378		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1379	});
1380	// If there was a source associated with this stream, disposing of the
1381	// source cancels it and suspends the close queue. Freeing the fd_entry
1382	// structure must happen after the source cancel handler has finished
1383	dispatch_async(fd_entry->close_queue, ^{
1384		_dispatch_fd_debug("close queue release", fd);
1385		dispatch_release(fd_entry->close_queue);
1386		_dispatch_fd_debug("barrier queue release", fd);
1387		dispatch_release(fd_entry->barrier_queue);
1388		_dispatch_fd_debug("barrier group release", fd);
1389		dispatch_release(fd_entry->barrier_group);
1390		if (fd_entry->orig_flags != -1) {
1391			_dispatch_io_syscall(
1392				fcntl(fd, F_SETFL, fd_entry->orig_flags)
1393			);
1394		}
1395#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1396		if (fd_entry->orig_nosigpipe != -1) {
1397			_dispatch_io_syscall(
1398				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1399			);
1400		}
1401#endif
1402		_dispatch_fd_entry_unguard(fd_entry);
1403		if (fd_entry->convenience_channel) {
1404			fd_entry->convenience_channel->fd_entry = NULL;
1405			dispatch_release(fd_entry->convenience_channel);
1406		}
1407		free(fd_entry);
1408	});
1409	return fd_entry;
1410}
1411
1412static dispatch_fd_entry_t
1413_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1414		dev_t dev, mode_t mode)
1415{
1416	// On devs lock queue
1417	_dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
1418	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1419			path_data->channel->queue);
1420	if (S_ISREG(mode)) {
1421		_dispatch_disk_init(fd_entry, major(dev));
1422	} else {
1423		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1424				_DISPATCH_QOS_CLASS_DEFAULT, false));
1425	}
1426	fd_entry->fd = -1;
1427	fd_entry->orig_flags = -1;
1428	fd_entry->path_data = path_data;
1429	fd_entry->stat.dev = dev;
1430	fd_entry->stat.mode = mode;
1431	fd_entry->barrier_queue = dispatch_queue_create(
1432			"com.apple.libdispatch-io.barrierq", NULL);
1433	fd_entry->barrier_group = dispatch_group_create();
1434	// This is the first item run when the close queue is resumed, indicating
1435	// that the channel associated with this entry has been closed and that
1436	// all operations associated with this entry have been freed
1437	dispatch_async(fd_entry->close_queue, ^{
1438		_dispatch_fd_debug("close queue fd_entry cleanup", -1);
1439		if (!fd_entry->disk) {
1440			dispatch_op_direction_t dir;
1441			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1442				_dispatch_stream_dispose(fd_entry, dir);
1443			}
1444		}
1445		if (fd_entry->fd != -1) {
1446			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1447		}
1448		if (fd_entry->path_data->channel) {
1449			// If associated channel has not been released yet, mark it as
1450			// no longer having an fd_entry (for stop after close).
1451			// It is safe to modify channel since we are on close_queue with
1452			// target queue the channel queue
1453			fd_entry->path_data->channel->fd_entry = NULL;
1454		}
1455	});
1456	dispatch_async(fd_entry->close_queue, ^{
1457		_dispatch_fd_debug("close queue release", -1);
1458		dispatch_release(fd_entry->close_queue);
1459		dispatch_release(fd_entry->barrier_queue);
1460		dispatch_release(fd_entry->barrier_group);
1461		free(fd_entry->path_data);
1462		free(fd_entry);
1463	});
1464	return fd_entry;
1465}
1466
1467static int
1468_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1469{
1470	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1471		return 0;
1472	}
1473	if (fd_entry->err) {
1474		return fd_entry->err;
1475	}
1476	int fd = -1;
1477	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1478			fd_entry->path_data->oflag | O_NONBLOCK;
1479open:
1480	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1481			oflag, fd_entry->path_data->mode);
1482	if (fd == -1) {
1483		int err = errno;
1484		if (err == EINTR) {
1485			goto open;
1486		}
1487		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1488		return err;
1489	}
1490	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1491		// Lost the race with another open
1492		_dispatch_fd_entry_guarded_close(fd_entry, fd);
1493	} else {
1494		channel->fd_actual = fd;
1495	}
1496	_dispatch_object_debug(channel, "%s", __func__);
1497	return 0;
1498}
1499
1500static void
1501_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1502		dispatch_io_t channel)
1503{
1504	if (fd_entry->disk) {
1505		if (channel) {
1506			_dispatch_retain(channel);
1507		}
1508		_dispatch_fd_entry_retain(fd_entry);
1509		dispatch_async(fd_entry->disk->pick_queue, ^{
1510			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1511			_dispatch_fd_entry_release(fd_entry);
1512			if (channel) {
1513				_dispatch_release(channel);
1514			}
1515		});
1516	} else {
1517		dispatch_op_direction_t direction;
1518		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1519			dispatch_stream_t stream = fd_entry->streams[direction];
1520			if (!stream) {
1521				continue;
1522			}
1523			if (channel) {
1524				_dispatch_retain(channel);
1525			}
1526			_dispatch_fd_entry_retain(fd_entry);
1527			dispatch_async(stream->dq, ^{
1528				_dispatch_stream_cleanup_operations(stream, channel);
1529				_dispatch_fd_entry_release(fd_entry);
1530				if (channel) {
1531					_dispatch_release(channel);
1532				}
1533			});
1534		}
1535	}
1536}
1537
1538#pragma mark -
1539#pragma mark dispatch_stream_t/dispatch_disk_t
1540
1541static void
1542_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1543{
1544	dispatch_op_direction_t direction;
1545	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1546		dispatch_stream_t stream;
1547		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1548		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1549				NULL);
1550		dispatch_set_context(stream->dq, stream);
1551		_dispatch_retain(tq);
1552		stream->dq->do_targetq = tq;
1553		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1554		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1555		fd_entry->streams[direction] = stream;
1556	}
1557}
1558
1559static void
1560_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1561		dispatch_op_direction_t direction)
1562{
1563	// On close queue
1564	dispatch_stream_t stream = fd_entry->streams[direction];
1565	if (!stream) {
1566		return;
1567	}
1568	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1569	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1570	if (stream->source) {
1571		// Balanced by source cancel handler:
1572		_dispatch_fd_entry_retain(fd_entry);
1573		dispatch_source_cancel(stream->source);
1574		dispatch_resume(stream->source);
1575		dispatch_release(stream->source);
1576	}
1577	dispatch_set_context(stream->dq, NULL);
1578	dispatch_release(stream->dq);
1579	free(stream);
1580}
1581
1582static void
1583_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1584{
1585	// On devs lock queue
1586	dispatch_disk_t disk;
1587	// Check to see if there is an existing entry for the given device
1588	uintptr_t hash = DIO_HASH(dev);
1589	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1590		if (disk->dev == dev) {
1591			_dispatch_retain(disk);
1592			goto out;
1593		}
1594	}
1595	// Otherwise create a new entry
1596	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1597	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1598			sizeof(struct dispatch_disk_s) +
1599			(pending_reqs_depth * sizeof(dispatch_operation_t)));
1600	disk->do_next = DISPATCH_OBJECT_LISTLESS;
1601	disk->do_xref_cnt = -1;
1602	disk->advise_list_depth = pending_reqs_depth;
1603	disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1604			false);
1605	disk->dev = dev;
1606	TAILQ_INIT(&disk->operations);
1607	disk->cur_rq = TAILQ_FIRST(&disk->operations);
1608	char label[45];
1609	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
1610	disk->pick_queue = dispatch_queue_create(label, NULL);
1611	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1612out:
1613	fd_entry->disk = disk;
1614	TAILQ_INIT(&fd_entry->stream_ops);
1615}
1616
1617void
1618_dispatch_disk_dispose(dispatch_disk_t disk)
1619{
1620	uintptr_t hash = DIO_HASH(disk->dev);
1621	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1622	dispatch_assert(TAILQ_EMPTY(&disk->operations));
1623	size_t i;
1624	for (i=0; i<disk->advise_list_depth; ++i) {
1625		dispatch_assert(!disk->advise_list[i]);
1626	}
1627	dispatch_release(disk->pick_queue);
1628}
1629
1630#pragma mark -
1631#pragma mark dispatch_stream_operations/dispatch_disk_operations
1632
1633static inline bool
1634_dispatch_stream_operation_avail(dispatch_stream_t stream)
1635{
1636	return  !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1637			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1638}
1639
1640static void
1641_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1642		dispatch_operation_t op, dispatch_data_t data)
1643{
1644	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1645		return;
1646	}
1647	_dispatch_object_debug(op, "%s", __func__);
1648	bool no_ops = !_dispatch_stream_operation_avail(stream);
1649	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1650	if (no_ops) {
1651		dispatch_async_f(stream->dq, stream->dq,
1652				_dispatch_stream_queue_handler);
1653	}
1654}
1655
1656static void
1657_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1658		dispatch_data_t data)
1659{
1660	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1661		return;
1662	}
1663	_dispatch_object_debug(op, "%s", __func__);
1664	if (op->params.type == DISPATCH_IO_STREAM) {
1665		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1666			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1667		}
1668		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1669	} else {
1670		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1671	}
1672	_dispatch_disk_handler(disk);
1673}
1674
1675static void
1676_dispatch_stream_complete_operation(dispatch_stream_t stream,
1677		dispatch_operation_t op)
1678{
1679	// On stream queue
1680	_dispatch_object_debug(op, "%s", __func__);
1681	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
1682	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1683	if (op == stream->op) {
1684		stream->op = NULL;
1685	}
1686	if (op->timer) {
1687		dispatch_source_cancel(op->timer);
1688	}
1689	// Final release will deliver any pending data
1690	_dispatch_release(op);
1691}
1692
1693static void
1694_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1695{
1696	// On pick queue
1697	_dispatch_object_debug(op, "%s", __func__);
1698	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
	// cur_rq always points at the op most recently returned by the picker; if
	// that is the op being completed, step the cursor back so that round-robin
	// picking resumes correctly once op is removed from the list
1700	if (disk->cur_rq == op) {
1701		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1702				operation_list);
1703	}
1704	if (op->params.type == DISPATCH_IO_STREAM) {
1705		// Check if there are other pending stream operations behind it
1706		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1707		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1708		if (op_next) {
1709			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1710		}
1711	}
1712	TAILQ_REMOVE(&disk->operations, op, operation_list);
1713	if (op->timer) {
1714		dispatch_source_cancel(op->timer);
1715	}
1716	// Final release will deliver any pending data
1717	_dispatch_release(op);
1718}
1719
1720static dispatch_operation_t
1721_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1722		dispatch_operation_t op)
1723{
1724	// On stream queue
1725	if (!op) {
1726		// On the first run through, pick the first operation
1727		if (!_dispatch_stream_operation_avail(stream)) {
1728			return op;
1729		}
1730		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1731			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1732		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1733			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1734		}
1735		return op;
1736	}
1737	if (op->params.type == DISPATCH_IO_STREAM) {
		// Stream operations need to be serialized, so continue the current
		// operation until it is finished
1740		return op;
1741	}
1742	// Get the next random operation (round-robin)
1743	if (op->params.type == DISPATCH_IO_RANDOM) {
1744		op = TAILQ_NEXT(op, operation_list);
1745		if (!op) {
1746			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1747		}
1748		return op;
1749	}
1750	return NULL;
1751}
1752
1753static dispatch_operation_t
1754_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1755{
1756	// On pick queue
1757	dispatch_operation_t op;
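	// Round-robin over the pending operations, starting after the most
	// recently picked one and skipping ops that are already active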
1758	if (!TAILQ_EMPTY(&disk->operations)) {
1759		if (disk->cur_rq == NULL) {
1760			op = TAILQ_FIRST(&disk->operations);
1761		} else {
1762			op = disk->cur_rq;
1763			do {
1764				op = TAILQ_NEXT(op, operation_list);
1765				if (!op) {
1766					op = TAILQ_FIRST(&disk->operations);
1767				}
1768				// TODO: more involved picking algorithm rdar://problem/8780312
1769			} while (op->active && op != disk->cur_rq);
1770		}
1771		if (!op->active) {
1772			disk->cur_rq = op;
1773			return op;
1774		}
1775	}
1776	return NULL;
1777}
1778
1779static void
1780_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1781		dispatch_io_t channel)
1782{
1783	// On stream queue
1784	dispatch_operation_t op, tmp;
1785	typeof(*stream->operations) *operations;
1786	operations = &stream->operations[DISPATCH_IO_RANDOM];
1787	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1788		if (!channel || op->channel == channel) {
1789			_dispatch_stream_complete_operation(stream, op);
1790		}
1791	}
1792	operations = &stream->operations[DISPATCH_IO_STREAM];
1793	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1794		if (!channel || op->channel == channel) {
1795			_dispatch_stream_complete_operation(stream, op);
1796		}
1797	}
1798	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1799		dispatch_suspend(stream->source);
1800		stream->source_running = false;
1801	}
1802}
1803
1804static void
1805_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1806{
1807	// On pick queue
1808	dispatch_operation_t op, tmp;
1809	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1810		if (!channel || op->channel == channel) {
1811			_dispatch_disk_complete_operation(disk, op);
1812		}
1813	}
1814}
1815
1816#pragma mark -
1817#pragma mark dispatch_stream_handler/dispatch_disk_handler
1818
1819static dispatch_source_t
1820_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1821{
1822	// On stream queue
1823	if (stream->source) {
1824		return stream->source;
1825	}
1826	dispatch_fd_t fd = op->fd_entry->fd;
1827	_dispatch_fd_debug("stream source create", fd);
1828	dispatch_source_t source = NULL;
1829	if (op->direction == DOP_DIR_READ) {
1830		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1831				(uintptr_t)fd, 0, stream->dq);
1832	} else if (op->direction == DOP_DIR_WRITE) {
1833		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1834				(uintptr_t)fd, 0, stream->dq);
1835	} else {
1836		dispatch_assert(op->direction < DOP_DIR_MAX);
1837		return NULL;
1838	}
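	// The source's context points back at the stream so that
	// _dispatch_stream_source_handler can recover it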
1839	dispatch_set_context(source, stream);
1840	dispatch_source_set_event_handler_f(source,
1841			_dispatch_stream_source_handler);
1842	// Close queue must not run user cleanup handlers until sources are fully
1843	// unregistered
1844	dispatch_queue_t close_queue = op->fd_entry->close_queue;
1845	dispatch_source_set_cancel_handler(source, ^{
1846		_dispatch_fd_debug("stream source cancel", fd);
1847		dispatch_resume(close_queue);
1848	});
1849	stream->source = source;
1850	return stream->source;
1851}
1852
1853static void
1854_dispatch_stream_source_handler(void *ctx)
1855{
1856	// On stream queue
1857	dispatch_stream_t stream = (dispatch_stream_t)ctx;
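	// The source fired: suspend it while the pending operations are serviced;
	// it is resumed again when an operation returns DISPATCH_OP_RESUME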
1858	dispatch_suspend(stream->source);
1859	stream->source_running = false;
1860	return _dispatch_stream_handler(stream);
1861}
1862
1863static void
1864_dispatch_stream_queue_handler(void *ctx)
1865{
1866	// On stream queue
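	// ctx is the stream queue itself; the stream is stored in the queue's
	// context and cleared by _dispatch_stream_dispose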
1867	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1868	if (!stream) {
1869		// _dispatch_stream_dispose has been called
1870		return;
1871	}
1872	return _dispatch_stream_handler(stream);
1873}
1874
1875static void
1876_dispatch_stream_handler(void *ctx)
1877{
1878	// On stream queue
1879	dispatch_stream_t stream = (dispatch_stream_t)ctx;
1880	dispatch_operation_t op;
1881pick:
1882	op = _dispatch_stream_pick_next_operation(stream, stream->op);
1883	if (!op) {
1884		_dispatch_debug("no operation found: stream %p", stream);
1885		return;
1886	}
1887	int err = _dispatch_io_get_error(op, NULL, true);
1888	if (err) {
1889		op->err = err;
1890		_dispatch_stream_complete_operation(stream, op);
1891		goto pick;
1892	}
1893	stream->op = op;
1894	_dispatch_fd_debug("stream handler", op->fd_entry->fd);
1895	dispatch_fd_entry_t fd_entry = op->fd_entry;
1896	_dispatch_fd_entry_retain(fd_entry);
1897	// For performance analysis
1898	if (!op->total && dispatch_io_defaults.initial_delivery) {
1899		// Empty delivery to signal the start of the operation
1900		_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1901		_dispatch_operation_deliver_data(op, DOP_DELIVER);
1902	}
1903	// TODO: perform on the operation target queue to get correct priority
1904	int result = _dispatch_operation_perform(op);
1905	dispatch_op_flags_t flags = ~0u;
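	// flags starts out as a sentinel (~0u) so the fall-through cases below can
	// tell whether execution entered at DISPATCH_OP_DELIVER (flags becomes
	// DOP_DEFAULT and the operation is not completed) or directly at one of
	// the completing cases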
1906	switch (result) {
1907	case DISPATCH_OP_DELIVER:
1908		flags = DOP_DEFAULT;
1909		// Fall through
1910	case DISPATCH_OP_DELIVER_AND_COMPLETE:
1911		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1912				DOP_DEFAULT;
1913		_dispatch_operation_deliver_data(op, flags);
1914		// Fall through
1915	case DISPATCH_OP_COMPLETE:
1916		if (flags != DOP_DEFAULT) {
1917			_dispatch_stream_complete_operation(stream, op);
1918		}
1919		if (_dispatch_stream_operation_avail(stream)) {
1920			dispatch_async_f(stream->dq, stream->dq,
1921					_dispatch_stream_queue_handler);
1922		}
1923		break;
1924	case DISPATCH_OP_COMPLETE_RESUME:
1925		_dispatch_stream_complete_operation(stream, op);
1926		// Fall through
1927	case DISPATCH_OP_RESUME:
1928		if (_dispatch_stream_operation_avail(stream)) {
1929			stream->source_running = true;
1930			dispatch_resume(_dispatch_stream_source(stream, op));
1931		}
1932		break;
1933	case DISPATCH_OP_ERR:
1934		_dispatch_stream_cleanup_operations(stream, op->channel);
1935		break;
1936	case DISPATCH_OP_FD_ERR:
1937		_dispatch_fd_entry_retain(fd_entry);
1938		dispatch_async(fd_entry->barrier_queue, ^{
1939			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1940			_dispatch_fd_entry_release(fd_entry);
1941		});
1942		break;
1943	default:
1944		break;
1945	}
1946	_dispatch_fd_entry_release(fd_entry);
1947	return;
1948}
1949
1950static void
1951_dispatch_disk_handler(void *ctx)
1952{
1953	// On pick queue
1954	dispatch_disk_t disk = (dispatch_disk_t)ctx;
1955	if (disk->io_active) {
1956		return;
1957	}
1958	_dispatch_fd_debug("disk handler", -1);
1959	dispatch_operation_t op;
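	// Fill the empty slots of the advise ring buffer, from free_idx up to
	// req_idx, with the next pickable operations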
1960	size_t i = disk->free_idx, j = disk->req_idx;
1961	if (j <= i) {
1962		j += disk->advise_list_depth;
1963	}
1964	while (i <= j) {
1965		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1966				(op = _dispatch_disk_pick_next_operation(disk))) {
1967			int err = _dispatch_io_get_error(op, NULL, true);
1968			if (err) {
1969				op->err = err;
1970				_dispatch_disk_complete_operation(disk, op);
1971				continue;
1972			}
1973			_dispatch_retain(op);
1974			disk->advise_list[i%disk->advise_list_depth] = op;
1975			op->active = true;
1976			_dispatch_object_debug(op, "%s", __func__);
1977		} else {
1978			// No more operations to get
1979			break;
1980		}
1981		i++;
1982	}
1983	disk->free_idx = (i%disk->advise_list_depth);
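	// If an operation is waiting at the request index, mark the disk busy and
	// perform it on the operation's target queue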
1984	op = disk->advise_list[disk->req_idx];
1985	if (op) {
1986		disk->io_active = true;
1987		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1988	}
1989}
1990
1991static void
1992_dispatch_disk_perform(void *ctxt)
1993{
1994	dispatch_disk_t disk = ctxt;
1995	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1996	_dispatch_fd_debug("disk perform", -1);
1997	dispatch_operation_t op;
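	// Walk the advise ring from advise_idx to free_idx and issue read-ahead
	// advisories for the queued read operations before performing the op
	// waiting at req_idx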
1998	size_t i = disk->advise_idx, j = disk->free_idx;
1999	if (j <= i) {
2000		j += disk->advise_list_depth;
2001	}
2002	do {
2003		op = disk->advise_list[i%disk->advise_list_depth];
2004		if (!op) {
2005			// Nothing more to advise, must be at free_idx
2006			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2007			break;
2008		}
2009		if (op->direction == DOP_DIR_WRITE) {
2010			// TODO: preallocate writes ? rdar://problem/9032172
2011			continue;
2012		}
2013		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2014				op->channel)) {
2015			continue;
2016		}
2017		// For performance analysis
2018		if (!op->total && dispatch_io_defaults.initial_delivery) {
2019			// Empty delivery to signal the start of the operation
2020			_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2021			_dispatch_operation_deliver_data(op, DOP_DELIVER);
2022		}
2023		// Advise two chunks if the list only has one element and this is the
2024		// first advise on the operation
2025		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2026				!op->advise_offset) {
2027			chunk_size *= 2;
2028		}
2029		_dispatch_operation_advise(op, chunk_size);
2030	} while (++i < j);
2031	disk->advise_idx = i%disk->advise_list_depth;
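	// Perform the operation at the request index, free its slot, advance the
	// ring, then report the result back on the pick queue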
2032	op = disk->advise_list[disk->req_idx];
2033	int result = _dispatch_operation_perform(op);
2034	disk->advise_list[disk->req_idx] = NULL;
	disk->req_idx = (disk->req_idx + 1) % disk->advise_list_depth;
2036	dispatch_async(disk->pick_queue, ^{
2037		switch (result) {
2038		case DISPATCH_OP_DELIVER:
2039			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
2040			break;
2041		case DISPATCH_OP_COMPLETE:
2042			_dispatch_disk_complete_operation(disk, op);
2043			break;
2044		case DISPATCH_OP_DELIVER_AND_COMPLETE:
2045			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2046			_dispatch_disk_complete_operation(disk, op);
2047			break;
2048		case DISPATCH_OP_ERR:
2049			_dispatch_disk_cleanup_operations(disk, op->channel);
2050			break;
2051		case DISPATCH_OP_FD_ERR:
2052			_dispatch_disk_cleanup_operations(disk, NULL);
2053			break;
2054		default:
2055			dispatch_assert(result);
2056			break;
2057		}
2058		op->active = false;
2059		disk->io_active = false;
2060		_dispatch_disk_handler(disk);
2061		// Balancing the retain in _dispatch_disk_handler. Note that op must be
2062		// released at the very end, since it might hold the last reference to
2063		// the disk
2064		_dispatch_release(op);
2065	});
2066}
2067
2068#pragma mark -
2069#pragma mark dispatch_operation_perform
2070
2071static void
2072_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2073{
2074	int err;
2075	struct radvisory advise;
	// No point in issuing a read advisory for the next chunk if the advised
	// range is already more than a chunk ahead of the bytes read so far
2078	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2079			chunk_size + PAGE_SIZE)) {
2080		return;
2081	}
2082	_dispatch_object_debug(op, "%s", __func__);
2083	advise.ra_count = (int)chunk_size;
2084	if (!op->advise_offset) {
2085		op->advise_offset = op->offset;
2086		// If this is the first time through, align the advised range to a
2087		// page boundary
2088		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2089		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2090	}
2091	advise.ra_offset = op->advise_offset;
2092	op->advise_offset += advise.ra_count;
2093	_dispatch_io_syscall_switch(err,
2094		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2095		case EFBIG: break; // advised past the end of the file rdar://10415691
2096		case ENOTSUP: break; // not all FS support radvise rdar://13484629
2097		// TODO: set disk status on error
2098		default: (void)dispatch_assume_zero(err); break;
2099	);
2100}
2101
2102static int
2103_dispatch_operation_perform(dispatch_operation_t op)
2104{
2105	int err = _dispatch_io_get_error(op, NULL, true);
2106	if (err) {
2107		goto error;
2108	}
2109	_dispatch_object_debug(op, "%s", __func__);
2110	if (!op->buf) {
2111		size_t max_buf_siz = op->params.high;
2112		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
2113		if (op->direction == DOP_DIR_READ) {
2114			// If necessary, create a buffer for the ongoing operation, large
2115			// enough to fit chunk_pages but at most high-water
2116			size_t data_siz = dispatch_data_get_size(op->data);
2117			if (data_siz) {
2118				dispatch_assert(data_siz < max_buf_siz);
2119				max_buf_siz -= data_siz;
2120			}
2121			if (max_buf_siz > chunk_siz) {
2122				max_buf_siz = chunk_siz;
2123			}
2124			if (op->length < SIZE_MAX) {
2125				op->buf_siz = op->length - op->total;
2126				if (op->buf_siz > max_buf_siz) {
2127					op->buf_siz = max_buf_siz;
2128				}
2129			} else {
2130				op->buf_siz = max_buf_siz;
2131			}
2132			op->buf = valloc(op->buf_siz);
2133			_dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2134		} else if (op->direction == DOP_DIR_WRITE) {
			// Always write the first data piece; if it is smaller than a
			// chunk, accumulate further data pieces until the chunk size is
			// reached
2137			if (chunk_siz > max_buf_siz) {
2138				chunk_siz = max_buf_siz;
2139			}
2140			op->buf_siz = 0;
2141			dispatch_data_apply(op->data,
2142					^(dispatch_data_t region DISPATCH_UNUSED,
2143					size_t offset DISPATCH_UNUSED,
2144					const void* buf DISPATCH_UNUSED, size_t len) {
2145				size_t siz = op->buf_siz + len;
2146				if (!op->buf_siz || siz <= chunk_siz) {
2147					op->buf_siz = siz;
2148				}
2149				return (bool)(siz < chunk_siz);
2150			});
2151			if (op->buf_siz > max_buf_siz) {
2152				op->buf_siz = max_buf_siz;
2153			}
2154			dispatch_data_t d;
2155			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2156			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2157					NULL);
2158			_dispatch_io_data_release(d);
2159			_dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2160		}
2161	}
2162	if (op->fd_entry->fd == -1) {
2163		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2164		if (err) {
2165			goto error;
2166		}
2167	}
2168	void *buf = op->buf + op->buf_len;
2169	size_t len = op->buf_siz - op->buf_len;
2170	off_t off = (off_t)((size_t)op->offset + op->total);
2171	ssize_t processed = -1;
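	// Plain read/write for stream channels, pread/pwrite at the absolute
	// offset for random-access channels; interrupted syscalls are retried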
2172syscall:
2173	if (op->direction == DOP_DIR_READ) {
2174		if (op->params.type == DISPATCH_IO_STREAM) {
2175			processed = read(op->fd_entry->fd, buf, len);
2176		} else if (op->params.type == DISPATCH_IO_RANDOM) {
2177			processed = pread(op->fd_entry->fd, buf, len, off);
2178		}
2179	} else if (op->direction == DOP_DIR_WRITE) {
2180		if (op->params.type == DISPATCH_IO_STREAM) {
2181			processed = write(op->fd_entry->fd, buf, len);
2182		} else if (op->params.type == DISPATCH_IO_RANDOM) {
2183			processed = pwrite(op->fd_entry->fd, buf, len, off);
2184		}
2185	}
2186	// Encountered an error on the file descriptor
2187	if (processed == -1) {
2188		err = errno;
2189		if (err == EINTR) {
2190			goto syscall;
2191		}
2192		goto error;
2193	}
2194	// EOF is indicated by two handler invocations
2195	if (processed == 0) {
2196		_dispatch_fd_debug("EOF", op->fd_entry->fd);
2197		return DISPATCH_OP_DELIVER_AND_COMPLETE;
2198	}
2199	op->buf_len += (size_t)processed;
2200	op->total += (size_t)processed;
2201	if (op->total == op->length) {
2202		// Finished processing all the bytes requested by the operation
2203		return DISPATCH_OP_COMPLETE;
2204	} else {
2205		// Deliver data only if we satisfy the filters
2206		return DISPATCH_OP_DELIVER;
2207	}
2208error:
2209	if (err == EAGAIN) {
2210		// For disk based files with blocking I/O we should never get EAGAIN
2211		dispatch_assert(!op->fd_entry->disk);
2212		_dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2213		if (op->direction == DOP_DIR_READ && op->total &&
2214				op->channel == op->fd_entry->convenience_channel) {
2215			// Convenience read with available data completes on EAGAIN
2216			return DISPATCH_OP_COMPLETE_RESUME;
2217		}
2218		return DISPATCH_OP_RESUME;
2219	}
2220	op->err = err;
2221	switch (err) {
2222	case ECANCELED:
2223		return DISPATCH_OP_ERR;
2224	case EBADF:
2225		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2226		return DISPATCH_OP_FD_ERR;
2227	default:
2228		return DISPATCH_OP_COMPLETE;
2229	}
2230}
2231
2232static void
2233_dispatch_operation_deliver_data(dispatch_operation_t op,
2234		dispatch_op_flags_t flags)
2235{
	// Called either from the stream or pick queue, or when the op is finalized
2237	dispatch_data_t data = NULL;
2238	int err = 0;
2239	size_t undelivered = op->undelivered + op->buf_len;
2240	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2241			(op->flags & DOP_DELIVER);
2242	op->flags = DOP_DEFAULT;
2243	if (!deliver) {
2244		// Don't deliver data until low water mark has been reached
2245		if (undelivered >= op->params.low) {
2246			deliver = true;
2247		} else if (op->buf_len < op->buf_siz) {
2248			// Request buffer is not yet used up
2249			_dispatch_fd_debug("buffer data", op->fd_entry->fd);
2250			return;
2251		}
2252	} else {
2253		err = op->err;
2254		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2255			err = ECANCELED;
2256			op->err = err;
2257		}
2258	}
2259	// Deliver data or buffer used up
2260	if (op->direction == DOP_DIR_READ) {
2261		if (op->buf_len) {
2262			void *buf = op->buf;
2263			data = dispatch_data_create(buf, op->buf_len, NULL,
2264					DISPATCH_DATA_DESTRUCTOR_FREE);
2265			op->buf = NULL;
2266			op->buf_len = 0;
2267			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2268			_dispatch_io_data_release(op->data);
2269			_dispatch_io_data_release(data);
2270			data = d;
2271		} else {
2272			data = op->data;
2273		}
2274		op->data = deliver ? dispatch_data_empty : data;
2275	} else if (op->direction == DOP_DIR_WRITE) {
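		// Report the data not yet written: skip the op->buf_len bytes already
		// written from the head of the remaining data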
2276		if (deliver) {
2277			data = dispatch_data_create_subrange(op->data, op->buf_len,
2278					op->length);
2279		}
2280		if (op->buf_data && op->buf_len == op->buf_siz) {
2281			_dispatch_io_data_release(op->buf_data);
2282			op->buf_data = NULL;
2283			op->buf = NULL;
2284			op->buf_len = 0;
2285			// Trim newly written buffer from head of unwritten data
2286			dispatch_data_t d;
2287			if (deliver) {
2288				_dispatch_io_data_retain(data);
2289				d = data;
2290			} else {
2291				d = dispatch_data_create_subrange(op->data, op->buf_siz,
2292						op->length);
2293			}
2294			_dispatch_io_data_release(op->data);
2295			op->data = d;
2296		}
2297	} else {
2298		dispatch_assert(op->direction < DOP_DIR_MAX);
2299		return;
2300	}
2301	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2302		op->undelivered = undelivered;
2303		_dispatch_fd_debug("buffer data", op->fd_entry->fd);
2304		return;
2305	}
2306	op->undelivered = 0;
2307	_dispatch_object_debug(op, "%s", __func__);
2308	_dispatch_fd_debug("deliver data", op->fd_entry->fd);
2309	dispatch_op_direction_t direction = op->direction;
2310	dispatch_io_handler_t handler = op->handler;
2311#if DISPATCH_IO_DEBUG
2312	int fd = op->fd_entry->fd;
2313#endif
2314	dispatch_fd_entry_t fd_entry = op->fd_entry;
2315	_dispatch_fd_entry_retain(fd_entry);
2316	dispatch_io_t channel = op->channel;
2317	_dispatch_retain(channel);
2318	// Note that data delivery may occur after the operation is freed
2319	dispatch_async(op->op_q, ^{
2320		bool done = (flags & DOP_DONE);
2321		dispatch_data_t d = data;
2322		if (done) {
2323			if (direction == DOP_DIR_READ && err) {
2324				if (dispatch_data_get_size(d)) {
2325					_dispatch_fd_debug("IO handler invoke", fd);
2326					handler(false, d, 0);
2327				}
2328				d = NULL;
2329			} else if (direction == DOP_DIR_WRITE && !err) {
2330				d = NULL;
2331			}
2332		}
2333		_dispatch_fd_debug("IO handler invoke", fd);
2334		handler(done, d, err);
2335		_dispatch_release(channel);
2336		_dispatch_fd_entry_release(fd_entry);
2337		_dispatch_io_data_release(data);
2338	});
2339}
2340
2341#pragma mark -
2342#pragma mark dispatch_io_debug
2343
2344static size_t
2345_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2346{
2347	dispatch_queue_t target = channel->do_targetq;
2348	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2349			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2350			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2351			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2352			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2353			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2354			channel->fd_entry, channel->queue, target && target->dq_label ?
2355			target->dq_label : "", target, channel->barrier_queue,
2356			channel->barrier_group, channel->err, channel->params.low,
2357			channel->params.high, channel->params.interval_flags &
2358			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2359			channel->params.interval);
2360}
2361
2362size_t
2363_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2364{
2365	size_t offset = 0;
2366	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2367			dx_kind(channel), channel);
2368	offset += _dispatch_object_debug_attr(channel, &buf[offset],
2369			bufsiz - offset);
2370	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2371	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2372	return offset;
2373}
2374
2375static size_t
2376_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2377		size_t bufsiz)
2378{
2379	dispatch_queue_t target = op->do_targetq;
2380	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2381	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2382			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2383			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2384			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2385			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2386			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2387			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2388			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2389			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2390			target->dq_label : "", target, op->offset, op->length, op->total,
2391			op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2392			op->params.high, op->params.interval_flags &
2393			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2394}
2395
2396size_t
2397_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2398{
2399	size_t offset = 0;
2400	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2401			dx_kind(op), op);
2402	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2403	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2404	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2405	return offset;
2406}
2407