1/* 2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved. 3 * 4 * @APPLE_APACHE_LICENSE_HEADER_START@ 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * @APPLE_APACHE_LICENSE_HEADER_END@ 19 */ 20 21#include "internal.h" 22 23#ifndef DISPATCH_IO_DEBUG 24#define DISPATCH_IO_DEBUG DISPATCH_DEBUG 25#endif 26 27#if DISPATCH_IO_DEBUG 28#define _dispatch_fd_debug(msg, fd, args...) \ 29 _dispatch_debug("fd[0x%x]: " msg, (fd), ##args) 30#else 31#define _dispatch_fd_debug(msg, fd, args...) 32#endif 33 34#if USE_OBJC 35#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x) 36#define _dispatch_io_data_release(x) _dispatch_objc_release(x) 37#else 38#define _dispatch_io_data_retain(x) dispatch_retain(x) 39#define _dispatch_io_data_release(x) dispatch_release(x) 40#endif 41 42typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry); 43 44DISPATCH_EXPORT DISPATCH_NOTHROW 45void _dispatch_iocntl(uint32_t param, uint64_t value); 46 47static dispatch_operation_t _dispatch_operation_create( 48 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset, 49 size_t length, dispatch_data_t data, dispatch_queue_t queue, 50 dispatch_io_handler_t handler); 51static void _dispatch_operation_enqueue(dispatch_operation_t op, 52 dispatch_op_direction_t direction, dispatch_data_t data); 53static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq, 54 dispatch_operation_t op); 55static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry); 56static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry); 57static void _dispatch_fd_entry_init_async(dispatch_fd_t fd, 58 dispatch_fd_entry_init_callback_t completion_callback); 59static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, 60 uintptr_t hash); 61static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path( 62 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode); 63static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, 64 dispatch_io_t channel); 65static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, 66 dispatch_io_t channel); 67static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry, 68 dispatch_queue_t tq); 69static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, 70 dispatch_op_direction_t direction); 71static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev); 72static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream, 73 dispatch_operation_t operation, dispatch_data_t data); 74static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk, 75 dispatch_operation_t operation, dispatch_data_t data); 76static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream, 77 dispatch_io_t channel); 78static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk, 79 dispatch_io_t channel); 80static void _dispatch_stream_source_handler(void *ctx); 81static void _dispatch_stream_queue_handler(void *ctx); 82static void _dispatch_stream_handler(void *ctx); 83static void _dispatch_disk_handler(void *ctx); 84static void _dispatch_disk_perform(void *ctxt); 85static void _dispatch_operation_advise(dispatch_operation_t op, 86 size_t chunk_size); 87static int _dispatch_operation_perform(dispatch_operation_t op); 88static void _dispatch_operation_deliver_data(dispatch_operation_t op, 89 dispatch_op_flags_t flags); 90 91// Macros to wrap syscalls which return -1 on error, and retry on EINTR 92#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \ 93 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \ 94 case EINTR: continue; \ 95 __VA_ARGS__ \ 96 } \ 97 break; \ 98 } while (1) 99#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \ 100 _dispatch_io_syscall_switch_noerr(__err, __syscall, \ 101 case 0: break; \ 102 __VA_ARGS__ \ 103 ); \ 104 } while (0) 105#define _dispatch_io_syscall(__syscall) do { int __err; \ 106 _dispatch_io_syscall_switch(__err, __syscall); \ 107 } while (0) 108 109enum { 110 DISPATCH_OP_COMPLETE = 1, 111 DISPATCH_OP_DELIVER, 112 DISPATCH_OP_DELIVER_AND_COMPLETE, 113 DISPATCH_OP_COMPLETE_RESUME, 114 DISPATCH_OP_RESUME, 115 DISPATCH_OP_ERR, 116 DISPATCH_OP_FD_ERR, 117}; 118 119#define _dispatch_io_Block_copy(x) \ 120 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x))) 121 122#pragma mark - 123#pragma mark dispatch_io_hashtables 124 125#if TARGET_OS_EMBEDDED 126#define DIO_HASH_SIZE 64u // must be a power of two 127#else 128#define DIO_HASH_SIZE 256u // must be a power of two 129#endif 130#define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1)) 131 132// Global hashtable of dev_t -> disk_s mappings 133DISPATCH_CACHELINE_ALIGN 134static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE]; 135// Global hashtable of fd -> fd_entry_s mappings 136DISPATCH_CACHELINE_ALIGN 137static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE]; 138 139static dispatch_once_t _dispatch_io_devs_lockq_pred; 140static dispatch_queue_t _dispatch_io_devs_lockq; 141static dispatch_queue_t _dispatch_io_fds_lockq; 142 143static void 144_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED) 145{ 146 _dispatch_io_fds_lockq = dispatch_queue_create( 147 "com.apple.libdispatch-io.fd_lockq", NULL); 148 unsigned int i; 149 for (i = 0; i < DIO_HASH_SIZE; i++) { 150 TAILQ_INIT(&_dispatch_io_fds[i]); 151 } 152} 153 154static void 155_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED) 156{ 157 _dispatch_io_devs_lockq = dispatch_queue_create( 158 "com.apple.libdispatch-io.dev_lockq", NULL); 159 unsigned int i; 160 for (i = 0; i < DIO_HASH_SIZE; i++) { 161 TAILQ_INIT(&_dispatch_io_devs[i]); 162 } 163} 164 165#pragma mark - 166#pragma mark dispatch_io_defaults 167 168enum { 169 DISPATCH_IOCNTL_CHUNK_PAGES = 1, 170 DISPATCH_IOCNTL_LOW_WATER_CHUNKS, 171 DISPATCH_IOCNTL_INITIAL_DELIVERY, 172 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS, 173}; 174 175static struct dispatch_io_defaults_s { 176 size_t chunk_pages, low_water_chunks, max_pending_io_reqs; 177 bool initial_delivery; 178} dispatch_io_defaults = { 179 .chunk_pages = DIO_MAX_CHUNK_PAGES, 180 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS, 181 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS, 182}; 183 184#define _dispatch_iocntl_set_default(p, v) do { \ 185 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \ 186 } while (0) 187 188void 189_dispatch_iocntl(uint32_t param, uint64_t value) 190{ 191 switch (param) { 192 case DISPATCH_IOCNTL_CHUNK_PAGES: 193 _dispatch_iocntl_set_default(chunk_pages, value); 194 break; 195 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS: 196 _dispatch_iocntl_set_default(low_water_chunks, value); 197 break; 198 case DISPATCH_IOCNTL_INITIAL_DELIVERY: 199 _dispatch_iocntl_set_default(initial_delivery, value); 200 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS: 201 _dispatch_iocntl_set_default(max_pending_io_reqs, value); 202 break; 203 } 204} 205 206#pragma mark - 207#pragma mark dispatch_io_t 208 209static dispatch_io_t 210_dispatch_io_create(dispatch_io_type_t type) 211{ 212 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io), 213 sizeof(struct dispatch_io_s)); 214 channel->do_next = DISPATCH_OBJECT_LISTLESS; 215 channel->do_targetq = _dispatch_get_root_queue(0, true); 216 channel->params.type = type; 217 channel->params.high = SIZE_MAX; 218 channel->params.low = dispatch_io_defaults.low_water_chunks * 219 dispatch_io_defaults.chunk_pages * PAGE_SIZE; 220 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq", 221 NULL); 222 return channel; 223} 224 225static void 226_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry, 227 dispatch_queue_t queue, int err, void (^cleanup_handler)(int)) 228{ 229 // Enqueue the cleanup handler on the suspended close queue 230 if (cleanup_handler) { 231 _dispatch_retain(queue); 232 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{ 233 dispatch_async(queue, ^{ 234 _dispatch_fd_debug("cleanup handler invoke", -1); 235 cleanup_handler(err); 236 }); 237 _dispatch_release(queue); 238 }); 239 } 240 if (fd_entry) { 241 channel->fd_entry = fd_entry; 242 dispatch_retain(fd_entry->barrier_queue); 243 dispatch_retain(fd_entry->barrier_group); 244 channel->barrier_queue = fd_entry->barrier_queue; 245 channel->barrier_group = fd_entry->barrier_group; 246 } else { 247 // Still need to create a barrier queue, since all operations go 248 // through it 249 channel->barrier_queue = dispatch_queue_create( 250 "com.apple.libdispatch-io.barrierq", NULL); 251 channel->barrier_group = dispatch_group_create(); 252 } 253} 254 255void 256_dispatch_io_dispose(dispatch_io_t channel) 257{ 258 _dispatch_object_debug(channel, "%s", __func__); 259 if (channel->fd_entry && 260 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) { 261 if (channel->fd_entry->path_data) { 262 // This modification is safe since path_data->channel is checked 263 // only on close_queue (which is still suspended at this point) 264 channel->fd_entry->path_data->channel = NULL; 265 } 266 // Cleanup handlers will only run when all channels related to this 267 // fd are complete 268 _dispatch_fd_entry_release(channel->fd_entry); 269 } 270 if (channel->queue) { 271 dispatch_release(channel->queue); 272 } 273 if (channel->barrier_queue) { 274 dispatch_release(channel->barrier_queue); 275 } 276 if (channel->barrier_group) { 277 dispatch_release(channel->barrier_group); 278 } 279} 280 281static int 282_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode) 283{ 284 int err = 0; 285 if (S_ISDIR(mode)) { 286 err = EISDIR; 287 } else if (channel->params.type == DISPATCH_IO_RANDOM && 288 (S_ISFIFO(mode) || S_ISSOCK(mode))) { 289 err = ESPIPE; 290 } 291 return err; 292} 293 294static int 295_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel, 296 bool ignore_closed) 297{ 298 // On _any_ queue 299 int err; 300 if (op) { 301 channel = op->channel; 302 } 303 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { 304 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) { 305 err = ECANCELED; 306 } else { 307 err = 0; 308 } 309 } else { 310 err = op ? op->fd_entry->err : channel->err; 311 } 312 return err; 313} 314 315#pragma mark - 316#pragma mark dispatch_io_channels 317 318dispatch_io_t 319dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd, 320 dispatch_queue_t queue, void (^cleanup_handler)(int)) 321{ 322 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { 323 return NULL; 324 } 325 _dispatch_fd_debug("io create", fd); 326 dispatch_io_t channel = _dispatch_io_create(type); 327 channel->fd = fd; 328 channel->fd_actual = fd; 329 dispatch_suspend(channel->queue); 330 _dispatch_retain(queue); 331 _dispatch_retain(channel); 332 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { 333 // On barrier queue 334 int err = fd_entry->err; 335 if (!err) { 336 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode); 337 } 338 if (!err && type == DISPATCH_IO_RANDOM) { 339 off_t f_ptr; 340 _dispatch_io_syscall_switch_noerr(err, 341 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR), 342 case 0: channel->f_ptr = f_ptr; break; 343 default: (void)dispatch_assume_zero(err); break; 344 ); 345 } 346 channel->err = err; 347 _dispatch_fd_entry_retain(fd_entry); 348 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler); 349 dispatch_resume(channel->queue); 350 _dispatch_object_debug(channel, "%s", __func__); 351 _dispatch_release(channel); 352 _dispatch_release(queue); 353 }); 354 _dispatch_object_debug(channel, "%s", __func__); 355 return channel; 356} 357 358dispatch_io_t 359dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd, 360 dispatch_queue_t queue, void *context, 361 void (*cleanup_handler)(void *context, int error)) 362{ 363 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL : 364 ^(int error){ cleanup_handler(context, error); }); 365} 366 367dispatch_io_t 368dispatch_io_create_with_path(dispatch_io_type_t type, const char *path, 369 int oflag, mode_t mode, dispatch_queue_t queue, 370 void (^cleanup_handler)(int error)) 371{ 372 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) || 373 !(path && *path == '/')) { 374 return NULL; 375 } 376 size_t pathlen = strlen(path); 377 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1); 378 if (!path_data) { 379 return NULL; 380 } 381 _dispatch_fd_debug("io create with path %s", -1, path); 382 dispatch_io_t channel = _dispatch_io_create(type); 383 channel->fd = -1; 384 channel->fd_actual = -1; 385 path_data->channel = channel; 386 path_data->oflag = oflag; 387 path_data->mode = mode; 388 path_data->pathlen = pathlen; 389 memcpy(path_data->path, path, pathlen + 1); 390 _dispatch_retain(queue); 391 _dispatch_retain(channel); 392 dispatch_async(channel->queue, ^{ 393 int err = 0; 394 struct stat st; 395 _dispatch_io_syscall_switch_noerr(err, 396 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW || 397 (path_data->oflag & O_SYMLINK) == O_SYMLINK ? 398 lstat(path_data->path, &st) : stat(path_data->path, &st), 399 case 0: 400 err = _dispatch_io_validate_type(channel, st.st_mode); 401 break; 402 default: 403 if ((path_data->oflag & O_CREAT) && 404 (*(path_data->path + path_data->pathlen - 1) != '/')) { 405 // Check parent directory 406 char *c = strrchr(path_data->path, '/'); 407 dispatch_assert(c); 408 *c = 0; 409 int perr; 410 _dispatch_io_syscall_switch_noerr(perr, 411 stat(path_data->path, &st), 412 case 0: 413 // Since the parent directory exists, open() will 414 // create a regular file after the fd_entry has 415 // been filled in 416 st.st_mode = S_IFREG; 417 err = 0; 418 break; 419 ); 420 *c = '/'; 421 } 422 break; 423 ); 424 channel->err = err; 425 if (err) { 426 free(path_data); 427 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); 428 _dispatch_release(channel); 429 _dispatch_release(queue); 430 return; 431 } 432 dispatch_suspend(channel->queue); 433 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, 434 _dispatch_io_devs_lockq_init); 435 dispatch_async(_dispatch_io_devs_lockq, ^{ 436 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path( 437 path_data, st.st_dev, st.st_mode); 438 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); 439 dispatch_resume(channel->queue); 440 _dispatch_object_debug(channel, "%s", __func__); 441 _dispatch_release(channel); 442 _dispatch_release(queue); 443 }); 444 }); 445 _dispatch_object_debug(channel, "%s", __func__); 446 return channel; 447} 448 449dispatch_io_t 450dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path, 451 int oflag, mode_t mode, dispatch_queue_t queue, void *context, 452 void (*cleanup_handler)(void *context, int error)) 453{ 454 return dispatch_io_create_with_path(type, path, oflag, mode, queue, 455 !cleanup_handler ? NULL : 456 ^(int error){ cleanup_handler(context, error); }); 457} 458 459dispatch_io_t 460dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel, 461 dispatch_queue_t queue, void (^cleanup_handler)(int error)) 462{ 463 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { 464 return NULL; 465 } 466 _dispatch_fd_debug("io create with io %p", -1, in_channel); 467 dispatch_io_t channel = _dispatch_io_create(type); 468 dispatch_suspend(channel->queue); 469 _dispatch_retain(queue); 470 _dispatch_retain(channel); 471 _dispatch_retain(in_channel); 472 dispatch_async(in_channel->queue, ^{ 473 int err0 = _dispatch_io_get_error(NULL, in_channel, false); 474 if (err0) { 475 channel->err = err0; 476 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler); 477 dispatch_resume(channel->queue); 478 _dispatch_release(channel); 479 _dispatch_release(in_channel); 480 _dispatch_release(queue); 481 return; 482 } 483 dispatch_async(in_channel->barrier_queue, ^{ 484 int err = _dispatch_io_get_error(NULL, in_channel, false); 485 // If there is no error, the fd_entry for the in_channel is valid. 486 // Since we are running on in_channel's queue, the fd_entry has been 487 // fully resolved and will stay valid for the duration of this block 488 if (!err) { 489 err = in_channel->err; 490 if (!err) { 491 err = in_channel->fd_entry->err; 492 } 493 } 494 if (!err) { 495 err = _dispatch_io_validate_type(channel, 496 in_channel->fd_entry->stat.mode); 497 } 498 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) { 499 off_t f_ptr; 500 _dispatch_io_syscall_switch_noerr(err, 501 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR), 502 case 0: channel->f_ptr = f_ptr; break; 503 default: (void)dispatch_assume_zero(err); break; 504 ); 505 } 506 channel->err = err; 507 if (err) { 508 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); 509 dispatch_resume(channel->queue); 510 _dispatch_release(channel); 511 _dispatch_release(in_channel); 512 _dispatch_release(queue); 513 return; 514 } 515 if (in_channel->fd == -1) { 516 // in_channel was created from path 517 channel->fd = -1; 518 channel->fd_actual = -1; 519 mode_t mode = in_channel->fd_entry->stat.mode; 520 dev_t dev = in_channel->fd_entry->stat.dev; 521 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) + 522 in_channel->fd_entry->path_data->pathlen + 1; 523 dispatch_io_path_data_t path_data = malloc(path_data_len); 524 memcpy(path_data, in_channel->fd_entry->path_data, 525 path_data_len); 526 path_data->channel = channel; 527 // lockq_io_devs is known to already exist 528 dispatch_async(_dispatch_io_devs_lockq, ^{ 529 dispatch_fd_entry_t fd_entry; 530 fd_entry = _dispatch_fd_entry_create_with_path(path_data, 531 dev, mode); 532 _dispatch_io_init(channel, fd_entry, queue, 0, 533 cleanup_handler); 534 dispatch_resume(channel->queue); 535 _dispatch_release(channel); 536 _dispatch_release(queue); 537 }); 538 } else { 539 dispatch_fd_entry_t fd_entry = in_channel->fd_entry; 540 channel->fd = in_channel->fd; 541 channel->fd_actual = in_channel->fd_actual; 542 _dispatch_fd_entry_retain(fd_entry); 543 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); 544 dispatch_resume(channel->queue); 545 _dispatch_release(channel); 546 _dispatch_release(queue); 547 } 548 _dispatch_release(in_channel); 549 _dispatch_object_debug(channel, "%s", __func__); 550 }); 551 }); 552 _dispatch_object_debug(channel, "%s", __func__); 553 return channel; 554} 555 556dispatch_io_t 557dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel, 558 dispatch_queue_t queue, void *context, 559 void (*cleanup_handler)(void *context, int error)) 560{ 561 return dispatch_io_create_with_io(type, in_channel, queue, 562 !cleanup_handler ? NULL : 563 ^(int error){ cleanup_handler(context, error); }); 564} 565 566#pragma mark - 567#pragma mark dispatch_io_accessors 568 569void 570dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water) 571{ 572 _dispatch_retain(channel); 573 dispatch_async(channel->queue, ^{ 574 _dispatch_fd_debug("io set high water", channel->fd); 575 if (channel->params.low > high_water) { 576 channel->params.low = high_water; 577 } 578 channel->params.high = high_water ? high_water : 1; 579 _dispatch_release(channel); 580 }); 581} 582 583void 584dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water) 585{ 586 _dispatch_retain(channel); 587 dispatch_async(channel->queue, ^{ 588 _dispatch_fd_debug("io set low water", channel->fd); 589 if (channel->params.high < low_water) { 590 channel->params.high = low_water ? low_water : 1; 591 } 592 channel->params.low = low_water; 593 _dispatch_release(channel); 594 }); 595} 596 597void 598dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval, 599 unsigned long flags) 600{ 601 _dispatch_retain(channel); 602 dispatch_async(channel->queue, ^{ 603 _dispatch_fd_debug("io set interval", channel->fd); 604 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX; 605 channel->params.interval_flags = flags; 606 _dispatch_release(channel); 607 }); 608} 609 610void 611_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq) 612{ 613 _dispatch_retain(dq); 614 _dispatch_retain(channel); 615 dispatch_async(channel->queue, ^{ 616 dispatch_queue_t prev_dq = channel->do_targetq; 617 channel->do_targetq = dq; 618 _dispatch_release(prev_dq); 619 _dispatch_object_debug(channel, "%s", __func__); 620 _dispatch_release(channel); 621 }); 622} 623 624dispatch_fd_t 625dispatch_io_get_descriptor(dispatch_io_t channel) 626{ 627 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { 628 return -1; 629 } 630 dispatch_fd_t fd = channel->fd_actual; 631 if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel && 632 !_dispatch_io_get_error(NULL, channel, false)) { 633 dispatch_fd_entry_t fd_entry = channel->fd_entry; 634 (void)_dispatch_fd_entry_open(fd_entry, channel); 635 } 636 return channel->fd_actual; 637} 638 639#pragma mark - 640#pragma mark dispatch_io_operations 641 642static void 643_dispatch_io_stop(dispatch_io_t channel) 644{ 645 _dispatch_fd_debug("io stop", channel->fd); 646 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed); 647 _dispatch_retain(channel); 648 dispatch_async(channel->queue, ^{ 649 dispatch_async(channel->barrier_queue, ^{ 650 _dispatch_object_debug(channel, "%s", __func__); 651 dispatch_fd_entry_t fd_entry = channel->fd_entry; 652 if (fd_entry) { 653 _dispatch_fd_debug("io stop cleanup", channel->fd); 654 _dispatch_fd_entry_cleanup_operations(fd_entry, channel); 655 if (!(channel->atomic_flags & DIO_CLOSED)) { 656 channel->fd_entry = NULL; 657 _dispatch_fd_entry_release(fd_entry); 658 } 659 } else if (channel->fd != -1) { 660 // Stop after close, need to check if fd_entry still exists 661 _dispatch_retain(channel); 662 dispatch_async(_dispatch_io_fds_lockq, ^{ 663 _dispatch_object_debug(channel, "%s", __func__); 664 _dispatch_fd_debug("io stop after close cleanup", 665 channel->fd); 666 dispatch_fd_entry_t fdi; 667 uintptr_t hash = DIO_HASH(channel->fd); 668 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) { 669 if (fdi->fd == channel->fd) { 670 _dispatch_fd_entry_cleanup_operations(fdi, channel); 671 break; 672 } 673 } 674 _dispatch_release(channel); 675 }); 676 } 677 _dispatch_release(channel); 678 }); 679 }); 680} 681 682void 683dispatch_io_close(dispatch_io_t channel, unsigned long flags) 684{ 685 if (flags & DISPATCH_IO_STOP) { 686 // Don't stop an already stopped channel 687 if (channel->atomic_flags & DIO_STOPPED) { 688 return; 689 } 690 return _dispatch_io_stop(channel); 691 } 692 // Don't close an already closed or stopped channel 693 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { 694 return; 695 } 696 _dispatch_retain(channel); 697 dispatch_async(channel->queue, ^{ 698 dispatch_async(channel->barrier_queue, ^{ 699 _dispatch_object_debug(channel, "%s", __func__); 700 _dispatch_fd_debug("io close", channel->fd); 701 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) { 702 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED, 703 relaxed); 704 dispatch_fd_entry_t fd_entry = channel->fd_entry; 705 if (fd_entry) { 706 if (!fd_entry->path_data) { 707 channel->fd_entry = NULL; 708 } 709 _dispatch_fd_entry_release(fd_entry); 710 } 711 } 712 _dispatch_release(channel); 713 }); 714 }); 715} 716 717void 718dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier) 719{ 720 _dispatch_retain(channel); 721 dispatch_async(channel->queue, ^{ 722 dispatch_queue_t io_q = channel->do_targetq; 723 dispatch_queue_t barrier_queue = channel->barrier_queue; 724 dispatch_group_t barrier_group = channel->barrier_group; 725 dispatch_async(barrier_queue, ^{ 726 dispatch_suspend(barrier_queue); 727 dispatch_group_notify(barrier_group, io_q, ^{ 728 _dispatch_object_debug(channel, "%s", __func__); 729 _dispatch_thread_setspecific(dispatch_io_key, channel); 730 barrier(); 731 _dispatch_thread_setspecific(dispatch_io_key, NULL); 732 dispatch_resume(barrier_queue); 733 _dispatch_release(channel); 734 }); 735 }); 736 }); 737} 738 739void 740dispatch_io_barrier_f(dispatch_io_t channel, void *context, 741 dispatch_function_t barrier) 742{ 743 return dispatch_io_barrier(channel, ^{ barrier(context); }); 744} 745 746void 747dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length, 748 dispatch_queue_t queue, dispatch_io_handler_t handler) 749{ 750 _dispatch_retain(channel); 751 _dispatch_retain(queue); 752 dispatch_async(channel->queue, ^{ 753 dispatch_operation_t op; 754 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset, 755 length, dispatch_data_empty, queue, handler); 756 if (op) { 757 dispatch_queue_t barrier_q = channel->barrier_queue; 758 dispatch_async(barrier_q, ^{ 759 _dispatch_operation_enqueue(op, DOP_DIR_READ, 760 dispatch_data_empty); 761 }); 762 } 763 _dispatch_release(channel); 764 _dispatch_release(queue); 765 }); 766} 767 768void 769dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length, 770 dispatch_queue_t queue, void *context, 771 dispatch_io_handler_function_t handler) 772{ 773 return dispatch_io_read(channel, offset, length, queue, 774 ^(bool done, dispatch_data_t d, int error){ 775 handler(context, done, d, error); 776 }); 777} 778 779void 780dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data, 781 dispatch_queue_t queue, dispatch_io_handler_t handler) 782{ 783 _dispatch_io_data_retain(data); 784 _dispatch_retain(channel); 785 _dispatch_retain(queue); 786 dispatch_async(channel->queue, ^{ 787 dispatch_operation_t op; 788 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset, 789 dispatch_data_get_size(data), data, queue, handler); 790 if (op) { 791 dispatch_queue_t barrier_q = channel->barrier_queue; 792 dispatch_async(barrier_q, ^{ 793 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); 794 _dispatch_io_data_release(data); 795 }); 796 } else { 797 _dispatch_io_data_release(data); 798 } 799 _dispatch_release(channel); 800 _dispatch_release(queue); 801 }); 802} 803 804void 805dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data, 806 dispatch_queue_t queue, void *context, 807 dispatch_io_handler_function_t handler) 808{ 809 return dispatch_io_write(channel, offset, data, queue, 810 ^(bool done, dispatch_data_t d, int error){ 811 handler(context, done, d, error); 812 }); 813} 814 815void 816dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, 817 void (^handler)(dispatch_data_t, int)) 818{ 819 _dispatch_retain(queue); 820 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { 821 // On barrier queue 822 if (fd_entry->err) { 823 int err = fd_entry->err; 824 dispatch_async(queue, ^{ 825 _dispatch_fd_debug("convenience handler invoke", fd); 826 handler(dispatch_data_empty, err); 827 }); 828 _dispatch_release(queue); 829 return; 830 } 831 // Safe to access fd_entry on barrier queue 832 dispatch_io_t channel = fd_entry->convenience_channel; 833 if (!channel) { 834 channel = _dispatch_io_create(DISPATCH_IO_STREAM); 835 channel->fd = fd; 836 channel->fd_actual = fd; 837 channel->fd_entry = fd_entry; 838 dispatch_retain(fd_entry->barrier_queue); 839 dispatch_retain(fd_entry->barrier_group); 840 channel->barrier_queue = fd_entry->barrier_queue; 841 channel->barrier_group = fd_entry->barrier_group; 842 fd_entry->convenience_channel = channel; 843 } 844 __block dispatch_data_t deliver_data = dispatch_data_empty; 845 __block int err = 0; 846 dispatch_async(fd_entry->close_queue, ^{ 847 dispatch_async(queue, ^{ 848 _dispatch_fd_debug("convenience handler invoke", fd); 849 handler(deliver_data, err); 850 _dispatch_io_data_release(deliver_data); 851 }); 852 _dispatch_release(queue); 853 }); 854 dispatch_operation_t op = 855 _dispatch_operation_create(DOP_DIR_READ, channel, 0, 856 length, dispatch_data_empty, 857 _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 858 false), ^(bool done, dispatch_data_t data, int error) { 859 if (data) { 860 data = dispatch_data_create_concat(deliver_data, data); 861 _dispatch_io_data_release(deliver_data); 862 deliver_data = data; 863 } 864 if (done) { 865 err = error; 866 } 867 }); 868 if (op) { 869 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty); 870 } 871 }); 872} 873 874void 875dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, 876 void *context, void (*handler)(void *, dispatch_data_t, int)) 877{ 878 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){ 879 handler(context, d, error); 880 }); 881} 882 883void 884dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, 885 void (^handler)(dispatch_data_t, int)) 886{ 887 _dispatch_io_data_retain(data); 888 _dispatch_retain(queue); 889 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { 890 // On barrier queue 891 if (fd_entry->err) { 892 int err = fd_entry->err; 893 dispatch_async(queue, ^{ 894 _dispatch_fd_debug("convenience handler invoke", fd); 895 handler(NULL, err); 896 }); 897 _dispatch_release(queue); 898 return; 899 } 900 // Safe to access fd_entry on barrier queue 901 dispatch_io_t channel = fd_entry->convenience_channel; 902 if (!channel) { 903 channel = _dispatch_io_create(DISPATCH_IO_STREAM); 904 channel->fd = fd; 905 channel->fd_actual = fd; 906 channel->fd_entry = fd_entry; 907 dispatch_retain(fd_entry->barrier_queue); 908 dispatch_retain(fd_entry->barrier_group); 909 channel->barrier_queue = fd_entry->barrier_queue; 910 channel->barrier_group = fd_entry->barrier_group; 911 fd_entry->convenience_channel = channel; 912 } 913 __block dispatch_data_t deliver_data = NULL; 914 __block int err = 0; 915 dispatch_async(fd_entry->close_queue, ^{ 916 dispatch_async(queue, ^{ 917 _dispatch_fd_debug("convenience handler invoke", fd); 918 handler(deliver_data, err); 919 if (deliver_data) { 920 _dispatch_io_data_release(deliver_data); 921 } 922 }); 923 _dispatch_release(queue); 924 }); 925 dispatch_operation_t op = 926 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0, 927 dispatch_data_get_size(data), data, 928 _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 929 false), ^(bool done, dispatch_data_t d, int error) { 930 if (done) { 931 if (d) { 932 _dispatch_io_data_retain(d); 933 deliver_data = d; 934 } 935 err = error; 936 } 937 }); 938 if (op) { 939 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); 940 } 941 _dispatch_io_data_release(data); 942 }); 943} 944 945void 946dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, 947 void *context, void (*handler)(void *, dispatch_data_t, int)) 948{ 949 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){ 950 handler(context, d, error); 951 }); 952} 953 954#pragma mark - 955#pragma mark dispatch_operation_t 956 957static dispatch_operation_t 958_dispatch_operation_create(dispatch_op_direction_t direction, 959 dispatch_io_t channel, off_t offset, size_t length, 960 dispatch_data_t data, dispatch_queue_t queue, 961 dispatch_io_handler_t handler) 962{ 963 // On channel queue 964 dispatch_assert(direction < DOP_DIR_MAX); 965 _dispatch_fd_debug("operation create", channel->fd); 966#if DISPATCH_IO_DEBUG 967 int fd = channel->fd; 968#endif 969 // Safe to call _dispatch_io_get_error() with channel->fd_entry since 970 // that can only be NULL if atomic_flags are set rdar://problem/8362514 971 int err = _dispatch_io_get_error(NULL, channel, false); 972 if (err || !length) { 973 _dispatch_io_data_retain(data); 974 _dispatch_retain(queue); 975 dispatch_async(channel->barrier_queue, ^{ 976 dispatch_async(queue, ^{ 977 dispatch_data_t d = data; 978 if (direction == DOP_DIR_READ && err) { 979 d = NULL; 980 } else if (direction == DOP_DIR_WRITE && !err) { 981 d = NULL; 982 } 983 _dispatch_fd_debug("IO handler invoke", fd); 984 handler(true, d, err); 985 _dispatch_io_data_release(data); 986 }); 987 _dispatch_release(queue); 988 }); 989 return NULL; 990 } 991 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation), 992 sizeof(struct dispatch_operation_s)); 993 op->do_next = DISPATCH_OBJECT_LISTLESS; 994 op->do_xref_cnt = -1; // operation object is not exposed externally 995 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL); 996 op->op_q->do_targetq = queue; 997 _dispatch_retain(queue); 998 op->active = false; 999 op->direction = direction; 1000 op->offset = offset + channel->f_ptr; 1001 op->length = length; 1002 op->handler = _dispatch_io_Block_copy(handler); 1003 _dispatch_retain(channel); 1004 op->channel = channel; 1005 op->params = channel->params; 1006 // Take a snapshot of the priority of the channel queue. The actual I/O 1007 // for this operation will be performed at this priority 1008 dispatch_queue_t targetq = op->channel->do_targetq; 1009 while (fastpath(targetq->do_targetq)) { 1010 targetq = targetq->do_targetq; 1011 } 1012 op->do_targetq = targetq; 1013 _dispatch_object_debug(op, "%s", __func__); 1014 return op; 1015} 1016 1017void 1018_dispatch_operation_dispose(dispatch_operation_t op) 1019{ 1020 _dispatch_object_debug(op, "%s", __func__); 1021 // Deliver the data if there's any 1022 if (op->fd_entry) { 1023 _dispatch_operation_deliver_data(op, DOP_DONE); 1024 dispatch_group_leave(op->fd_entry->barrier_group); 1025 _dispatch_fd_entry_release(op->fd_entry); 1026 } 1027 if (op->channel) { 1028 _dispatch_release(op->channel); 1029 } 1030 if (op->timer) { 1031 dispatch_release(op->timer); 1032 } 1033 // For write operations, op->buf is owned by op->buf_data 1034 if (op->buf && op->direction == DOP_DIR_READ) { 1035 free(op->buf); 1036 } 1037 if (op->buf_data) { 1038 _dispatch_io_data_release(op->buf_data); 1039 } 1040 if (op->data) { 1041 _dispatch_io_data_release(op->data); 1042 } 1043 if (op->op_q) { 1044 dispatch_release(op->op_q); 1045 } 1046 Block_release(op->handler); 1047} 1048 1049static void 1050_dispatch_operation_enqueue(dispatch_operation_t op, 1051 dispatch_op_direction_t direction, dispatch_data_t data) 1052{ 1053 // Called from the barrier queue 1054 _dispatch_io_data_retain(data); 1055 // If channel is closed or stopped, then call the handler immediately 1056 int err = _dispatch_io_get_error(NULL, op->channel, false); 1057 if (err) { 1058 dispatch_io_handler_t handler = op->handler; 1059 dispatch_async(op->op_q, ^{ 1060 dispatch_data_t d = data; 1061 if (direction == DOP_DIR_READ && err) { 1062 d = NULL; 1063 } else if (direction == DOP_DIR_WRITE && !err) { 1064 d = NULL; 1065 } 1066 handler(true, d, err); 1067 _dispatch_io_data_release(data); 1068 }); 1069 _dispatch_release(op); 1070 return; 1071 } 1072 // Finish operation init 1073 op->fd_entry = op->channel->fd_entry; 1074 _dispatch_fd_entry_retain(op->fd_entry); 1075 dispatch_group_enter(op->fd_entry->barrier_group); 1076 dispatch_disk_t disk = op->fd_entry->disk; 1077 if (!disk) { 1078 dispatch_stream_t stream = op->fd_entry->streams[direction]; 1079 dispatch_async(stream->dq, ^{ 1080 _dispatch_stream_enqueue_operation(stream, op, data); 1081 _dispatch_io_data_release(data); 1082 }); 1083 } else { 1084 dispatch_async(disk->pick_queue, ^{ 1085 _dispatch_disk_enqueue_operation(disk, op, data); 1086 _dispatch_io_data_release(data); 1087 }); 1088 } 1089} 1090 1091static bool 1092_dispatch_operation_should_enqueue(dispatch_operation_t op, 1093 dispatch_queue_t tq, dispatch_data_t data) 1094{ 1095 // On stream queue or disk queue 1096 _dispatch_fd_debug("enqueue operation", op->fd_entry->fd); 1097 _dispatch_io_data_retain(data); 1098 op->data = data; 1099 int err = _dispatch_io_get_error(op, NULL, true); 1100 if (err) { 1101 op->err = err; 1102 // Final release 1103 _dispatch_release(op); 1104 return false; 1105 } 1106 if (op->params.interval) { 1107 dispatch_resume(_dispatch_operation_timer(tq, op)); 1108 } 1109 return true; 1110} 1111 1112static dispatch_source_t 1113_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op) 1114{ 1115 // On stream queue or pick queue 1116 if (op->timer) { 1117 return op->timer; 1118 } 1119 dispatch_source_t timer = dispatch_source_create( 1120 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq); 1121 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW, 1122 (int64_t)op->params.interval), op->params.interval, 0); 1123 dispatch_source_set_event_handler(timer, ^{ 1124 // On stream queue or pick queue 1125 if (dispatch_source_testcancel(timer)) { 1126 // Do nothing. The operation has already completed 1127 return; 1128 } 1129 dispatch_op_flags_t flags = DOP_DEFAULT; 1130 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) { 1131 // Deliver even if there is less data than the low-water mark 1132 flags |= DOP_DELIVER; 1133 } 1134 // If the operation is active, dont deliver data 1135 if ((op->active) && (flags & DOP_DELIVER)) { 1136 op->flags = flags; 1137 } else { 1138 _dispatch_operation_deliver_data(op, flags); 1139 } 1140 }); 1141 op->timer = timer; 1142 return op->timer; 1143} 1144 1145#pragma mark - 1146#pragma mark dispatch_fd_entry_t 1147 1148#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD 1149static void 1150_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) 1151{ 1152 guardid_t guard = fd_entry; 1153 const unsigned int guard_flags = GUARD_CLOSE; 1154 int err, fd_flags = 0; 1155 _dispatch_io_syscall_switch_noerr(err, 1156 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags, 1157 &fd_flags), 1158 case 0: 1159 fd_entry->guard_flags = guard_flags; 1160 fd_entry->orig_fd_flags = fd_flags; 1161 break; 1162 case EPERM: break; 1163 default: (void)dispatch_assume_zero(err); break; 1164 ); 1165} 1166 1167static void 1168_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) 1169{ 1170 if (!fd_entry->guard_flags) { 1171 return; 1172 } 1173 guardid_t guard = fd_entry; 1174 int err, fd_flags = fd_entry->orig_fd_flags; 1175 _dispatch_io_syscall_switch(err, 1176 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0, 1177 &fd_flags), 1178 default: (void)dispatch_assume_zero(err); break; 1179 ); 1180} 1181#else 1182static inline void 1183_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; } 1184static inline void 1185_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; } 1186#endif // DISPATCH_USE_GUARDED_FD 1187 1188static inline int 1189_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path, 1190 int oflag, mode_t mode) { 1191#if DISPATCH_USE_GUARDED_FD 1192 guardid_t guard = (uintptr_t)fd_entry; 1193 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP | 1194 GUARD_SOCKET_IPC | GUARD_FILEPORT; 1195 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC, 1196 mode); 1197 if (fd != -1) { 1198 fd_entry->guard_flags = guard_flags; 1199 return fd; 1200 } 1201 errno = 0; 1202#endif 1203 return open(path, oflag, mode); 1204 (void)fd_entry; 1205} 1206 1207static inline int 1208_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) { 1209#if DISPATCH_USE_GUARDED_FD 1210 if (fd_entry->guard_flags) { 1211 guardid_t guard = (uintptr_t)fd_entry; 1212 return guarded_close_np(fd, &guard); 1213 } else 1214#endif 1215 { 1216 return close(fd); 1217 } 1218 (void)fd_entry; 1219} 1220 1221static inline void 1222_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) { 1223 dispatch_suspend(fd_entry->close_queue); 1224} 1225 1226static inline void 1227_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) { 1228 dispatch_resume(fd_entry->close_queue); 1229} 1230 1231static void 1232_dispatch_fd_entry_init_async(dispatch_fd_t fd, 1233 dispatch_fd_entry_init_callback_t completion_callback) 1234{ 1235 static dispatch_once_t _dispatch_io_fds_lockq_pred; 1236 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL, 1237 _dispatch_io_fds_lockq_init); 1238 dispatch_async(_dispatch_io_fds_lockq, ^{ 1239 _dispatch_fd_debug("fd entry init", fd); 1240 dispatch_fd_entry_t fd_entry = NULL; 1241 // Check to see if there is an existing entry for the given fd 1242 uintptr_t hash = DIO_HASH(fd); 1243 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) { 1244 if (fd_entry->fd == fd) { 1245 // Retain the fd_entry to ensure it cannot go away until the 1246 // stat() has completed 1247 _dispatch_fd_entry_retain(fd_entry); 1248 break; 1249 } 1250 } 1251 if (!fd_entry) { 1252 // If we did not find an existing entry, create one 1253 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash); 1254 } 1255 dispatch_async(fd_entry->barrier_queue, ^{ 1256 _dispatch_fd_debug("fd entry init completion", fd); 1257 completion_callback(fd_entry); 1258 // stat() is complete, release reference to fd_entry 1259 _dispatch_fd_entry_release(fd_entry); 1260 }); 1261 }); 1262} 1263 1264static dispatch_fd_entry_t 1265_dispatch_fd_entry_create(dispatch_queue_t q) 1266{ 1267 dispatch_fd_entry_t fd_entry; 1268 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s)); 1269 fd_entry->close_queue = dispatch_queue_create( 1270 "com.apple.libdispatch-io.closeq", NULL); 1271 // Use target queue to ensure that no concurrent lookups are going on when 1272 // the close queue is running 1273 fd_entry->close_queue->do_targetq = q; 1274 _dispatch_retain(q); 1275 // Suspend the cleanup queue until closing 1276 _dispatch_fd_entry_retain(fd_entry); 1277 return fd_entry; 1278} 1279 1280static dispatch_fd_entry_t 1281_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash) 1282{ 1283 // On fds lock queue 1284 _dispatch_fd_debug("fd entry create", fd); 1285 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( 1286 _dispatch_io_fds_lockq); 1287 fd_entry->fd = fd; 1288 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list); 1289 fd_entry->barrier_queue = dispatch_queue_create( 1290 "com.apple.libdispatch-io.barrierq", NULL); 1291 fd_entry->barrier_group = dispatch_group_create(); 1292 dispatch_async(fd_entry->barrier_queue, ^{ 1293 _dispatch_fd_debug("fd entry stat", fd); 1294 int err, orig_flags, orig_nosigpipe = -1; 1295 struct stat st; 1296 _dispatch_io_syscall_switch(err, 1297 fstat(fd, &st), 1298 default: fd_entry->err = err; return; 1299 ); 1300 fd_entry->stat.dev = st.st_dev; 1301 fd_entry->stat.mode = st.st_mode; 1302 _dispatch_fd_entry_guard(fd_entry); 1303 _dispatch_io_syscall_switch(err, 1304 orig_flags = fcntl(fd, F_GETFL), 1305 default: (void)dispatch_assume_zero(err); break; 1306 ); 1307#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 1308 if (S_ISFIFO(st.st_mode)) { 1309 _dispatch_io_syscall_switch(err, 1310 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE), 1311 default: (void)dispatch_assume_zero(err); break; 1312 ); 1313 if (orig_nosigpipe != -1) { 1314 _dispatch_io_syscall_switch(err, 1315 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1), 1316 default: 1317 orig_nosigpipe = -1; 1318 (void)dispatch_assume_zero(err); 1319 break; 1320 ); 1321 } 1322 } 1323#endif 1324 if (S_ISREG(st.st_mode)) { 1325 if (orig_flags != -1) { 1326 _dispatch_io_syscall_switch(err, 1327 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK), 1328 default: 1329 orig_flags = -1; 1330 (void)dispatch_assume_zero(err); 1331 break; 1332 ); 1333 } 1334 int32_t dev = major(st.st_dev); 1335 // We have to get the disk on the global dev queue. The 1336 // barrier queue cannot continue until that is complete 1337 dispatch_suspend(fd_entry->barrier_queue); 1338 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, 1339 _dispatch_io_devs_lockq_init); 1340 dispatch_async(_dispatch_io_devs_lockq, ^{ 1341 _dispatch_disk_init(fd_entry, dev); 1342 dispatch_resume(fd_entry->barrier_queue); 1343 }); 1344 } else { 1345 if (orig_flags != -1) { 1346 _dispatch_io_syscall_switch(err, 1347 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK), 1348 default: 1349 orig_flags = -1; 1350 (void)dispatch_assume_zero(err); 1351 break; 1352 ); 1353 } 1354 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue( 1355 DISPATCH_QUEUE_PRIORITY_DEFAULT, false)); 1356 } 1357 fd_entry->orig_flags = orig_flags; 1358 fd_entry->orig_nosigpipe = orig_nosigpipe; 1359 }); 1360 // This is the first item run when the close queue is resumed, indicating 1361 // that all channels associated with this entry have been closed and that 1362 // all operations associated with this entry have been freed 1363 dispatch_async(fd_entry->close_queue, ^{ 1364 if (!fd_entry->disk) { 1365 _dispatch_fd_debug("close queue fd_entry cleanup", fd); 1366 dispatch_op_direction_t dir; 1367 for (dir = 0; dir < DOP_DIR_MAX; dir++) { 1368 _dispatch_stream_dispose(fd_entry, dir); 1369 } 1370 } else { 1371 dispatch_disk_t disk = fd_entry->disk; 1372 dispatch_async(_dispatch_io_devs_lockq, ^{ 1373 _dispatch_release(disk); 1374 }); 1375 } 1376 // Remove this entry from the global fd list 1377 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list); 1378 }); 1379 // If there was a source associated with this stream, disposing of the 1380 // source cancels it and suspends the close queue. Freeing the fd_entry 1381 // structure must happen after the source cancel handler has finished 1382 dispatch_async(fd_entry->close_queue, ^{ 1383 _dispatch_fd_debug("close queue release", fd); 1384 dispatch_release(fd_entry->close_queue); 1385 _dispatch_fd_debug("barrier queue release", fd); 1386 dispatch_release(fd_entry->barrier_queue); 1387 _dispatch_fd_debug("barrier group release", fd); 1388 dispatch_release(fd_entry->barrier_group); 1389 if (fd_entry->orig_flags != -1) { 1390 _dispatch_io_syscall( 1391 fcntl(fd, F_SETFL, fd_entry->orig_flags) 1392 ); 1393 } 1394#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 1395 if (fd_entry->orig_nosigpipe != -1) { 1396 _dispatch_io_syscall( 1397 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe) 1398 ); 1399 } 1400#endif 1401 _dispatch_fd_entry_unguard(fd_entry); 1402 if (fd_entry->convenience_channel) { 1403 fd_entry->convenience_channel->fd_entry = NULL; 1404 dispatch_release(fd_entry->convenience_channel); 1405 } 1406 free(fd_entry); 1407 }); 1408 return fd_entry; 1409} 1410 1411static dispatch_fd_entry_t 1412_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data, 1413 dev_t dev, mode_t mode) 1414{ 1415 // On devs lock queue 1416 _dispatch_fd_debug("fd entry create with path %s", -1, path_data->path); 1417 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( 1418 path_data->channel->queue); 1419 if (S_ISREG(mode)) { 1420 _dispatch_disk_init(fd_entry, major(dev)); 1421 } else { 1422 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue( 1423 DISPATCH_QUEUE_PRIORITY_DEFAULT, false)); 1424 } 1425 fd_entry->fd = -1; 1426 fd_entry->orig_flags = -1; 1427 fd_entry->path_data = path_data; 1428 fd_entry->stat.dev = dev; 1429 fd_entry->stat.mode = mode; 1430 fd_entry->barrier_queue = dispatch_queue_create( 1431 "com.apple.libdispatch-io.barrierq", NULL); 1432 fd_entry->barrier_group = dispatch_group_create(); 1433 // This is the first item run when the close queue is resumed, indicating 1434 // that the channel associated with this entry has been closed and that 1435 // all operations associated with this entry have been freed 1436 dispatch_async(fd_entry->close_queue, ^{ 1437 _dispatch_fd_debug("close queue fd_entry cleanup", -1); 1438 if (!fd_entry->disk) { 1439 dispatch_op_direction_t dir; 1440 for (dir = 0; dir < DOP_DIR_MAX; dir++) { 1441 _dispatch_stream_dispose(fd_entry, dir); 1442 } 1443 } 1444 if (fd_entry->fd != -1) { 1445 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd); 1446 } 1447 if (fd_entry->path_data->channel) { 1448 // If associated channel has not been released yet, mark it as 1449 // no longer having an fd_entry (for stop after close). 1450 // It is safe to modify channel since we are on close_queue with 1451 // target queue the channel queue 1452 fd_entry->path_data->channel->fd_entry = NULL; 1453 } 1454 }); 1455 dispatch_async(fd_entry->close_queue, ^{ 1456 _dispatch_fd_debug("close queue release", -1); 1457 dispatch_release(fd_entry->close_queue); 1458 dispatch_release(fd_entry->barrier_queue); 1459 dispatch_release(fd_entry->barrier_group); 1460 free(fd_entry->path_data); 1461 free(fd_entry); 1462 }); 1463 return fd_entry; 1464} 1465 1466static int 1467_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel) 1468{ 1469 if (!(fd_entry->fd == -1 && fd_entry->path_data)) { 1470 return 0; 1471 } 1472 if (fd_entry->err) { 1473 return fd_entry->err; 1474 } 1475 int fd = -1; 1476 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK : 1477 fd_entry->path_data->oflag | O_NONBLOCK; 1478open: 1479 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path, 1480 oflag, fd_entry->path_data->mode); 1481 if (fd == -1) { 1482 int err = errno; 1483 if (err == EINTR) { 1484 goto open; 1485 } 1486 (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed); 1487 return err; 1488 } 1489 if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) { 1490 // Lost the race with another open 1491 _dispatch_fd_entry_guarded_close(fd_entry, fd); 1492 } else { 1493 channel->fd_actual = fd; 1494 } 1495 _dispatch_object_debug(channel, "%s", __func__); 1496 return 0; 1497} 1498 1499static void 1500_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, 1501 dispatch_io_t channel) 1502{ 1503 if (fd_entry->disk) { 1504 if (channel) { 1505 _dispatch_retain(channel); 1506 } 1507 _dispatch_fd_entry_retain(fd_entry); 1508 dispatch_async(fd_entry->disk->pick_queue, ^{ 1509 _dispatch_disk_cleanup_operations(fd_entry->disk, channel); 1510 _dispatch_fd_entry_release(fd_entry); 1511 if (channel) { 1512 _dispatch_release(channel); 1513 } 1514 }); 1515 } else { 1516 dispatch_op_direction_t direction; 1517 for (direction = 0; direction < DOP_DIR_MAX; direction++) { 1518 dispatch_stream_t stream = fd_entry->streams[direction]; 1519 if (!stream) { 1520 continue; 1521 } 1522 if (channel) { 1523 _dispatch_retain(channel); 1524 } 1525 _dispatch_fd_entry_retain(fd_entry); 1526 dispatch_async(stream->dq, ^{ 1527 _dispatch_stream_cleanup_operations(stream, channel); 1528 _dispatch_fd_entry_release(fd_entry); 1529 if (channel) { 1530 _dispatch_release(channel); 1531 } 1532 }); 1533 } 1534 } 1535} 1536 1537#pragma mark - 1538#pragma mark dispatch_stream_t/dispatch_disk_t 1539 1540static void 1541_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq) 1542{ 1543 dispatch_op_direction_t direction; 1544 for (direction = 0; direction < DOP_DIR_MAX; direction++) { 1545 dispatch_stream_t stream; 1546 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s)); 1547 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq", 1548 NULL); 1549 dispatch_set_context(stream->dq, stream); 1550 _dispatch_retain(tq); 1551 stream->dq->do_targetq = tq; 1552 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]); 1553 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]); 1554 fd_entry->streams[direction] = stream; 1555 } 1556} 1557 1558static void 1559_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, 1560 dispatch_op_direction_t direction) 1561{ 1562 // On close queue 1563 dispatch_stream_t stream = fd_entry->streams[direction]; 1564 if (!stream) { 1565 return; 1566 } 1567 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); 1568 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])); 1569 if (stream->source) { 1570 // Balanced by source cancel handler: 1571 _dispatch_fd_entry_retain(fd_entry); 1572 dispatch_source_cancel(stream->source); 1573 dispatch_resume(stream->source); 1574 dispatch_release(stream->source); 1575 } 1576 dispatch_set_context(stream->dq, NULL); 1577 dispatch_release(stream->dq); 1578 free(stream); 1579} 1580 1581static void 1582_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev) 1583{ 1584 // On devs lock queue 1585 dispatch_disk_t disk; 1586 // Check to see if there is an existing entry for the given device 1587 uintptr_t hash = DIO_HASH(dev); 1588 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) { 1589 if (disk->dev == dev) { 1590 _dispatch_retain(disk); 1591 goto out; 1592 } 1593 } 1594 // Otherwise create a new entry 1595 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs; 1596 disk = _dispatch_alloc(DISPATCH_VTABLE(disk), 1597 sizeof(struct dispatch_disk_s) + 1598 (pending_reqs_depth * sizeof(dispatch_operation_t))); 1599 disk->do_next = DISPATCH_OBJECT_LISTLESS; 1600 disk->do_xref_cnt = -1; 1601 disk->advise_list_depth = pending_reqs_depth; 1602 disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 1603 false); 1604 disk->dev = dev; 1605 TAILQ_INIT(&disk->operations); 1606 disk->cur_rq = TAILQ_FIRST(&disk->operations); 1607 char label[45]; 1608 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev); 1609 disk->pick_queue = dispatch_queue_create(label, NULL); 1610 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list); 1611out: 1612 fd_entry->disk = disk; 1613 TAILQ_INIT(&fd_entry->stream_ops); 1614} 1615 1616void 1617_dispatch_disk_dispose(dispatch_disk_t disk) 1618{ 1619 uintptr_t hash = DIO_HASH(disk->dev); 1620 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list); 1621 dispatch_assert(TAILQ_EMPTY(&disk->operations)); 1622 size_t i; 1623 for (i=0; i<disk->advise_list_depth; ++i) { 1624 dispatch_assert(!disk->advise_list[i]); 1625 } 1626 dispatch_release(disk->pick_queue); 1627} 1628 1629#pragma mark - 1630#pragma mark dispatch_stream_operations/dispatch_disk_operations 1631 1632static inline bool 1633_dispatch_stream_operation_avail(dispatch_stream_t stream) 1634{ 1635 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) || 1636 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); 1637} 1638 1639static void 1640_dispatch_stream_enqueue_operation(dispatch_stream_t stream, 1641 dispatch_operation_t op, dispatch_data_t data) 1642{ 1643 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) { 1644 return; 1645 } 1646 _dispatch_object_debug(op, "%s", __func__); 1647 bool no_ops = !_dispatch_stream_operation_avail(stream); 1648 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list); 1649 if (no_ops) { 1650 dispatch_async_f(stream->dq, stream->dq, 1651 _dispatch_stream_queue_handler); 1652 } 1653} 1654 1655static void 1656_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op, 1657 dispatch_data_t data) 1658{ 1659 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) { 1660 return; 1661 } 1662 _dispatch_object_debug(op, "%s", __func__); 1663 if (op->params.type == DISPATCH_IO_STREAM) { 1664 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) { 1665 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list); 1666 } 1667 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list); 1668 } else { 1669 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list); 1670 } 1671 _dispatch_disk_handler(disk); 1672} 1673 1674static void 1675_dispatch_stream_complete_operation(dispatch_stream_t stream, 1676 dispatch_operation_t op) 1677{ 1678 // On stream queue 1679 _dispatch_object_debug(op, "%s", __func__); 1680 _dispatch_fd_debug("complete operation", op->fd_entry->fd); 1681 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list); 1682 if (op == stream->op) { 1683 stream->op = NULL; 1684 } 1685 if (op->timer) { 1686 dispatch_source_cancel(op->timer); 1687 } 1688 // Final release will deliver any pending data 1689 _dispatch_release(op); 1690} 1691 1692static void 1693_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op) 1694{ 1695 // On pick queue 1696 _dispatch_object_debug(op, "%s", __func__); 1697 _dispatch_fd_debug("complete operation", op->fd_entry->fd); 1698 // Current request is always the last op returned 1699 if (disk->cur_rq == op) { 1700 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s, 1701 operation_list); 1702 } 1703 if (op->params.type == DISPATCH_IO_STREAM) { 1704 // Check if there are other pending stream operations behind it 1705 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list); 1706 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list); 1707 if (op_next) { 1708 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list); 1709 } 1710 } 1711 TAILQ_REMOVE(&disk->operations, op, operation_list); 1712 if (op->timer) { 1713 dispatch_source_cancel(op->timer); 1714 } 1715 // Final release will deliver any pending data 1716 _dispatch_release(op); 1717} 1718 1719static dispatch_operation_t 1720_dispatch_stream_pick_next_operation(dispatch_stream_t stream, 1721 dispatch_operation_t op) 1722{ 1723 // On stream queue 1724 if (!op) { 1725 // On the first run through, pick the first operation 1726 if (!_dispatch_stream_operation_avail(stream)) { 1727 return op; 1728 } 1729 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) { 1730 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]); 1731 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) { 1732 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); 1733 } 1734 return op; 1735 } 1736 if (op->params.type == DISPATCH_IO_STREAM) { 1737 // Stream operations need to be serialized so continue the current 1738 // operation until it is finished 1739 return op; 1740 } 1741 // Get the next random operation (round-robin) 1742 if (op->params.type == DISPATCH_IO_RANDOM) { 1743 op = TAILQ_NEXT(op, operation_list); 1744 if (!op) { 1745 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); 1746 } 1747 return op; 1748 } 1749 return NULL; 1750} 1751 1752static dispatch_operation_t 1753_dispatch_disk_pick_next_operation(dispatch_disk_t disk) 1754{ 1755 // On pick queue 1756 dispatch_operation_t op; 1757 if (!TAILQ_EMPTY(&disk->operations)) { 1758 if (disk->cur_rq == NULL) { 1759 op = TAILQ_FIRST(&disk->operations); 1760 } else { 1761 op = disk->cur_rq; 1762 do { 1763 op = TAILQ_NEXT(op, operation_list); 1764 if (!op) { 1765 op = TAILQ_FIRST(&disk->operations); 1766 } 1767 // TODO: more involved picking algorithm rdar://problem/8780312 1768 } while (op->active && op != disk->cur_rq); 1769 } 1770 if (!op->active) { 1771 disk->cur_rq = op; 1772 return op; 1773 } 1774 } 1775 return NULL; 1776} 1777 1778static void 1779_dispatch_stream_cleanup_operations(dispatch_stream_t stream, 1780 dispatch_io_t channel) 1781{ 1782 // On stream queue 1783 dispatch_operation_t op, tmp; 1784 typeof(*stream->operations) *operations; 1785 operations = &stream->operations[DISPATCH_IO_RANDOM]; 1786 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { 1787 if (!channel || op->channel == channel) { 1788 _dispatch_stream_complete_operation(stream, op); 1789 } 1790 } 1791 operations = &stream->operations[DISPATCH_IO_STREAM]; 1792 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { 1793 if (!channel || op->channel == channel) { 1794 _dispatch_stream_complete_operation(stream, op); 1795 } 1796 } 1797 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) { 1798 dispatch_suspend(stream->source); 1799 stream->source_running = false; 1800 } 1801} 1802 1803static void 1804_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel) 1805{ 1806 // On pick queue 1807 dispatch_operation_t op, tmp; 1808 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) { 1809 if (!channel || op->channel == channel) { 1810 _dispatch_disk_complete_operation(disk, op); 1811 } 1812 } 1813} 1814 1815#pragma mark - 1816#pragma mark dispatch_stream_handler/dispatch_disk_handler 1817 1818static dispatch_source_t 1819_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op) 1820{ 1821 // On stream queue 1822 if (stream->source) { 1823 return stream->source; 1824 } 1825 dispatch_fd_t fd = op->fd_entry->fd; 1826 _dispatch_fd_debug("stream source create", fd); 1827 dispatch_source_t source = NULL; 1828 if (op->direction == DOP_DIR_READ) { 1829 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, 1830 (uintptr_t)fd, 0, stream->dq); 1831 } else if (op->direction == DOP_DIR_WRITE) { 1832 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, 1833 (uintptr_t)fd, 0, stream->dq); 1834 } else { 1835 dispatch_assert(op->direction < DOP_DIR_MAX); 1836 return NULL; 1837 } 1838 dispatch_set_context(source, stream); 1839 dispatch_source_set_event_handler_f(source, 1840 _dispatch_stream_source_handler); 1841 // Close queue must not run user cleanup handlers until sources are fully 1842 // unregistered 1843 dispatch_queue_t close_queue = op->fd_entry->close_queue; 1844 dispatch_source_set_cancel_handler(source, ^{ 1845 _dispatch_fd_debug("stream source cancel", fd); 1846 dispatch_resume(close_queue); 1847 }); 1848 stream->source = source; 1849 return stream->source; 1850} 1851 1852static void 1853_dispatch_stream_source_handler(void *ctx) 1854{ 1855 // On stream queue 1856 dispatch_stream_t stream = (dispatch_stream_t)ctx; 1857 dispatch_suspend(stream->source); 1858 stream->source_running = false; 1859 return _dispatch_stream_handler(stream); 1860} 1861 1862static void 1863_dispatch_stream_queue_handler(void *ctx) 1864{ 1865 // On stream queue 1866 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx); 1867 if (!stream) { 1868 // _dispatch_stream_dispose has been called 1869 return; 1870 } 1871 return _dispatch_stream_handler(stream); 1872} 1873 1874static void 1875_dispatch_stream_handler(void *ctx) 1876{ 1877 // On stream queue 1878 dispatch_stream_t stream = (dispatch_stream_t)ctx; 1879 dispatch_operation_t op; 1880pick: 1881 op = _dispatch_stream_pick_next_operation(stream, stream->op); 1882 if (!op) { 1883 _dispatch_debug("no operation found: stream %p", stream); 1884 return; 1885 } 1886 int err = _dispatch_io_get_error(op, NULL, true); 1887 if (err) { 1888 op->err = err; 1889 _dispatch_stream_complete_operation(stream, op); 1890 goto pick; 1891 } 1892 stream->op = op; 1893 _dispatch_fd_debug("stream handler", op->fd_entry->fd); 1894 dispatch_fd_entry_t fd_entry = op->fd_entry; 1895 _dispatch_fd_entry_retain(fd_entry); 1896 // For performance analysis 1897 if (!op->total && dispatch_io_defaults.initial_delivery) { 1898 // Empty delivery to signal the start of the operation 1899 _dispatch_fd_debug("initial delivery", op->fd_entry->fd); 1900 _dispatch_operation_deliver_data(op, DOP_DELIVER); 1901 } 1902 // TODO: perform on the operation target queue to get correct priority 1903 int result = _dispatch_operation_perform(op); 1904 dispatch_op_flags_t flags = ~0u; 1905 switch (result) { 1906 case DISPATCH_OP_DELIVER: 1907 flags = DOP_DEFAULT; 1908 // Fall through 1909 case DISPATCH_OP_DELIVER_AND_COMPLETE: 1910 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY : 1911 DOP_DEFAULT; 1912 _dispatch_operation_deliver_data(op, flags); 1913 // Fall through 1914 case DISPATCH_OP_COMPLETE: 1915 if (flags != DOP_DEFAULT) { 1916 _dispatch_stream_complete_operation(stream, op); 1917 } 1918 if (_dispatch_stream_operation_avail(stream)) { 1919 dispatch_async_f(stream->dq, stream->dq, 1920 _dispatch_stream_queue_handler); 1921 } 1922 break; 1923 case DISPATCH_OP_COMPLETE_RESUME: 1924 _dispatch_stream_complete_operation(stream, op); 1925 // Fall through 1926 case DISPATCH_OP_RESUME: 1927 if (_dispatch_stream_operation_avail(stream)) { 1928 stream->source_running = true; 1929 dispatch_resume(_dispatch_stream_source(stream, op)); 1930 } 1931 break; 1932 case DISPATCH_OP_ERR: 1933 _dispatch_stream_cleanup_operations(stream, op->channel); 1934 break; 1935 case DISPATCH_OP_FD_ERR: 1936 _dispatch_fd_entry_retain(fd_entry); 1937 dispatch_async(fd_entry->barrier_queue, ^{ 1938 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL); 1939 _dispatch_fd_entry_release(fd_entry); 1940 }); 1941 break; 1942 default: 1943 break; 1944 } 1945 _dispatch_fd_entry_release(fd_entry); 1946 return; 1947} 1948 1949static void 1950_dispatch_disk_handler(void *ctx) 1951{ 1952 // On pick queue 1953 dispatch_disk_t disk = (dispatch_disk_t)ctx; 1954 if (disk->io_active) { 1955 return; 1956 } 1957 _dispatch_fd_debug("disk handler", -1); 1958 dispatch_operation_t op; 1959 size_t i = disk->free_idx, j = disk->req_idx; 1960 if (j <= i) { 1961 j += disk->advise_list_depth; 1962 } 1963 while (i <= j) { 1964 if ((!disk->advise_list[i%disk->advise_list_depth]) && 1965 (op = _dispatch_disk_pick_next_operation(disk))) { 1966 int err = _dispatch_io_get_error(op, NULL, true); 1967 if (err) { 1968 op->err = err; 1969 _dispatch_disk_complete_operation(disk, op); 1970 continue; 1971 } 1972 _dispatch_retain(op); 1973 disk->advise_list[i%disk->advise_list_depth] = op; 1974 op->active = true; 1975 _dispatch_object_debug(op, "%s", __func__); 1976 } else { 1977 // No more operations to get 1978 break; 1979 } 1980 i++; 1981 } 1982 disk->free_idx = (i%disk->advise_list_depth); 1983 op = disk->advise_list[disk->req_idx]; 1984 if (op) { 1985 disk->io_active = true; 1986 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform); 1987 } 1988} 1989 1990static void 1991_dispatch_disk_perform(void *ctxt) 1992{ 1993 dispatch_disk_t disk = ctxt; 1994 size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE; 1995 _dispatch_fd_debug("disk perform", -1); 1996 dispatch_operation_t op; 1997 size_t i = disk->advise_idx, j = disk->free_idx; 1998 if (j <= i) { 1999 j += disk->advise_list_depth; 2000 } 2001 do { 2002 op = disk->advise_list[i%disk->advise_list_depth]; 2003 if (!op) { 2004 // Nothing more to advise, must be at free_idx 2005 dispatch_assert(i%disk->advise_list_depth == disk->free_idx); 2006 break; 2007 } 2008 if (op->direction == DOP_DIR_WRITE) { 2009 // TODO: preallocate writes ? rdar://problem/9032172 2010 continue; 2011 } 2012 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry, 2013 op->channel)) { 2014 continue; 2015 } 2016 // For performance analysis 2017 if (!op->total && dispatch_io_defaults.initial_delivery) { 2018 // Empty delivery to signal the start of the operation 2019 _dispatch_fd_debug("initial delivery", op->fd_entry->fd); 2020 _dispatch_operation_deliver_data(op, DOP_DELIVER); 2021 } 2022 // Advise two chunks if the list only has one element and this is the 2023 // first advise on the operation 2024 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] && 2025 !op->advise_offset) { 2026 chunk_size *= 2; 2027 } 2028 _dispatch_operation_advise(op, chunk_size); 2029 } while (++i < j); 2030 disk->advise_idx = i%disk->advise_list_depth; 2031 op = disk->advise_list[disk->req_idx]; 2032 int result = _dispatch_operation_perform(op); 2033 disk->advise_list[disk->req_idx] = NULL; 2034 disk->req_idx = (++disk->req_idx)%disk->advise_list_depth; 2035 dispatch_async(disk->pick_queue, ^{ 2036 switch (result) { 2037 case DISPATCH_OP_DELIVER: 2038 _dispatch_operation_deliver_data(op, DOP_DEFAULT); 2039 break; 2040 case DISPATCH_OP_COMPLETE: 2041 _dispatch_disk_complete_operation(disk, op); 2042 break; 2043 case DISPATCH_OP_DELIVER_AND_COMPLETE: 2044 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY); 2045 _dispatch_disk_complete_operation(disk, op); 2046 break; 2047 case DISPATCH_OP_ERR: 2048 _dispatch_disk_cleanup_operations(disk, op->channel); 2049 break; 2050 case DISPATCH_OP_FD_ERR: 2051 _dispatch_disk_cleanup_operations(disk, NULL); 2052 break; 2053 default: 2054 dispatch_assert(result); 2055 break; 2056 } 2057 op->active = false; 2058 disk->io_active = false; 2059 _dispatch_disk_handler(disk); 2060 // Balancing the retain in _dispatch_disk_handler. Note that op must be 2061 // released at the very end, since it might hold the last reference to 2062 // the disk 2063 _dispatch_release(op); 2064 }); 2065} 2066 2067#pragma mark - 2068#pragma mark dispatch_operation_perform 2069 2070static void 2071_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size) 2072{ 2073 int err; 2074 struct radvisory advise; 2075 // No point in issuing a read advise for the next chunk if we are already 2076 // a chunk ahead from reading the bytes 2077 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) + 2078 chunk_size + PAGE_SIZE)) { 2079 return; 2080 } 2081 _dispatch_object_debug(op, "%s", __func__); 2082 advise.ra_count = (int)chunk_size; 2083 if (!op->advise_offset) { 2084 op->advise_offset = op->offset; 2085 // If this is the first time through, align the advised range to a 2086 // page boundary 2087 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE; 2088 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0); 2089 } 2090 advise.ra_offset = op->advise_offset; 2091 op->advise_offset += advise.ra_count; 2092 _dispatch_io_syscall_switch(err, 2093 fcntl(op->fd_entry->fd, F_RDADVISE, &advise), 2094 case EFBIG: break; // advised past the end of the file rdar://10415691 2095 case ENOTSUP: break; // not all FS support radvise rdar://13484629 2096 // TODO: set disk status on error 2097 default: (void)dispatch_assume_zero(err); break; 2098 ); 2099} 2100 2101static int 2102_dispatch_operation_perform(dispatch_operation_t op) 2103{ 2104 int err = _dispatch_io_get_error(op, NULL, true); 2105 if (err) { 2106 goto error; 2107 } 2108 _dispatch_object_debug(op, "%s", __func__); 2109 if (!op->buf) { 2110 size_t max_buf_siz = op->params.high; 2111 size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE; 2112 if (op->direction == DOP_DIR_READ) { 2113 // If necessary, create a buffer for the ongoing operation, large 2114 // enough to fit chunk_pages but at most high-water 2115 size_t data_siz = dispatch_data_get_size(op->data); 2116 if (data_siz) { 2117 dispatch_assert(data_siz < max_buf_siz); 2118 max_buf_siz -= data_siz; 2119 } 2120 if (max_buf_siz > chunk_siz) { 2121 max_buf_siz = chunk_siz; 2122 } 2123 if (op->length < SIZE_MAX) { 2124 op->buf_siz = op->length - op->total; 2125 if (op->buf_siz > max_buf_siz) { 2126 op->buf_siz = max_buf_siz; 2127 } 2128 } else { 2129 op->buf_siz = max_buf_siz; 2130 } 2131 op->buf = valloc(op->buf_siz); 2132 _dispatch_fd_debug("buffer allocated", op->fd_entry->fd); 2133 } else if (op->direction == DOP_DIR_WRITE) { 2134 // Always write the first data piece, if that is smaller than a 2135 // chunk, accumulate further data pieces until chunk size is reached 2136 if (chunk_siz > max_buf_siz) { 2137 chunk_siz = max_buf_siz; 2138 } 2139 op->buf_siz = 0; 2140 dispatch_data_apply(op->data, 2141 ^(dispatch_data_t region DISPATCH_UNUSED, 2142 size_t offset DISPATCH_UNUSED, 2143 const void* buf DISPATCH_UNUSED, size_t len) { 2144 size_t siz = op->buf_siz + len; 2145 if (!op->buf_siz || siz <= chunk_siz) { 2146 op->buf_siz = siz; 2147 } 2148 return (bool)(siz < chunk_siz); 2149 }); 2150 if (op->buf_siz > max_buf_siz) { 2151 op->buf_siz = max_buf_siz; 2152 } 2153 dispatch_data_t d; 2154 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz); 2155 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf, 2156 NULL); 2157 _dispatch_io_data_release(d); 2158 _dispatch_fd_debug("buffer mapped", op->fd_entry->fd); 2159 } 2160 } 2161 if (op->fd_entry->fd == -1) { 2162 err = _dispatch_fd_entry_open(op->fd_entry, op->channel); 2163 if (err) { 2164 goto error; 2165 } 2166 } 2167 void *buf = op->buf + op->buf_len; 2168 size_t len = op->buf_siz - op->buf_len; 2169 off_t off = (off_t)((size_t)op->offset + op->total); 2170 ssize_t processed = -1; 2171syscall: 2172 if (op->direction == DOP_DIR_READ) { 2173 if (op->params.type == DISPATCH_IO_STREAM) { 2174 processed = read(op->fd_entry->fd, buf, len); 2175 } else if (op->params.type == DISPATCH_IO_RANDOM) { 2176 processed = pread(op->fd_entry->fd, buf, len, off); 2177 } 2178 } else if (op->direction == DOP_DIR_WRITE) { 2179 if (op->params.type == DISPATCH_IO_STREAM) { 2180 processed = write(op->fd_entry->fd, buf, len); 2181 } else if (op->params.type == DISPATCH_IO_RANDOM) { 2182 processed = pwrite(op->fd_entry->fd, buf, len, off); 2183 } 2184 } 2185 // Encountered an error on the file descriptor 2186 if (processed == -1) { 2187 err = errno; 2188 if (err == EINTR) { 2189 goto syscall; 2190 } 2191 goto error; 2192 } 2193 // EOF is indicated by two handler invocations 2194 if (processed == 0) { 2195 _dispatch_fd_debug("EOF", op->fd_entry->fd); 2196 return DISPATCH_OP_DELIVER_AND_COMPLETE; 2197 } 2198 op->buf_len += (size_t)processed; 2199 op->total += (size_t)processed; 2200 if (op->total == op->length) { 2201 // Finished processing all the bytes requested by the operation 2202 return DISPATCH_OP_COMPLETE; 2203 } else { 2204 // Deliver data only if we satisfy the filters 2205 return DISPATCH_OP_DELIVER; 2206 } 2207error: 2208 if (err == EAGAIN) { 2209 // For disk based files with blocking I/O we should never get EAGAIN 2210 dispatch_assert(!op->fd_entry->disk); 2211 _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err); 2212 if (op->direction == DOP_DIR_READ && op->total && 2213 op->channel == op->fd_entry->convenience_channel) { 2214 // Convenience read with available data completes on EAGAIN 2215 return DISPATCH_OP_COMPLETE_RESUME; 2216 } 2217 return DISPATCH_OP_RESUME; 2218 } 2219 op->err = err; 2220 switch (err) { 2221 case ECANCELED: 2222 return DISPATCH_OP_ERR; 2223 case EBADF: 2224 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed); 2225 return DISPATCH_OP_FD_ERR; 2226 default: 2227 return DISPATCH_OP_COMPLETE; 2228 } 2229} 2230 2231static void 2232_dispatch_operation_deliver_data(dispatch_operation_t op, 2233 dispatch_op_flags_t flags) 2234{ 2235 // Either called from stream resp. pick queue or when op is finalized 2236 dispatch_data_t data = NULL; 2237 int err = 0; 2238 size_t undelivered = op->undelivered + op->buf_len; 2239 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) || 2240 (op->flags & DOP_DELIVER); 2241 op->flags = DOP_DEFAULT; 2242 if (!deliver) { 2243 // Don't deliver data until low water mark has been reached 2244 if (undelivered >= op->params.low) { 2245 deliver = true; 2246 } else if (op->buf_len < op->buf_siz) { 2247 // Request buffer is not yet used up 2248 _dispatch_fd_debug("buffer data", op->fd_entry->fd); 2249 return; 2250 } 2251 } else { 2252 err = op->err; 2253 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) { 2254 err = ECANCELED; 2255 op->err = err; 2256 } 2257 } 2258 // Deliver data or buffer used up 2259 if (op->direction == DOP_DIR_READ) { 2260 if (op->buf_len) { 2261 void *buf = op->buf; 2262 data = dispatch_data_create(buf, op->buf_len, NULL, 2263 DISPATCH_DATA_DESTRUCTOR_FREE); 2264 op->buf = NULL; 2265 op->buf_len = 0; 2266 dispatch_data_t d = dispatch_data_create_concat(op->data, data); 2267 _dispatch_io_data_release(op->data); 2268 _dispatch_io_data_release(data); 2269 data = d; 2270 } else { 2271 data = op->data; 2272 } 2273 op->data = deliver ? dispatch_data_empty : data; 2274 } else if (op->direction == DOP_DIR_WRITE) { 2275 if (deliver) { 2276 data = dispatch_data_create_subrange(op->data, op->buf_len, 2277 op->length); 2278 } 2279 if (op->buf_data && op->buf_len == op->buf_siz) { 2280 _dispatch_io_data_release(op->buf_data); 2281 op->buf_data = NULL; 2282 op->buf = NULL; 2283 op->buf_len = 0; 2284 // Trim newly written buffer from head of unwritten data 2285 dispatch_data_t d; 2286 if (deliver) { 2287 _dispatch_io_data_retain(data); 2288 d = data; 2289 } else { 2290 d = dispatch_data_create_subrange(op->data, op->buf_siz, 2291 op->length); 2292 } 2293 _dispatch_io_data_release(op->data); 2294 op->data = d; 2295 } 2296 } else { 2297 dispatch_assert(op->direction < DOP_DIR_MAX); 2298 return; 2299 } 2300 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) { 2301 op->undelivered = undelivered; 2302 _dispatch_fd_debug("buffer data", op->fd_entry->fd); 2303 return; 2304 } 2305 op->undelivered = 0; 2306 _dispatch_object_debug(op, "%s", __func__); 2307 _dispatch_fd_debug("deliver data", op->fd_entry->fd); 2308 dispatch_op_direction_t direction = op->direction; 2309 dispatch_io_handler_t handler = op->handler; 2310#if DISPATCH_IO_DEBUG 2311 int fd = op->fd_entry->fd; 2312#endif 2313 dispatch_fd_entry_t fd_entry = op->fd_entry; 2314 _dispatch_fd_entry_retain(fd_entry); 2315 dispatch_io_t channel = op->channel; 2316 _dispatch_retain(channel); 2317 // Note that data delivery may occur after the operation is freed 2318 dispatch_async(op->op_q, ^{ 2319 bool done = (flags & DOP_DONE); 2320 dispatch_data_t d = data; 2321 if (done) { 2322 if (direction == DOP_DIR_READ && err) { 2323 if (dispatch_data_get_size(d)) { 2324 _dispatch_fd_debug("IO handler invoke", fd); 2325 handler(false, d, 0); 2326 } 2327 d = NULL; 2328 } else if (direction == DOP_DIR_WRITE && !err) { 2329 d = NULL; 2330 } 2331 } 2332 _dispatch_fd_debug("IO handler invoke", fd); 2333 handler(done, d, err); 2334 _dispatch_release(channel); 2335 _dispatch_fd_entry_release(fd_entry); 2336 _dispatch_io_data_release(data); 2337 }); 2338} 2339 2340#pragma mark - 2341#pragma mark dispatch_io_debug 2342 2343static size_t 2344_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz) 2345{ 2346 dispatch_queue_t target = channel->do_targetq; 2347 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, " 2348 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = " 2349 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ", 2350 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random", 2351 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ? 2352 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "", 2353 channel->fd_entry, channel->queue, target && target->dq_label ? 2354 target->dq_label : "", target, channel->barrier_queue, 2355 channel->barrier_group, channel->err, channel->params.low, 2356 channel->params.high, channel->params.interval_flags & 2357 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", 2358 channel->params.interval); 2359} 2360 2361size_t 2362_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz) 2363{ 2364 size_t offset = 0; 2365 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 2366 dx_kind(channel), channel); 2367 offset += _dispatch_object_debug_attr(channel, &buf[offset], 2368 bufsiz - offset); 2369 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset); 2370 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 2371 return offset; 2372} 2373 2374static size_t 2375_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf, 2376 size_t bufsiz) 2377{ 2378 dispatch_queue_t target = op->do_targetq; 2379 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL; 2380 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, " 2381 "channel = %p, queue = %p -> %s[%p], target = %s[%p], " 2382 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, " 2383 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, " 2384 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ? 2385 "stream" : "random", op->direction == DOP_DIR_READ ? "read" : 2386 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry, 2387 op->channel, op->op_q, oqtarget && oqtarget->dq_label ? 2388 oqtarget->dq_label : "", oqtarget, target && target->dq_label ? 2389 target->dq_label : "", target, op->offset, op->length, op->total, 2390 op->undelivered + op->buf_len, op->flags, op->err, op->params.low, 2391 op->params.high, op->params.interval_flags & 2392 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval); 2393} 2394 2395size_t 2396_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz) 2397{ 2398 size_t offset = 0; 2399 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 2400 dx_kind(op), op); 2401 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset); 2402 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset); 2403 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 2404 return offset; 2405} 2406