1/*
2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21#include "internal.h"
22
23#ifndef DISPATCH_IO_DEBUG
24#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
25#endif
26
27#if DISPATCH_IO_DEBUG
28#define _dispatch_fd_debug(msg, fd, args...) \
29	_dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
30#else
31#define _dispatch_fd_debug(msg, fd, args...)
32#endif
33
34#if USE_OBJC
35#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
36#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
37#else
38#define _dispatch_io_data_retain(x) dispatch_retain(x)
39#define _dispatch_io_data_release(x) dispatch_release(x)
40#endif
41
42typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
43
44DISPATCH_EXPORT DISPATCH_NOTHROW
45void _dispatch_iocntl(uint32_t param, uint64_t value);
46
47static dispatch_operation_t _dispatch_operation_create(
48		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
49		size_t length, dispatch_data_t data, dispatch_queue_t queue,
50		dispatch_io_handler_t handler);
51static void _dispatch_operation_enqueue(dispatch_operation_t op,
52		dispatch_op_direction_t direction, dispatch_data_t data);
53static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
54		dispatch_operation_t op);
55static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
56static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
57static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
58		dispatch_fd_entry_init_callback_t completion_callback);
59static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
60		uintptr_t hash);
61static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
62		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
63static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
64		dispatch_io_t channel);
65static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
66		dispatch_io_t channel);
67static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
68		dispatch_queue_t tq);
69static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
70		dispatch_op_direction_t direction);
71static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
72static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
73		dispatch_operation_t operation, dispatch_data_t data);
74static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
75		dispatch_operation_t operation, dispatch_data_t data);
76static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
77		dispatch_io_t channel);
78static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
79		dispatch_io_t channel);
80static void _dispatch_stream_source_handler(void *ctx);
81static void _dispatch_stream_queue_handler(void *ctx);
82static void _dispatch_stream_handler(void *ctx);
83static void _dispatch_disk_handler(void *ctx);
84static void _dispatch_disk_perform(void *ctxt);
85static void _dispatch_operation_advise(dispatch_operation_t op,
86		size_t chunk_size);
87static int _dispatch_operation_perform(dispatch_operation_t op);
88static void _dispatch_operation_deliver_data(dispatch_operation_t op,
89		dispatch_op_flags_t flags);
90
91// Macros to wrap syscalls which return -1 on error, and retry on EINTR
92#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \
93		switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \
94		case EINTR: continue; \
95		__VA_ARGS__ \
96		} \
97		break; \
98	} while (1)
99#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \
100		_dispatch_io_syscall_switch_noerr(__err, __syscall, \
101		case 0: break; \
102		__VA_ARGS__ \
103		); \
104	} while (0)
105#define _dispatch_io_syscall(__syscall) do { int __err; \
106		_dispatch_io_syscall_switch(__err, __syscall); \
107	} while (0)
108
109enum {
110	DISPATCH_OP_COMPLETE = 1,
111	DISPATCH_OP_DELIVER,
112	DISPATCH_OP_DELIVER_AND_COMPLETE,
113	DISPATCH_OP_COMPLETE_RESUME,
114	DISPATCH_OP_RESUME,
115	DISPATCH_OP_ERR,
116	DISPATCH_OP_FD_ERR,
117};
118
119#define _dispatch_io_Block_copy(x) \
120		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
121
122#pragma mark -
123#pragma mark dispatch_io_hashtables
124
125#if TARGET_OS_EMBEDDED
126#define DIO_HASH_SIZE  64u // must be a power of two
127#else
128#define DIO_HASH_SIZE 256u // must be a power of two
129#endif
130#define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
131
132// Global hashtable of dev_t -> disk_s mappings
133DISPATCH_CACHELINE_ALIGN
134static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
135// Global hashtable of fd -> fd_entry_s mappings
136DISPATCH_CACHELINE_ALIGN
137static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
138
139static dispatch_once_t  _dispatch_io_devs_lockq_pred;
140static dispatch_queue_t _dispatch_io_devs_lockq;
141static dispatch_queue_t _dispatch_io_fds_lockq;
142
143static void
144_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
145{
146	_dispatch_io_fds_lockq = dispatch_queue_create(
147			"com.apple.libdispatch-io.fd_lockq", NULL);
148	unsigned int i;
149	for (i = 0; i < DIO_HASH_SIZE; i++) {
150		TAILQ_INIT(&_dispatch_io_fds[i]);
151	}
152}
153
154static void
155_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
156{
157	_dispatch_io_devs_lockq = dispatch_queue_create(
158			"com.apple.libdispatch-io.dev_lockq", NULL);
159	unsigned int i;
160	for (i = 0; i < DIO_HASH_SIZE; i++) {
161		TAILQ_INIT(&_dispatch_io_devs[i]);
162	}
163}
164
165#pragma mark -
166#pragma mark dispatch_io_defaults
167
168enum {
169	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
170	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
171	DISPATCH_IOCNTL_INITIAL_DELIVERY,
172	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
173};
174
175static struct dispatch_io_defaults_s {
176	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
177	bool initial_delivery;
178} dispatch_io_defaults = {
179	.chunk_pages = DIO_MAX_CHUNK_PAGES,
180	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
181	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
182};
183
184#define _dispatch_iocntl_set_default(p, v) do { \
185		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
186	} while (0)
187
188void
189_dispatch_iocntl(uint32_t param, uint64_t value)
190{
191	switch (param) {
192	case DISPATCH_IOCNTL_CHUNK_PAGES:
193		_dispatch_iocntl_set_default(chunk_pages, value);
194		break;
195	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
196		_dispatch_iocntl_set_default(low_water_chunks, value);
197		break;
198	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
199		_dispatch_iocntl_set_default(initial_delivery, value);
200	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
201		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
202		break;
203	}
204}
205
206#pragma mark -
207#pragma mark dispatch_io_t
208
209static dispatch_io_t
210_dispatch_io_create(dispatch_io_type_t type)
211{
212	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
213			sizeof(struct dispatch_io_s));
214	channel->do_next = DISPATCH_OBJECT_LISTLESS;
215	channel->do_targetq = _dispatch_get_root_queue(0, true);
216	channel->params.type = type;
217	channel->params.high = SIZE_MAX;
218	channel->params.low = dispatch_io_defaults.low_water_chunks *
219			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
220	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
221			NULL);
222	return channel;
223}
224
225static void
226_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
227		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
228{
229	// Enqueue the cleanup handler on the suspended close queue
230	if (cleanup_handler) {
231		_dispatch_retain(queue);
232		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
233			dispatch_async(queue, ^{
234				_dispatch_fd_debug("cleanup handler invoke", -1);
235				cleanup_handler(err);
236			});
237			_dispatch_release(queue);
238		});
239	}
240	if (fd_entry) {
241		channel->fd_entry = fd_entry;
242		dispatch_retain(fd_entry->barrier_queue);
243		dispatch_retain(fd_entry->barrier_group);
244		channel->barrier_queue = fd_entry->barrier_queue;
245		channel->barrier_group = fd_entry->barrier_group;
246	} else {
247		// Still need to create a barrier queue, since all operations go
248		// through it
249		channel->barrier_queue = dispatch_queue_create(
250				"com.apple.libdispatch-io.barrierq", NULL);
251		channel->barrier_group = dispatch_group_create();
252	}
253}
254
255void
256_dispatch_io_dispose(dispatch_io_t channel)
257{
258	_dispatch_object_debug(channel, "%s", __func__);
259	if (channel->fd_entry &&
260			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
261		if (channel->fd_entry->path_data) {
262			// This modification is safe since path_data->channel is checked
263			// only on close_queue (which is still suspended at this point)
264			channel->fd_entry->path_data->channel = NULL;
265		}
266		// Cleanup handlers will only run when all channels related to this
267		// fd are complete
268		_dispatch_fd_entry_release(channel->fd_entry);
269	}
270	if (channel->queue) {
271		dispatch_release(channel->queue);
272	}
273	if (channel->barrier_queue) {
274		dispatch_release(channel->barrier_queue);
275	}
276	if (channel->barrier_group) {
277		dispatch_release(channel->barrier_group);
278	}
279}
280
281static int
282_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
283{
284	int err = 0;
285	if (S_ISDIR(mode)) {
286		err = EISDIR;
287	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
288			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
289		err = ESPIPE;
290	}
291	return err;
292}
293
294static int
295_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
296		bool ignore_closed)
297{
298	// On _any_ queue
299	int err;
300	if (op) {
301		channel = op->channel;
302	}
303	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
304		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
305			err = ECANCELED;
306		} else {
307			err = 0;
308		}
309	} else {
310		err = op ? op->fd_entry->err : channel->err;
311	}
312	return err;
313}
314
315#pragma mark -
316#pragma mark dispatch_io_channels
317
318dispatch_io_t
319dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
320		dispatch_queue_t queue, void (^cleanup_handler)(int))
321{
322	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
323		return NULL;
324	}
325	_dispatch_fd_debug("io create", fd);
326	dispatch_io_t channel = _dispatch_io_create(type);
327	channel->fd = fd;
328	channel->fd_actual = fd;
329	dispatch_suspend(channel->queue);
330	_dispatch_retain(queue);
331	_dispatch_retain(channel);
332	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
333		// On barrier queue
334		int err = fd_entry->err;
335		if (!err) {
336			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
337		}
338		if (!err && type == DISPATCH_IO_RANDOM) {
339			off_t f_ptr;
340			_dispatch_io_syscall_switch_noerr(err,
341				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
342				case 0: channel->f_ptr = f_ptr; break;
343				default: (void)dispatch_assume_zero(err); break;
344			);
345		}
346		channel->err = err;
347		_dispatch_fd_entry_retain(fd_entry);
348		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
349		dispatch_resume(channel->queue);
350		_dispatch_object_debug(channel, "%s", __func__);
351		_dispatch_release(channel);
352		_dispatch_release(queue);
353	});
354	_dispatch_object_debug(channel, "%s", __func__);
355	return channel;
356}
357
358dispatch_io_t
359dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
360		dispatch_queue_t queue, void *context,
361		void (*cleanup_handler)(void *context, int error))
362{
363	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
364			^(int error){ cleanup_handler(context, error); });
365}
366
367dispatch_io_t
368dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
369		int oflag, mode_t mode, dispatch_queue_t queue,
370		void (^cleanup_handler)(int error))
371{
372	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
373			!(path && *path == '/')) {
374		return NULL;
375	}
376	size_t pathlen = strlen(path);
377	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
378	if (!path_data) {
379		return NULL;
380	}
381	_dispatch_fd_debug("io create with path %s", -1, path);
382	dispatch_io_t channel = _dispatch_io_create(type);
383	channel->fd = -1;
384	channel->fd_actual = -1;
385	path_data->channel = channel;
386	path_data->oflag = oflag;
387	path_data->mode = mode;
388	path_data->pathlen = pathlen;
389	memcpy(path_data->path, path, pathlen + 1);
390	_dispatch_retain(queue);
391	_dispatch_retain(channel);
392	dispatch_async(channel->queue, ^{
393		int err = 0;
394		struct stat st;
395		_dispatch_io_syscall_switch_noerr(err,
396			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
397					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
398					lstat(path_data->path, &st) : stat(path_data->path, &st),
399			case 0:
400				err = _dispatch_io_validate_type(channel, st.st_mode);
401				break;
402			default:
403				if ((path_data->oflag & O_CREAT) &&
404						(*(path_data->path + path_data->pathlen - 1) != '/')) {
405					// Check parent directory
406					char *c = strrchr(path_data->path, '/');
407					dispatch_assert(c);
408					*c = 0;
409					int perr;
410					_dispatch_io_syscall_switch_noerr(perr,
411						stat(path_data->path, &st),
412						case 0:
413							// Since the parent directory exists, open() will
414							// create a regular file after the fd_entry has
415							// been filled in
416							st.st_mode = S_IFREG;
417							err = 0;
418							break;
419					);
420					*c = '/';
421				}
422				break;
423		);
424		channel->err = err;
425		if (err) {
426			free(path_data);
427			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
428			_dispatch_release(channel);
429			_dispatch_release(queue);
430			return;
431		}
432		dispatch_suspend(channel->queue);
433		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
434				_dispatch_io_devs_lockq_init);
435		dispatch_async(_dispatch_io_devs_lockq, ^{
436			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
437					path_data, st.st_dev, st.st_mode);
438			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
439			dispatch_resume(channel->queue);
440			_dispatch_object_debug(channel, "%s", __func__);
441			_dispatch_release(channel);
442			_dispatch_release(queue);
443		});
444	});
445	_dispatch_object_debug(channel, "%s", __func__);
446	return channel;
447}
448
449dispatch_io_t
450dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
451		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
452		void (*cleanup_handler)(void *context, int error))
453{
454	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
455			!cleanup_handler ? NULL :
456			^(int error){ cleanup_handler(context, error); });
457}
458
459dispatch_io_t
460dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
461		dispatch_queue_t queue, void (^cleanup_handler)(int error))
462{
463	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
464		return NULL;
465	}
466	_dispatch_fd_debug("io create with io %p", -1, in_channel);
467	dispatch_io_t channel = _dispatch_io_create(type);
468	dispatch_suspend(channel->queue);
469	_dispatch_retain(queue);
470	_dispatch_retain(channel);
471	_dispatch_retain(in_channel);
472	dispatch_async(in_channel->queue, ^{
473		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
474		if (err0) {
475			channel->err = err0;
476			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
477			dispatch_resume(channel->queue);
478			_dispatch_release(channel);
479			_dispatch_release(in_channel);
480			_dispatch_release(queue);
481			return;
482		}
483		dispatch_async(in_channel->barrier_queue, ^{
484			int err = _dispatch_io_get_error(NULL, in_channel, false);
485			// If there is no error, the fd_entry for the in_channel is valid.
486			// Since we are running on in_channel's queue, the fd_entry has been
487			// fully resolved and will stay valid for the duration of this block
488			if (!err) {
489				err = in_channel->err;
490				if (!err) {
491					err = in_channel->fd_entry->err;
492				}
493			}
494			if (!err) {
495				err = _dispatch_io_validate_type(channel,
496						in_channel->fd_entry->stat.mode);
497			}
498			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
499				off_t f_ptr;
500				_dispatch_io_syscall_switch_noerr(err,
501					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
502					case 0: channel->f_ptr = f_ptr; break;
503					default: (void)dispatch_assume_zero(err); break;
504				);
505			}
506			channel->err = err;
507			if (err) {
508				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
509				dispatch_resume(channel->queue);
510				_dispatch_release(channel);
511				_dispatch_release(in_channel);
512				_dispatch_release(queue);
513				return;
514			}
515			if (in_channel->fd == -1) {
516				// in_channel was created from path
517				channel->fd = -1;
518				channel->fd_actual = -1;
519				mode_t mode = in_channel->fd_entry->stat.mode;
520				dev_t dev = in_channel->fd_entry->stat.dev;
521				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
522						in_channel->fd_entry->path_data->pathlen + 1;
523				dispatch_io_path_data_t path_data = malloc(path_data_len);
524				memcpy(path_data, in_channel->fd_entry->path_data,
525						path_data_len);
526				path_data->channel = channel;
527				// lockq_io_devs is known to already exist
528				dispatch_async(_dispatch_io_devs_lockq, ^{
529					dispatch_fd_entry_t fd_entry;
530					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
531							dev, mode);
532					_dispatch_io_init(channel, fd_entry, queue, 0,
533							cleanup_handler);
534					dispatch_resume(channel->queue);
535					_dispatch_release(channel);
536					_dispatch_release(queue);
537				});
538			} else {
539				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
540				channel->fd = in_channel->fd;
541				channel->fd_actual = in_channel->fd_actual;
542				_dispatch_fd_entry_retain(fd_entry);
543				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
544				dispatch_resume(channel->queue);
545				_dispatch_release(channel);
546				_dispatch_release(queue);
547			}
548			_dispatch_release(in_channel);
549			_dispatch_object_debug(channel, "%s", __func__);
550		});
551	});
552	_dispatch_object_debug(channel, "%s", __func__);
553	return channel;
554}
555
556dispatch_io_t
557dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
558		dispatch_queue_t queue, void *context,
559		void (*cleanup_handler)(void *context, int error))
560{
561	return dispatch_io_create_with_io(type, in_channel, queue,
562			!cleanup_handler ? NULL :
563			^(int error){ cleanup_handler(context, error); });
564}
565
566#pragma mark -
567#pragma mark dispatch_io_accessors
568
569void
570dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
571{
572	_dispatch_retain(channel);
573	dispatch_async(channel->queue, ^{
574		_dispatch_fd_debug("io set high water", channel->fd);
575		if (channel->params.low > high_water) {
576			channel->params.low = high_water;
577		}
578		channel->params.high = high_water ? high_water : 1;
579		_dispatch_release(channel);
580	});
581}
582
583void
584dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
585{
586	_dispatch_retain(channel);
587	dispatch_async(channel->queue, ^{
588		_dispatch_fd_debug("io set low water", channel->fd);
589		if (channel->params.high < low_water) {
590			channel->params.high = low_water ? low_water : 1;
591		}
592		channel->params.low = low_water;
593		_dispatch_release(channel);
594	});
595}
596
597void
598dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
599		unsigned long flags)
600{
601	_dispatch_retain(channel);
602	dispatch_async(channel->queue, ^{
603		_dispatch_fd_debug("io set interval", channel->fd);
604		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
605		channel->params.interval_flags = flags;
606		_dispatch_release(channel);
607	});
608}
609
610void
611_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
612{
613	_dispatch_retain(dq);
614	_dispatch_retain(channel);
615	dispatch_async(channel->queue, ^{
616		dispatch_queue_t prev_dq = channel->do_targetq;
617		channel->do_targetq = dq;
618		_dispatch_release(prev_dq);
619		_dispatch_object_debug(channel, "%s", __func__);
620		_dispatch_release(channel);
621	});
622}
623
624dispatch_fd_t
625dispatch_io_get_descriptor(dispatch_io_t channel)
626{
627	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
628		return -1;
629	}
630	dispatch_fd_t fd = channel->fd_actual;
631	if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
632			!_dispatch_io_get_error(NULL, channel, false)) {
633		dispatch_fd_entry_t fd_entry = channel->fd_entry;
634		(void)_dispatch_fd_entry_open(fd_entry, channel);
635	}
636	return channel->fd_actual;
637}
638
639#pragma mark -
640#pragma mark dispatch_io_operations
641
642static void
643_dispatch_io_stop(dispatch_io_t channel)
644{
645	_dispatch_fd_debug("io stop", channel->fd);
646	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
647	_dispatch_retain(channel);
648	dispatch_async(channel->queue, ^{
649		dispatch_async(channel->barrier_queue, ^{
650			_dispatch_object_debug(channel, "%s", __func__);
651			dispatch_fd_entry_t fd_entry = channel->fd_entry;
652			if (fd_entry) {
653				_dispatch_fd_debug("io stop cleanup", channel->fd);
654				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
655				if (!(channel->atomic_flags & DIO_CLOSED)) {
656					channel->fd_entry = NULL;
657					_dispatch_fd_entry_release(fd_entry);
658				}
659			} else if (channel->fd != -1) {
660				// Stop after close, need to check if fd_entry still exists
661				_dispatch_retain(channel);
662				dispatch_async(_dispatch_io_fds_lockq, ^{
663					_dispatch_object_debug(channel, "%s", __func__);
664					_dispatch_fd_debug("io stop after close cleanup",
665							channel->fd);
666					dispatch_fd_entry_t fdi;
667					uintptr_t hash = DIO_HASH(channel->fd);
668					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
669						if (fdi->fd == channel->fd) {
670							_dispatch_fd_entry_cleanup_operations(fdi, channel);
671							break;
672						}
673					}
674					_dispatch_release(channel);
675				});
676			}
677			_dispatch_release(channel);
678		});
679	});
680}
681
682void
683dispatch_io_close(dispatch_io_t channel, unsigned long flags)
684{
685	if (flags & DISPATCH_IO_STOP) {
686		// Don't stop an already stopped channel
687		if (channel->atomic_flags & DIO_STOPPED) {
688			return;
689		}
690		return _dispatch_io_stop(channel);
691	}
692	// Don't close an already closed or stopped channel
693	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
694		return;
695	}
696	_dispatch_retain(channel);
697	dispatch_async(channel->queue, ^{
698		dispatch_async(channel->barrier_queue, ^{
699			_dispatch_object_debug(channel, "%s", __func__);
700			_dispatch_fd_debug("io close", channel->fd);
701			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
702				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
703						relaxed);
704				dispatch_fd_entry_t fd_entry = channel->fd_entry;
705				if (fd_entry) {
706					if (!fd_entry->path_data) {
707						channel->fd_entry = NULL;
708					}
709					_dispatch_fd_entry_release(fd_entry);
710				}
711			}
712			_dispatch_release(channel);
713		});
714	});
715}
716
717void
718dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
719{
720	_dispatch_retain(channel);
721	dispatch_async(channel->queue, ^{
722		dispatch_queue_t io_q = channel->do_targetq;
723		dispatch_queue_t barrier_queue = channel->barrier_queue;
724		dispatch_group_t barrier_group = channel->barrier_group;
725		dispatch_async(barrier_queue, ^{
726			dispatch_suspend(barrier_queue);
727			dispatch_group_notify(barrier_group, io_q, ^{
728				_dispatch_object_debug(channel, "%s", __func__);
729				_dispatch_thread_setspecific(dispatch_io_key, channel);
730				barrier();
731				_dispatch_thread_setspecific(dispatch_io_key, NULL);
732				dispatch_resume(barrier_queue);
733				_dispatch_release(channel);
734			});
735		});
736	});
737}
738
739void
740dispatch_io_barrier_f(dispatch_io_t channel, void *context,
741		dispatch_function_t barrier)
742{
743	return dispatch_io_barrier(channel, ^{ barrier(context); });
744}
745
746void
747dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
748		dispatch_queue_t queue, dispatch_io_handler_t handler)
749{
750	_dispatch_retain(channel);
751	_dispatch_retain(queue);
752	dispatch_async(channel->queue, ^{
753		dispatch_operation_t op;
754		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
755				length, dispatch_data_empty, queue, handler);
756		if (op) {
757			dispatch_queue_t barrier_q = channel->barrier_queue;
758			dispatch_async(barrier_q, ^{
759				_dispatch_operation_enqueue(op, DOP_DIR_READ,
760						dispatch_data_empty);
761			});
762		}
763		_dispatch_release(channel);
764		_dispatch_release(queue);
765	});
766}
767
768void
769dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
770		dispatch_queue_t queue, void *context,
771		dispatch_io_handler_function_t handler)
772{
773	return dispatch_io_read(channel, offset, length, queue,
774			^(bool done, dispatch_data_t d, int error){
775		handler(context, done, d, error);
776	});
777}
778
779void
780dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
781		dispatch_queue_t queue, dispatch_io_handler_t handler)
782{
783	_dispatch_io_data_retain(data);
784	_dispatch_retain(channel);
785	_dispatch_retain(queue);
786	dispatch_async(channel->queue, ^{
787		dispatch_operation_t op;
788		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
789				dispatch_data_get_size(data), data, queue, handler);
790		if (op) {
791			dispatch_queue_t barrier_q = channel->barrier_queue;
792			dispatch_async(barrier_q, ^{
793				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
794				_dispatch_io_data_release(data);
795			});
796		} else {
797			_dispatch_io_data_release(data);
798		}
799		_dispatch_release(channel);
800		_dispatch_release(queue);
801	});
802}
803
804void
805dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
806		dispatch_queue_t queue, void *context,
807		dispatch_io_handler_function_t handler)
808{
809	return dispatch_io_write(channel, offset, data, queue,
810			^(bool done, dispatch_data_t d, int error){
811		handler(context, done, d, error);
812	});
813}
814
815void
816dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
817		void (^handler)(dispatch_data_t, int))
818{
819	_dispatch_retain(queue);
820	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
821		// On barrier queue
822		if (fd_entry->err) {
823			int err = fd_entry->err;
824			dispatch_async(queue, ^{
825				_dispatch_fd_debug("convenience handler invoke", fd);
826				handler(dispatch_data_empty, err);
827			});
828			_dispatch_release(queue);
829			return;
830		}
831		// Safe to access fd_entry on barrier queue
832		dispatch_io_t channel = fd_entry->convenience_channel;
833		if (!channel) {
834			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
835			channel->fd = fd;
836			channel->fd_actual = fd;
837			channel->fd_entry = fd_entry;
838			dispatch_retain(fd_entry->barrier_queue);
839			dispatch_retain(fd_entry->barrier_group);
840			channel->barrier_queue = fd_entry->barrier_queue;
841			channel->barrier_group = fd_entry->barrier_group;
842			fd_entry->convenience_channel = channel;
843		}
844		__block dispatch_data_t deliver_data = dispatch_data_empty;
845		__block int err = 0;
846		dispatch_async(fd_entry->close_queue, ^{
847			dispatch_async(queue, ^{
848				_dispatch_fd_debug("convenience handler invoke", fd);
849				handler(deliver_data, err);
850				_dispatch_io_data_release(deliver_data);
851			});
852			_dispatch_release(queue);
853		});
854		dispatch_operation_t op =
855			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
856					length, dispatch_data_empty,
857					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
858					false), ^(bool done, dispatch_data_t data, int error) {
859				if (data) {
860					data = dispatch_data_create_concat(deliver_data, data);
861					_dispatch_io_data_release(deliver_data);
862					deliver_data = data;
863				}
864				if (done) {
865					err = error;
866				}
867			});
868		if (op) {
869			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
870		}
871	});
872}
873
874void
875dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
876		void *context, void (*handler)(void *, dispatch_data_t, int))
877{
878	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
879		handler(context, d, error);
880	});
881}
882
883void
884dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
885		void (^handler)(dispatch_data_t, int))
886{
887	_dispatch_io_data_retain(data);
888	_dispatch_retain(queue);
889	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
890		// On barrier queue
891		if (fd_entry->err) {
892			int err = fd_entry->err;
893			dispatch_async(queue, ^{
894				_dispatch_fd_debug("convenience handler invoke", fd);
895				handler(NULL, err);
896			});
897			_dispatch_release(queue);
898			return;
899		}
900		// Safe to access fd_entry on barrier queue
901		dispatch_io_t channel = fd_entry->convenience_channel;
902		if (!channel) {
903			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
904			channel->fd = fd;
905			channel->fd_actual = fd;
906			channel->fd_entry = fd_entry;
907			dispatch_retain(fd_entry->barrier_queue);
908			dispatch_retain(fd_entry->barrier_group);
909			channel->barrier_queue = fd_entry->barrier_queue;
910			channel->barrier_group = fd_entry->barrier_group;
911			fd_entry->convenience_channel = channel;
912		}
913		__block dispatch_data_t deliver_data = NULL;
914		__block int err = 0;
915		dispatch_async(fd_entry->close_queue, ^{
916			dispatch_async(queue, ^{
917				_dispatch_fd_debug("convenience handler invoke", fd);
918				handler(deliver_data, err);
919				if (deliver_data) {
920					_dispatch_io_data_release(deliver_data);
921				}
922			});
923			_dispatch_release(queue);
924		});
925		dispatch_operation_t op =
926			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
927					dispatch_data_get_size(data), data,
928					_dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
929					false), ^(bool done, dispatch_data_t d, int error) {
930				if (done) {
931					if (d) {
932						_dispatch_io_data_retain(d);
933						deliver_data = d;
934					}
935					err = error;
936				}
937			});
938		if (op) {
939			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
940		}
941		_dispatch_io_data_release(data);
942	});
943}
944
945void
946dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
947		void *context, void (*handler)(void *, dispatch_data_t, int))
948{
949	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
950		handler(context, d, error);
951	});
952}
953
954#pragma mark -
955#pragma mark dispatch_operation_t
956
957static dispatch_operation_t
958_dispatch_operation_create(dispatch_op_direction_t direction,
959		dispatch_io_t channel, off_t offset, size_t length,
960		dispatch_data_t data, dispatch_queue_t queue,
961		dispatch_io_handler_t handler)
962{
963	// On channel queue
964	dispatch_assert(direction < DOP_DIR_MAX);
965	_dispatch_fd_debug("operation create", channel->fd);
966#if DISPATCH_IO_DEBUG
967	int fd = channel->fd;
968#endif
969	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
970	// that can only be NULL if atomic_flags are set rdar://problem/8362514
971	int err = _dispatch_io_get_error(NULL, channel, false);
972	if (err || !length) {
973		_dispatch_io_data_retain(data);
974		_dispatch_retain(queue);
975		dispatch_async(channel->barrier_queue, ^{
976			dispatch_async(queue, ^{
977				dispatch_data_t d = data;
978				if (direction == DOP_DIR_READ && err) {
979					d = NULL;
980				} else if (direction == DOP_DIR_WRITE && !err) {
981					d = NULL;
982				}
983				_dispatch_fd_debug("IO handler invoke", fd);
984				handler(true, d, err);
985				_dispatch_io_data_release(data);
986			});
987			_dispatch_release(queue);
988		});
989		return NULL;
990	}
991	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
992			sizeof(struct dispatch_operation_s));
993	op->do_next = DISPATCH_OBJECT_LISTLESS;
994	op->do_xref_cnt = -1; // operation object is not exposed externally
995	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
996	op->op_q->do_targetq = queue;
997	_dispatch_retain(queue);
998	op->active = false;
999	op->direction = direction;
1000	op->offset = offset + channel->f_ptr;
1001	op->length = length;
1002	op->handler = _dispatch_io_Block_copy(handler);
1003	_dispatch_retain(channel);
1004	op->channel = channel;
1005	op->params = channel->params;
1006	// Take a snapshot of the priority of the channel queue. The actual I/O
1007	// for this operation will be performed at this priority
1008	dispatch_queue_t targetq = op->channel->do_targetq;
1009	while (fastpath(targetq->do_targetq)) {
1010		targetq = targetq->do_targetq;
1011	}
1012	op->do_targetq = targetq;
1013	_dispatch_object_debug(op, "%s", __func__);
1014	return op;
1015}
1016
1017void
1018_dispatch_operation_dispose(dispatch_operation_t op)
1019{
1020	_dispatch_object_debug(op, "%s", __func__);
1021	// Deliver the data if there's any
1022	if (op->fd_entry) {
1023		_dispatch_operation_deliver_data(op, DOP_DONE);
1024		dispatch_group_leave(op->fd_entry->barrier_group);
1025		_dispatch_fd_entry_release(op->fd_entry);
1026	}
1027	if (op->channel) {
1028		_dispatch_release(op->channel);
1029	}
1030	if (op->timer) {
1031		dispatch_release(op->timer);
1032	}
1033	// For write operations, op->buf is owned by op->buf_data
1034	if (op->buf && op->direction == DOP_DIR_READ) {
1035		free(op->buf);
1036	}
1037	if (op->buf_data) {
1038		_dispatch_io_data_release(op->buf_data);
1039	}
1040	if (op->data) {
1041		_dispatch_io_data_release(op->data);
1042	}
1043	if (op->op_q) {
1044		dispatch_release(op->op_q);
1045	}
1046	Block_release(op->handler);
1047}
1048
1049static void
1050_dispatch_operation_enqueue(dispatch_operation_t op,
1051		dispatch_op_direction_t direction, dispatch_data_t data)
1052{
1053	// Called from the barrier queue
1054	_dispatch_io_data_retain(data);
1055	// If channel is closed or stopped, then call the handler immediately
1056	int err = _dispatch_io_get_error(NULL, op->channel, false);
1057	if (err) {
1058		dispatch_io_handler_t handler = op->handler;
1059		dispatch_async(op->op_q, ^{
1060			dispatch_data_t d = data;
1061			if (direction == DOP_DIR_READ && err) {
1062				d = NULL;
1063			} else if (direction == DOP_DIR_WRITE && !err) {
1064				d = NULL;
1065			}
1066			handler(true, d, err);
1067			_dispatch_io_data_release(data);
1068		});
1069		_dispatch_release(op);
1070		return;
1071	}
1072	// Finish operation init
1073	op->fd_entry = op->channel->fd_entry;
1074	_dispatch_fd_entry_retain(op->fd_entry);
1075	dispatch_group_enter(op->fd_entry->barrier_group);
1076	dispatch_disk_t disk = op->fd_entry->disk;
1077	if (!disk) {
1078		dispatch_stream_t stream = op->fd_entry->streams[direction];
1079		dispatch_async(stream->dq, ^{
1080			_dispatch_stream_enqueue_operation(stream, op, data);
1081			_dispatch_io_data_release(data);
1082		});
1083	} else {
1084		dispatch_async(disk->pick_queue, ^{
1085			_dispatch_disk_enqueue_operation(disk, op, data);
1086			_dispatch_io_data_release(data);
1087		});
1088	}
1089}
1090
1091static bool
1092_dispatch_operation_should_enqueue(dispatch_operation_t op,
1093		dispatch_queue_t tq, dispatch_data_t data)
1094{
1095	// On stream queue or disk queue
1096	_dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1097	_dispatch_io_data_retain(data);
1098	op->data = data;
1099	int err = _dispatch_io_get_error(op, NULL, true);
1100	if (err) {
1101		op->err = err;
1102		// Final release
1103		_dispatch_release(op);
1104		return false;
1105	}
1106	if (op->params.interval) {
1107		dispatch_resume(_dispatch_operation_timer(tq, op));
1108	}
1109	return true;
1110}
1111
1112static dispatch_source_t
1113_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1114{
1115	// On stream queue or pick queue
1116	if (op->timer) {
1117		return op->timer;
1118	}
1119	dispatch_source_t timer = dispatch_source_create(
1120			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1121	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1122			(int64_t)op->params.interval), op->params.interval, 0);
1123	dispatch_source_set_event_handler(timer, ^{
1124		// On stream queue or pick queue
1125		if (dispatch_source_testcancel(timer)) {
1126			// Do nothing. The operation has already completed
1127			return;
1128		}
1129		dispatch_op_flags_t flags = DOP_DEFAULT;
1130		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1131			// Deliver even if there is less data than the low-water mark
1132			flags |= DOP_DELIVER;
1133		}
1134		// If the operation is active, dont deliver data
1135		if ((op->active) && (flags & DOP_DELIVER)) {
1136			op->flags = flags;
1137		} else {
1138			_dispatch_operation_deliver_data(op, flags);
1139		}
1140	});
1141	op->timer = timer;
1142	return op->timer;
1143}
1144
1145#pragma mark -
1146#pragma mark dispatch_fd_entry_t
1147
1148#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1149static void
1150_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1151{
1152	guardid_t guard = fd_entry;
1153	const unsigned int guard_flags = GUARD_CLOSE;
1154	int err, fd_flags = 0;
1155	_dispatch_io_syscall_switch_noerr(err,
1156		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1157				&fd_flags),
1158		case 0:
1159			fd_entry->guard_flags = guard_flags;
1160			fd_entry->orig_fd_flags = fd_flags;
1161			break;
1162		case EPERM: break;
1163		default: (void)dispatch_assume_zero(err); break;
1164	);
1165}
1166
1167static void
1168_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1169{
1170	if (!fd_entry->guard_flags) {
1171		return;
1172	}
1173	guardid_t guard = fd_entry;
1174	int err, fd_flags = fd_entry->orig_fd_flags;
1175	_dispatch_io_syscall_switch(err,
1176		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1177				&fd_flags),
1178		default: (void)dispatch_assume_zero(err); break;
1179	);
1180}
1181#else
1182static inline void
1183_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1184static inline void
1185_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1186#endif // DISPATCH_USE_GUARDED_FD
1187
1188static inline int
1189_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1190		int oflag, mode_t mode) {
1191#if DISPATCH_USE_GUARDED_FD
1192	guardid_t guard = (uintptr_t)fd_entry;
1193	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1194			GUARD_SOCKET_IPC | GUARD_FILEPORT;
1195	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1196			mode);
1197	if (fd != -1) {
1198		fd_entry->guard_flags = guard_flags;
1199		return fd;
1200	}
1201	errno = 0;
1202#endif
1203	return open(path, oflag, mode);
1204	(void)fd_entry;
1205}
1206
1207static inline int
1208_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1209#if DISPATCH_USE_GUARDED_FD
1210	if (fd_entry->guard_flags) {
1211		guardid_t guard = (uintptr_t)fd_entry;
1212		return guarded_close_np(fd, &guard);
1213	} else
1214#endif
1215	{
1216		return close(fd);
1217	}
1218	(void)fd_entry;
1219}
1220
1221static inline void
1222_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1223	dispatch_suspend(fd_entry->close_queue);
1224}
1225
1226static inline void
1227_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1228	dispatch_resume(fd_entry->close_queue);
1229}
1230
1231static void
1232_dispatch_fd_entry_init_async(dispatch_fd_t fd,
1233		dispatch_fd_entry_init_callback_t completion_callback)
1234{
1235	static dispatch_once_t _dispatch_io_fds_lockq_pred;
1236	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
1237			_dispatch_io_fds_lockq_init);
1238	dispatch_async(_dispatch_io_fds_lockq, ^{
1239		_dispatch_fd_debug("fd entry init", fd);
1240		dispatch_fd_entry_t fd_entry = NULL;
1241		// Check to see if there is an existing entry for the given fd
1242		uintptr_t hash = DIO_HASH(fd);
1243		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
1244			if (fd_entry->fd == fd) {
1245				// Retain the fd_entry to ensure it cannot go away until the
1246				// stat() has completed
1247				_dispatch_fd_entry_retain(fd_entry);
1248				break;
1249			}
1250		}
1251		if (!fd_entry) {
1252			// If we did not find an existing entry, create one
1253			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
1254		}
1255		dispatch_async(fd_entry->barrier_queue, ^{
1256			_dispatch_fd_debug("fd entry init completion", fd);
1257			completion_callback(fd_entry);
1258			// stat() is complete, release reference to fd_entry
1259			_dispatch_fd_entry_release(fd_entry);
1260		});
1261	});
1262}
1263
1264static dispatch_fd_entry_t
1265_dispatch_fd_entry_create(dispatch_queue_t q)
1266{
1267	dispatch_fd_entry_t fd_entry;
1268	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1269	fd_entry->close_queue = dispatch_queue_create(
1270			"com.apple.libdispatch-io.closeq", NULL);
1271	// Use target queue to ensure that no concurrent lookups are going on when
1272	// the close queue is running
1273	fd_entry->close_queue->do_targetq = q;
1274	_dispatch_retain(q);
1275	// Suspend the cleanup queue until closing
1276	_dispatch_fd_entry_retain(fd_entry);
1277	return fd_entry;
1278}
1279
1280static dispatch_fd_entry_t
1281_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1282{
1283	// On fds lock queue
1284	_dispatch_fd_debug("fd entry create", fd);
1285	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1286			_dispatch_io_fds_lockq);
1287	fd_entry->fd = fd;
1288	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1289	fd_entry->barrier_queue = dispatch_queue_create(
1290			"com.apple.libdispatch-io.barrierq", NULL);
1291	fd_entry->barrier_group = dispatch_group_create();
1292	dispatch_async(fd_entry->barrier_queue, ^{
1293		_dispatch_fd_debug("fd entry stat", fd);
1294		int err, orig_flags, orig_nosigpipe = -1;
1295		struct stat st;
1296		_dispatch_io_syscall_switch(err,
1297			fstat(fd, &st),
1298			default: fd_entry->err = err; return;
1299		);
1300		fd_entry->stat.dev = st.st_dev;
1301		fd_entry->stat.mode = st.st_mode;
1302		_dispatch_fd_entry_guard(fd_entry);
1303		_dispatch_io_syscall_switch(err,
1304			orig_flags = fcntl(fd, F_GETFL),
1305			default: (void)dispatch_assume_zero(err); break;
1306		);
1307#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1308		if (S_ISFIFO(st.st_mode)) {
1309			_dispatch_io_syscall_switch(err,
1310				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1311				default: (void)dispatch_assume_zero(err); break;
1312			);
1313			if (orig_nosigpipe != -1) {
1314				_dispatch_io_syscall_switch(err,
1315					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1316					default:
1317						orig_nosigpipe = -1;
1318						(void)dispatch_assume_zero(err);
1319						break;
1320				);
1321			}
1322		}
1323#endif
1324		if (S_ISREG(st.st_mode)) {
1325			if (orig_flags != -1) {
1326				_dispatch_io_syscall_switch(err,
1327					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1328					default:
1329						orig_flags = -1;
1330						(void)dispatch_assume_zero(err);
1331						break;
1332				);
1333			}
1334			int32_t dev = major(st.st_dev);
1335			// We have to get the disk on the global dev queue. The
1336			// barrier queue cannot continue until that is complete
1337			dispatch_suspend(fd_entry->barrier_queue);
1338			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1339					_dispatch_io_devs_lockq_init);
1340			dispatch_async(_dispatch_io_devs_lockq, ^{
1341				_dispatch_disk_init(fd_entry, dev);
1342				dispatch_resume(fd_entry->barrier_queue);
1343			});
1344		} else {
1345			if (orig_flags != -1) {
1346				_dispatch_io_syscall_switch(err,
1347					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1348					default:
1349						orig_flags = -1;
1350						(void)dispatch_assume_zero(err);
1351						break;
1352				);
1353			}
1354			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1355					DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1356		}
1357		fd_entry->orig_flags = orig_flags;
1358		fd_entry->orig_nosigpipe = orig_nosigpipe;
1359	});
1360	// This is the first item run when the close queue is resumed, indicating
1361	// that all channels associated with this entry have been closed and that
1362	// all operations associated with this entry have been freed
1363	dispatch_async(fd_entry->close_queue, ^{
1364		if (!fd_entry->disk) {
1365			_dispatch_fd_debug("close queue fd_entry cleanup", fd);
1366			dispatch_op_direction_t dir;
1367			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1368				_dispatch_stream_dispose(fd_entry, dir);
1369			}
1370		} else {
1371			dispatch_disk_t disk = fd_entry->disk;
1372			dispatch_async(_dispatch_io_devs_lockq, ^{
1373				_dispatch_release(disk);
1374			});
1375		}
1376		// Remove this entry from the global fd list
1377		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1378	});
1379	// If there was a source associated with this stream, disposing of the
1380	// source cancels it and suspends the close queue. Freeing the fd_entry
1381	// structure must happen after the source cancel handler has finished
1382	dispatch_async(fd_entry->close_queue, ^{
1383		_dispatch_fd_debug("close queue release", fd);
1384		dispatch_release(fd_entry->close_queue);
1385		_dispatch_fd_debug("barrier queue release", fd);
1386		dispatch_release(fd_entry->barrier_queue);
1387		_dispatch_fd_debug("barrier group release", fd);
1388		dispatch_release(fd_entry->barrier_group);
1389		if (fd_entry->orig_flags != -1) {
1390			_dispatch_io_syscall(
1391				fcntl(fd, F_SETFL, fd_entry->orig_flags)
1392			);
1393		}
1394#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1395		if (fd_entry->orig_nosigpipe != -1) {
1396			_dispatch_io_syscall(
1397				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1398			);
1399		}
1400#endif
1401		_dispatch_fd_entry_unguard(fd_entry);
1402		if (fd_entry->convenience_channel) {
1403			fd_entry->convenience_channel->fd_entry = NULL;
1404			dispatch_release(fd_entry->convenience_channel);
1405		}
1406		free(fd_entry);
1407	});
1408	return fd_entry;
1409}
1410
1411static dispatch_fd_entry_t
1412_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
1413		dev_t dev, mode_t mode)
1414{
1415	// On devs lock queue
1416	_dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
1417	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1418			path_data->channel->queue);
1419	if (S_ISREG(mode)) {
1420		_dispatch_disk_init(fd_entry, major(dev));
1421	} else {
1422		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1423				DISPATCH_QUEUE_PRIORITY_DEFAULT, false));
1424	}
1425	fd_entry->fd = -1;
1426	fd_entry->orig_flags = -1;
1427	fd_entry->path_data = path_data;
1428	fd_entry->stat.dev = dev;
1429	fd_entry->stat.mode = mode;
1430	fd_entry->barrier_queue = dispatch_queue_create(
1431			"com.apple.libdispatch-io.barrierq", NULL);
1432	fd_entry->barrier_group = dispatch_group_create();
1433	// This is the first item run when the close queue is resumed, indicating
1434	// that the channel associated with this entry has been closed and that
1435	// all operations associated with this entry have been freed
1436	dispatch_async(fd_entry->close_queue, ^{
1437		_dispatch_fd_debug("close queue fd_entry cleanup", -1);
1438		if (!fd_entry->disk) {
1439			dispatch_op_direction_t dir;
1440			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1441				_dispatch_stream_dispose(fd_entry, dir);
1442			}
1443		}
1444		if (fd_entry->fd != -1) {
1445			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
1446		}
1447		if (fd_entry->path_data->channel) {
1448			// If associated channel has not been released yet, mark it as
1449			// no longer having an fd_entry (for stop after close).
1450			// It is safe to modify channel since we are on close_queue with
1451			// target queue the channel queue
1452			fd_entry->path_data->channel->fd_entry = NULL;
1453		}
1454	});
1455	dispatch_async(fd_entry->close_queue, ^{
1456		_dispatch_fd_debug("close queue release", -1);
1457		dispatch_release(fd_entry->close_queue);
1458		dispatch_release(fd_entry->barrier_queue);
1459		dispatch_release(fd_entry->barrier_group);
1460		free(fd_entry->path_data);
1461		free(fd_entry);
1462	});
1463	return fd_entry;
1464}
1465
1466static int
1467_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1468{
1469	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1470		return 0;
1471	}
1472	if (fd_entry->err) {
1473		return fd_entry->err;
1474	}
1475	int fd = -1;
1476	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1477			fd_entry->path_data->oflag | O_NONBLOCK;
1478open:
1479	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1480			oflag, fd_entry->path_data->mode);
1481	if (fd == -1) {
1482		int err = errno;
1483		if (err == EINTR) {
1484			goto open;
1485		}
1486		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1487		return err;
1488	}
1489	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1490		// Lost the race with another open
1491		_dispatch_fd_entry_guarded_close(fd_entry, fd);
1492	} else {
1493		channel->fd_actual = fd;
1494	}
1495	_dispatch_object_debug(channel, "%s", __func__);
1496	return 0;
1497}
1498
1499static void
1500_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1501		dispatch_io_t channel)
1502{
1503	if (fd_entry->disk) {
1504		if (channel) {
1505			_dispatch_retain(channel);
1506		}
1507		_dispatch_fd_entry_retain(fd_entry);
1508		dispatch_async(fd_entry->disk->pick_queue, ^{
1509			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1510			_dispatch_fd_entry_release(fd_entry);
1511			if (channel) {
1512				_dispatch_release(channel);
1513			}
1514		});
1515	} else {
1516		dispatch_op_direction_t direction;
1517		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1518			dispatch_stream_t stream = fd_entry->streams[direction];
1519			if (!stream) {
1520				continue;
1521			}
1522			if (channel) {
1523				_dispatch_retain(channel);
1524			}
1525			_dispatch_fd_entry_retain(fd_entry);
1526			dispatch_async(stream->dq, ^{
1527				_dispatch_stream_cleanup_operations(stream, channel);
1528				_dispatch_fd_entry_release(fd_entry);
1529				if (channel) {
1530					_dispatch_release(channel);
1531				}
1532			});
1533		}
1534	}
1535}
1536
1537#pragma mark -
1538#pragma mark dispatch_stream_t/dispatch_disk_t
1539
1540static void
1541_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1542{
1543	dispatch_op_direction_t direction;
1544	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1545		dispatch_stream_t stream;
1546		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1547		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1548				NULL);
1549		dispatch_set_context(stream->dq, stream);
1550		_dispatch_retain(tq);
1551		stream->dq->do_targetq = tq;
1552		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1553		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1554		fd_entry->streams[direction] = stream;
1555	}
1556}
1557
1558static void
1559_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
1560		dispatch_op_direction_t direction)
1561{
1562	// On close queue
1563	dispatch_stream_t stream = fd_entry->streams[direction];
1564	if (!stream) {
1565		return;
1566	}
1567	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1568	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
1569	if (stream->source) {
1570		// Balanced by source cancel handler:
1571		_dispatch_fd_entry_retain(fd_entry);
1572		dispatch_source_cancel(stream->source);
1573		dispatch_resume(stream->source);
1574		dispatch_release(stream->source);
1575	}
1576	dispatch_set_context(stream->dq, NULL);
1577	dispatch_release(stream->dq);
1578	free(stream);
1579}
1580
1581static void
1582_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1583{
1584	// On devs lock queue
1585	dispatch_disk_t disk;
1586	// Check to see if there is an existing entry for the given device
1587	uintptr_t hash = DIO_HASH(dev);
1588	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1589		if (disk->dev == dev) {
1590			_dispatch_retain(disk);
1591			goto out;
1592		}
1593	}
1594	// Otherwise create a new entry
1595	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1596	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1597			sizeof(struct dispatch_disk_s) +
1598			(pending_reqs_depth * sizeof(dispatch_operation_t)));
1599	disk->do_next = DISPATCH_OBJECT_LISTLESS;
1600	disk->do_xref_cnt = -1;
1601	disk->advise_list_depth = pending_reqs_depth;
1602	disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,
1603			false);
1604	disk->dev = dev;
1605	TAILQ_INIT(&disk->operations);
1606	disk->cur_rq = TAILQ_FIRST(&disk->operations);
1607	char label[45];
1608	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
1609	disk->pick_queue = dispatch_queue_create(label, NULL);
1610	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1611out:
1612	fd_entry->disk = disk;
1613	TAILQ_INIT(&fd_entry->stream_ops);
1614}
1615
1616void
1617_dispatch_disk_dispose(dispatch_disk_t disk)
1618{
1619	uintptr_t hash = DIO_HASH(disk->dev);
1620	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1621	dispatch_assert(TAILQ_EMPTY(&disk->operations));
1622	size_t i;
1623	for (i=0; i<disk->advise_list_depth; ++i) {
1624		dispatch_assert(!disk->advise_list[i]);
1625	}
1626	dispatch_release(disk->pick_queue);
1627}
1628
1629#pragma mark -
1630#pragma mark dispatch_stream_operations/dispatch_disk_operations
1631
1632static inline bool
1633_dispatch_stream_operation_avail(dispatch_stream_t stream)
1634{
1635	return  !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1636			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1637}
1638
1639static void
1640_dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1641		dispatch_operation_t op, dispatch_data_t data)
1642{
1643	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1644		return;
1645	}
1646	_dispatch_object_debug(op, "%s", __func__);
1647	bool no_ops = !_dispatch_stream_operation_avail(stream);
1648	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1649	if (no_ops) {
1650		dispatch_async_f(stream->dq, stream->dq,
1651				_dispatch_stream_queue_handler);
1652	}
1653}
1654
1655static void
1656_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1657		dispatch_data_t data)
1658{
1659	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1660		return;
1661	}
1662	_dispatch_object_debug(op, "%s", __func__);
1663	if (op->params.type == DISPATCH_IO_STREAM) {
1664		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1665			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1666		}
1667		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1668	} else {
1669		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1670	}
1671	_dispatch_disk_handler(disk);
1672}
1673
1674static void
1675_dispatch_stream_complete_operation(dispatch_stream_t stream,
1676		dispatch_operation_t op)
1677{
1678	// On stream queue
1679	_dispatch_object_debug(op, "%s", __func__);
1680	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
1681	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1682	if (op == stream->op) {
1683		stream->op = NULL;
1684	}
1685	if (op->timer) {
1686		dispatch_source_cancel(op->timer);
1687	}
1688	// Final release will deliver any pending data
1689	_dispatch_release(op);
1690}
1691
1692static void
1693_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
1694{
1695	// On pick queue
1696	_dispatch_object_debug(op, "%s", __func__);
1697	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
1698	// Current request is always the last op returned
1699	if (disk->cur_rq == op) {
1700		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
1701				operation_list);
1702	}
1703	if (op->params.type == DISPATCH_IO_STREAM) {
1704		// Check if there are other pending stream operations behind it
1705		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
1706		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
1707		if (op_next) {
1708			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
1709		}
1710	}
1711	TAILQ_REMOVE(&disk->operations, op, operation_list);
1712	if (op->timer) {
1713		dispatch_source_cancel(op->timer);
1714	}
1715	// Final release will deliver any pending data
1716	_dispatch_release(op);
1717}
1718
1719static dispatch_operation_t
1720_dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1721		dispatch_operation_t op)
1722{
1723	// On stream queue
1724	if (!op) {
1725		// On the first run through, pick the first operation
1726		if (!_dispatch_stream_operation_avail(stream)) {
1727			return op;
1728		}
1729		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1730			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1731		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1732			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1733		}
1734		return op;
1735	}
1736	if (op->params.type == DISPATCH_IO_STREAM) {
1737		// Stream operations need to be serialized so continue the current
1738		// operation until it is finished
1739		return op;
1740	}
1741	// Get the next random operation (round-robin)
1742	if (op->params.type == DISPATCH_IO_RANDOM) {
1743		op = TAILQ_NEXT(op, operation_list);
1744		if (!op) {
1745			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1746		}
1747		return op;
1748	}
1749	return NULL;
1750}
1751
1752static dispatch_operation_t
1753_dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1754{
1755	// On pick queue
1756	dispatch_operation_t op;
1757	if (!TAILQ_EMPTY(&disk->operations)) {
1758		if (disk->cur_rq == NULL) {
1759			op = TAILQ_FIRST(&disk->operations);
1760		} else {
1761			op = disk->cur_rq;
1762			do {
1763				op = TAILQ_NEXT(op, operation_list);
1764				if (!op) {
1765					op = TAILQ_FIRST(&disk->operations);
1766				}
1767				// TODO: more involved picking algorithm rdar://problem/8780312
1768			} while (op->active && op != disk->cur_rq);
1769		}
1770		if (!op->active) {
1771			disk->cur_rq = op;
1772			return op;
1773		}
1774	}
1775	return NULL;
1776}
1777
1778static void
1779_dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1780		dispatch_io_t channel)
1781{
1782	// On stream queue
1783	dispatch_operation_t op, tmp;
1784	typeof(*stream->operations) *operations;
1785	operations = &stream->operations[DISPATCH_IO_RANDOM];
1786	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1787		if (!channel || op->channel == channel) {
1788			_dispatch_stream_complete_operation(stream, op);
1789		}
1790	}
1791	operations = &stream->operations[DISPATCH_IO_STREAM];
1792	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1793		if (!channel || op->channel == channel) {
1794			_dispatch_stream_complete_operation(stream, op);
1795		}
1796	}
1797	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1798		dispatch_suspend(stream->source);
1799		stream->source_running = false;
1800	}
1801}
1802
1803static void
1804_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1805{
1806	// On pick queue
1807	dispatch_operation_t op, tmp;
1808	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1809		if (!channel || op->channel == channel) {
1810			_dispatch_disk_complete_operation(disk, op);
1811		}
1812	}
1813}
1814
1815#pragma mark -
1816#pragma mark dispatch_stream_handler/dispatch_disk_handler
1817
1818static dispatch_source_t
1819_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1820{
1821	// On stream queue
1822	if (stream->source) {
1823		return stream->source;
1824	}
1825	dispatch_fd_t fd = op->fd_entry->fd;
1826	_dispatch_fd_debug("stream source create", fd);
1827	dispatch_source_t source = NULL;
1828	if (op->direction == DOP_DIR_READ) {
1829		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1830				(uintptr_t)fd, 0, stream->dq);
1831	} else if (op->direction == DOP_DIR_WRITE) {
1832		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1833				(uintptr_t)fd, 0, stream->dq);
1834	} else {
1835		dispatch_assert(op->direction < DOP_DIR_MAX);
1836		return NULL;
1837	}
1838	dispatch_set_context(source, stream);
1839	dispatch_source_set_event_handler_f(source,
1840			_dispatch_stream_source_handler);
1841	// Close queue must not run user cleanup handlers until sources are fully
1842	// unregistered
1843	dispatch_queue_t close_queue = op->fd_entry->close_queue;
1844	dispatch_source_set_cancel_handler(source, ^{
1845		_dispatch_fd_debug("stream source cancel", fd);
1846		dispatch_resume(close_queue);
1847	});
1848	stream->source = source;
1849	return stream->source;
1850}
1851
1852static void
1853_dispatch_stream_source_handler(void *ctx)
1854{
1855	// On stream queue
1856	dispatch_stream_t stream = (dispatch_stream_t)ctx;
1857	dispatch_suspend(stream->source);
1858	stream->source_running = false;
1859	return _dispatch_stream_handler(stream);
1860}
1861
1862static void
1863_dispatch_stream_queue_handler(void *ctx)
1864{
1865	// On stream queue
1866	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1867	if (!stream) {
1868		// _dispatch_stream_dispose has been called
1869		return;
1870	}
1871	return _dispatch_stream_handler(stream);
1872}
1873
1874static void
1875_dispatch_stream_handler(void *ctx)
1876{
1877	// On stream queue
1878	dispatch_stream_t stream = (dispatch_stream_t)ctx;
1879	dispatch_operation_t op;
1880pick:
1881	op = _dispatch_stream_pick_next_operation(stream, stream->op);
1882	if (!op) {
1883		_dispatch_debug("no operation found: stream %p", stream);
1884		return;
1885	}
1886	int err = _dispatch_io_get_error(op, NULL, true);
1887	if (err) {
1888		op->err = err;
1889		_dispatch_stream_complete_operation(stream, op);
1890		goto pick;
1891	}
1892	stream->op = op;
1893	_dispatch_fd_debug("stream handler", op->fd_entry->fd);
1894	dispatch_fd_entry_t fd_entry = op->fd_entry;
1895	_dispatch_fd_entry_retain(fd_entry);
1896	// For performance analysis
1897	if (!op->total && dispatch_io_defaults.initial_delivery) {
1898		// Empty delivery to signal the start of the operation
1899		_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1900		_dispatch_operation_deliver_data(op, DOP_DELIVER);
1901	}
1902	// TODO: perform on the operation target queue to get correct priority
1903	int result = _dispatch_operation_perform(op);
1904	dispatch_op_flags_t flags = ~0u;
1905	switch (result) {
1906	case DISPATCH_OP_DELIVER:
1907		flags = DOP_DEFAULT;
1908		// Fall through
1909	case DISPATCH_OP_DELIVER_AND_COMPLETE:
1910		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1911				DOP_DEFAULT;
1912		_dispatch_operation_deliver_data(op, flags);
1913		// Fall through
1914	case DISPATCH_OP_COMPLETE:
1915		if (flags != DOP_DEFAULT) {
1916			_dispatch_stream_complete_operation(stream, op);
1917		}
1918		if (_dispatch_stream_operation_avail(stream)) {
1919			dispatch_async_f(stream->dq, stream->dq,
1920					_dispatch_stream_queue_handler);
1921		}
1922		break;
1923	case DISPATCH_OP_COMPLETE_RESUME:
1924		_dispatch_stream_complete_operation(stream, op);
1925		// Fall through
1926	case DISPATCH_OP_RESUME:
1927		if (_dispatch_stream_operation_avail(stream)) {
1928			stream->source_running = true;
1929			dispatch_resume(_dispatch_stream_source(stream, op));
1930		}
1931		break;
1932	case DISPATCH_OP_ERR:
1933		_dispatch_stream_cleanup_operations(stream, op->channel);
1934		break;
1935	case DISPATCH_OP_FD_ERR:
1936		_dispatch_fd_entry_retain(fd_entry);
1937		dispatch_async(fd_entry->barrier_queue, ^{
1938			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1939			_dispatch_fd_entry_release(fd_entry);
1940		});
1941		break;
1942	default:
1943		break;
1944	}
1945	_dispatch_fd_entry_release(fd_entry);
1946	return;
1947}
1948
1949static void
1950_dispatch_disk_handler(void *ctx)
1951{
1952	// On pick queue
1953	dispatch_disk_t disk = (dispatch_disk_t)ctx;
1954	if (disk->io_active) {
1955		return;
1956	}
1957	_dispatch_fd_debug("disk handler", -1);
1958	dispatch_operation_t op;
1959	size_t i = disk->free_idx, j = disk->req_idx;
1960	if (j <= i) {
1961		j += disk->advise_list_depth;
1962	}
1963	while (i <= j) {
1964		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1965				(op = _dispatch_disk_pick_next_operation(disk))) {
1966			int err = _dispatch_io_get_error(op, NULL, true);
1967			if (err) {
1968				op->err = err;
1969				_dispatch_disk_complete_operation(disk, op);
1970				continue;
1971			}
1972			_dispatch_retain(op);
1973			disk->advise_list[i%disk->advise_list_depth] = op;
1974			op->active = true;
1975			_dispatch_object_debug(op, "%s", __func__);
1976		} else {
1977			// No more operations to get
1978			break;
1979		}
1980		i++;
1981	}
1982	disk->free_idx = (i%disk->advise_list_depth);
1983	op = disk->advise_list[disk->req_idx];
1984	if (op) {
1985		disk->io_active = true;
1986		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1987	}
1988}
1989
1990static void
1991_dispatch_disk_perform(void *ctxt)
1992{
1993	dispatch_disk_t disk = ctxt;
1994	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1995	_dispatch_fd_debug("disk perform", -1);
1996	dispatch_operation_t op;
1997	size_t i = disk->advise_idx, j = disk->free_idx;
1998	if (j <= i) {
1999		j += disk->advise_list_depth;
2000	}
2001	do {
2002		op = disk->advise_list[i%disk->advise_list_depth];
2003		if (!op) {
2004			// Nothing more to advise, must be at free_idx
2005			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2006			break;
2007		}
2008		if (op->direction == DOP_DIR_WRITE) {
2009			// TODO: preallocate writes ? rdar://problem/9032172
2010			continue;
2011		}
2012		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2013				op->channel)) {
2014			continue;
2015		}
2016		// For performance analysis
2017		if (!op->total && dispatch_io_defaults.initial_delivery) {
2018			// Empty delivery to signal the start of the operation
2019			_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2020			_dispatch_operation_deliver_data(op, DOP_DELIVER);
2021		}
2022		// Advise two chunks if the list only has one element and this is the
2023		// first advise on the operation
2024		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2025				!op->advise_offset) {
2026			chunk_size *= 2;
2027		}
2028		_dispatch_operation_advise(op, chunk_size);
2029	} while (++i < j);
2030	disk->advise_idx = i%disk->advise_list_depth;
2031	op = disk->advise_list[disk->req_idx];
2032	int result = _dispatch_operation_perform(op);
2033	disk->advise_list[disk->req_idx] = NULL;
2034	disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
2035	dispatch_async(disk->pick_queue, ^{
2036		switch (result) {
2037		case DISPATCH_OP_DELIVER:
2038			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
2039			break;
2040		case DISPATCH_OP_COMPLETE:
2041			_dispatch_disk_complete_operation(disk, op);
2042			break;
2043		case DISPATCH_OP_DELIVER_AND_COMPLETE:
2044			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2045			_dispatch_disk_complete_operation(disk, op);
2046			break;
2047		case DISPATCH_OP_ERR:
2048			_dispatch_disk_cleanup_operations(disk, op->channel);
2049			break;
2050		case DISPATCH_OP_FD_ERR:
2051			_dispatch_disk_cleanup_operations(disk, NULL);
2052			break;
2053		default:
2054			dispatch_assert(result);
2055			break;
2056		}
2057		op->active = false;
2058		disk->io_active = false;
2059		_dispatch_disk_handler(disk);
2060		// Balancing the retain in _dispatch_disk_handler. Note that op must be
2061		// released at the very end, since it might hold the last reference to
2062		// the disk
2063		_dispatch_release(op);
2064	});
2065}
2066
2067#pragma mark -
2068#pragma mark dispatch_operation_perform
2069
2070static void
2071_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2072{
2073	int err;
2074	struct radvisory advise;
2075	// No point in issuing a read advise for the next chunk if we are already
2076	// a chunk ahead from reading the bytes
2077	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2078			chunk_size + PAGE_SIZE)) {
2079		return;
2080	}
2081	_dispatch_object_debug(op, "%s", __func__);
2082	advise.ra_count = (int)chunk_size;
2083	if (!op->advise_offset) {
2084		op->advise_offset = op->offset;
2085		// If this is the first time through, align the advised range to a
2086		// page boundary
2087		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2088		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2089	}
2090	advise.ra_offset = op->advise_offset;
2091	op->advise_offset += advise.ra_count;
2092	_dispatch_io_syscall_switch(err,
2093		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2094		case EFBIG: break; // advised past the end of the file rdar://10415691
2095		case ENOTSUP: break; // not all FS support radvise rdar://13484629
2096		// TODO: set disk status on error
2097		default: (void)dispatch_assume_zero(err); break;
2098	);
2099}
2100
2101static int
2102_dispatch_operation_perform(dispatch_operation_t op)
2103{
2104	int err = _dispatch_io_get_error(op, NULL, true);
2105	if (err) {
2106		goto error;
2107	}
2108	_dispatch_object_debug(op, "%s", __func__);
2109	if (!op->buf) {
2110		size_t max_buf_siz = op->params.high;
2111		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
2112		if (op->direction == DOP_DIR_READ) {
2113			// If necessary, create a buffer for the ongoing operation, large
2114			// enough to fit chunk_pages but at most high-water
2115			size_t data_siz = dispatch_data_get_size(op->data);
2116			if (data_siz) {
2117				dispatch_assert(data_siz < max_buf_siz);
2118				max_buf_siz -= data_siz;
2119			}
2120			if (max_buf_siz > chunk_siz) {
2121				max_buf_siz = chunk_siz;
2122			}
2123			if (op->length < SIZE_MAX) {
2124				op->buf_siz = op->length - op->total;
2125				if (op->buf_siz > max_buf_siz) {
2126					op->buf_siz = max_buf_siz;
2127				}
2128			} else {
2129				op->buf_siz = max_buf_siz;
2130			}
2131			op->buf = valloc(op->buf_siz);
2132			_dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2133		} else if (op->direction == DOP_DIR_WRITE) {
2134			// Always write the first data piece, if that is smaller than a
2135			// chunk, accumulate further data pieces until chunk size is reached
2136			if (chunk_siz > max_buf_siz) {
2137				chunk_siz = max_buf_siz;
2138			}
2139			op->buf_siz = 0;
2140			dispatch_data_apply(op->data,
2141					^(dispatch_data_t region DISPATCH_UNUSED,
2142					size_t offset DISPATCH_UNUSED,
2143					const void* buf DISPATCH_UNUSED, size_t len) {
2144				size_t siz = op->buf_siz + len;
2145				if (!op->buf_siz || siz <= chunk_siz) {
2146					op->buf_siz = siz;
2147				}
2148				return (bool)(siz < chunk_siz);
2149			});
2150			if (op->buf_siz > max_buf_siz) {
2151				op->buf_siz = max_buf_siz;
2152			}
2153			dispatch_data_t d;
2154			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2155			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2156					NULL);
2157			_dispatch_io_data_release(d);
2158			_dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2159		}
2160	}
2161	if (op->fd_entry->fd == -1) {
2162		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2163		if (err) {
2164			goto error;
2165		}
2166	}
2167	void *buf = op->buf + op->buf_len;
2168	size_t len = op->buf_siz - op->buf_len;
2169	off_t off = (off_t)((size_t)op->offset + op->total);
2170	ssize_t processed = -1;
2171syscall:
2172	if (op->direction == DOP_DIR_READ) {
2173		if (op->params.type == DISPATCH_IO_STREAM) {
2174			processed = read(op->fd_entry->fd, buf, len);
2175		} else if (op->params.type == DISPATCH_IO_RANDOM) {
2176			processed = pread(op->fd_entry->fd, buf, len, off);
2177		}
2178	} else if (op->direction == DOP_DIR_WRITE) {
2179		if (op->params.type == DISPATCH_IO_STREAM) {
2180			processed = write(op->fd_entry->fd, buf, len);
2181		} else if (op->params.type == DISPATCH_IO_RANDOM) {
2182			processed = pwrite(op->fd_entry->fd, buf, len, off);
2183		}
2184	}
2185	// Encountered an error on the file descriptor
2186	if (processed == -1) {
2187		err = errno;
2188		if (err == EINTR) {
2189			goto syscall;
2190		}
2191		goto error;
2192	}
2193	// EOF is indicated by two handler invocations
2194	if (processed == 0) {
2195		_dispatch_fd_debug("EOF", op->fd_entry->fd);
2196		return DISPATCH_OP_DELIVER_AND_COMPLETE;
2197	}
2198	op->buf_len += (size_t)processed;
2199	op->total += (size_t)processed;
2200	if (op->total == op->length) {
2201		// Finished processing all the bytes requested by the operation
2202		return DISPATCH_OP_COMPLETE;
2203	} else {
2204		// Deliver data only if we satisfy the filters
2205		return DISPATCH_OP_DELIVER;
2206	}
2207error:
2208	if (err == EAGAIN) {
2209		// For disk based files with blocking I/O we should never get EAGAIN
2210		dispatch_assert(!op->fd_entry->disk);
2211		_dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2212		if (op->direction == DOP_DIR_READ && op->total &&
2213				op->channel == op->fd_entry->convenience_channel) {
2214			// Convenience read with available data completes on EAGAIN
2215			return DISPATCH_OP_COMPLETE_RESUME;
2216		}
2217		return DISPATCH_OP_RESUME;
2218	}
2219	op->err = err;
2220	switch (err) {
2221	case ECANCELED:
2222		return DISPATCH_OP_ERR;
2223	case EBADF:
2224		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2225		return DISPATCH_OP_FD_ERR;
2226	default:
2227		return DISPATCH_OP_COMPLETE;
2228	}
2229}
2230
2231static void
2232_dispatch_operation_deliver_data(dispatch_operation_t op,
2233		dispatch_op_flags_t flags)
2234{
2235	// Either called from stream resp. pick queue or when op is finalized
2236	dispatch_data_t data = NULL;
2237	int err = 0;
2238	size_t undelivered = op->undelivered + op->buf_len;
2239	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2240			(op->flags & DOP_DELIVER);
2241	op->flags = DOP_DEFAULT;
2242	if (!deliver) {
2243		// Don't deliver data until low water mark has been reached
2244		if (undelivered >= op->params.low) {
2245			deliver = true;
2246		} else if (op->buf_len < op->buf_siz) {
2247			// Request buffer is not yet used up
2248			_dispatch_fd_debug("buffer data", op->fd_entry->fd);
2249			return;
2250		}
2251	} else {
2252		err = op->err;
2253		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2254			err = ECANCELED;
2255			op->err = err;
2256		}
2257	}
2258	// Deliver data or buffer used up
2259	if (op->direction == DOP_DIR_READ) {
2260		if (op->buf_len) {
2261			void *buf = op->buf;
2262			data = dispatch_data_create(buf, op->buf_len, NULL,
2263					DISPATCH_DATA_DESTRUCTOR_FREE);
2264			op->buf = NULL;
2265			op->buf_len = 0;
2266			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2267			_dispatch_io_data_release(op->data);
2268			_dispatch_io_data_release(data);
2269			data = d;
2270		} else {
2271			data = op->data;
2272		}
2273		op->data = deliver ? dispatch_data_empty : data;
2274	} else if (op->direction == DOP_DIR_WRITE) {
2275		if (deliver) {
2276			data = dispatch_data_create_subrange(op->data, op->buf_len,
2277					op->length);
2278		}
2279		if (op->buf_data && op->buf_len == op->buf_siz) {
2280			_dispatch_io_data_release(op->buf_data);
2281			op->buf_data = NULL;
2282			op->buf = NULL;
2283			op->buf_len = 0;
2284			// Trim newly written buffer from head of unwritten data
2285			dispatch_data_t d;
2286			if (deliver) {
2287				_dispatch_io_data_retain(data);
2288				d = data;
2289			} else {
2290				d = dispatch_data_create_subrange(op->data, op->buf_siz,
2291						op->length);
2292			}
2293			_dispatch_io_data_release(op->data);
2294			op->data = d;
2295		}
2296	} else {
2297		dispatch_assert(op->direction < DOP_DIR_MAX);
2298		return;
2299	}
2300	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2301		op->undelivered = undelivered;
2302		_dispatch_fd_debug("buffer data", op->fd_entry->fd);
2303		return;
2304	}
2305	op->undelivered = 0;
2306	_dispatch_object_debug(op, "%s", __func__);
2307	_dispatch_fd_debug("deliver data", op->fd_entry->fd);
2308	dispatch_op_direction_t direction = op->direction;
2309	dispatch_io_handler_t handler = op->handler;
2310#if DISPATCH_IO_DEBUG
2311	int fd = op->fd_entry->fd;
2312#endif
2313	dispatch_fd_entry_t fd_entry = op->fd_entry;
2314	_dispatch_fd_entry_retain(fd_entry);
2315	dispatch_io_t channel = op->channel;
2316	_dispatch_retain(channel);
2317	// Note that data delivery may occur after the operation is freed
2318	dispatch_async(op->op_q, ^{
2319		bool done = (flags & DOP_DONE);
2320		dispatch_data_t d = data;
2321		if (done) {
2322			if (direction == DOP_DIR_READ && err) {
2323				if (dispatch_data_get_size(d)) {
2324					_dispatch_fd_debug("IO handler invoke", fd);
2325					handler(false, d, 0);
2326				}
2327				d = NULL;
2328			} else if (direction == DOP_DIR_WRITE && !err) {
2329				d = NULL;
2330			}
2331		}
2332		_dispatch_fd_debug("IO handler invoke", fd);
2333		handler(done, d, err);
2334		_dispatch_release(channel);
2335		_dispatch_fd_entry_release(fd_entry);
2336		_dispatch_io_data_release(data);
2337	});
2338}
2339
2340#pragma mark -
2341#pragma mark dispatch_io_debug
2342
2343static size_t
2344_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2345{
2346	dispatch_queue_t target = channel->do_targetq;
2347	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2348			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2349			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ",
2350			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2351			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2352			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2353			channel->fd_entry, channel->queue, target && target->dq_label ?
2354			target->dq_label : "", target, channel->barrier_queue,
2355			channel->barrier_group, channel->err, channel->params.low,
2356			channel->params.high, channel->params.interval_flags &
2357			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2358			channel->params.interval);
2359}
2360
2361size_t
2362_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2363{
2364	size_t offset = 0;
2365	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2366			dx_kind(channel), channel);
2367	offset += _dispatch_object_debug_attr(channel, &buf[offset],
2368			bufsiz - offset);
2369	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2370	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2371	return offset;
2372}
2373
2374static size_t
2375_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2376		size_t bufsiz)
2377{
2378	dispatch_queue_t target = op->do_targetq;
2379	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2380	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2381			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2382			"offset = %lld, length = %zu, done = %zu, undelivered = %zu, "
2383			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2384			"interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ?
2385			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2386			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2387			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2388			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2389			target->dq_label : "", target, op->offset, op->length, op->total,
2390			op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2391			op->params.high, op->params.interval_flags &
2392			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2393}
2394
2395size_t
2396_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2397{
2398	size_t offset = 0;
2399	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2400			dx_kind(op), op);
2401	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2402	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2403	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2404	return offset;
2405}
2406