1/* 2 * Copyright (c) 2009-2013 Apple Inc. All rights reserved. 3 * 4 * @APPLE_APACHE_LICENSE_HEADER_START@ 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * @APPLE_APACHE_LICENSE_HEADER_END@ 19 */ 20 21#include "internal.h" 22 23#ifndef DISPATCH_IO_DEBUG 24#define DISPATCH_IO_DEBUG DISPATCH_DEBUG 25#endif 26 27#if DISPATCH_IO_DEBUG 28#define _dispatch_fd_debug(msg, fd, args...) \ 29 _dispatch_debug("fd[0x%x]: " msg, (fd), ##args) 30#else 31#define _dispatch_fd_debug(msg, fd, args...) 32#endif 33 34#if USE_OBJC 35#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x) 36#define _dispatch_io_data_release(x) _dispatch_objc_release(x) 37#else 38#define _dispatch_io_data_retain(x) dispatch_retain(x) 39#define _dispatch_io_data_release(x) dispatch_release(x) 40#endif 41 42typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry); 43 44DISPATCH_EXPORT DISPATCH_NOTHROW 45void _dispatch_iocntl(uint32_t param, uint64_t value); 46 47static dispatch_operation_t _dispatch_operation_create( 48 dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset, 49 size_t length, dispatch_data_t data, dispatch_queue_t queue, 50 dispatch_io_handler_t handler); 51static void _dispatch_operation_enqueue(dispatch_operation_t op, 52 dispatch_op_direction_t direction, dispatch_data_t data); 53static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq, 54 dispatch_operation_t op); 55static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry); 56static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry); 57static void _dispatch_fd_entry_init_async(dispatch_fd_t fd, 58 dispatch_fd_entry_init_callback_t completion_callback); 59static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, 60 uintptr_t hash); 61static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path( 62 dispatch_io_path_data_t path_data, dev_t dev, mode_t mode); 63static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, 64 dispatch_io_t channel); 65static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, 66 dispatch_io_t channel); 67static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry, 68 dispatch_queue_t tq); 69static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, 70 dispatch_op_direction_t direction); 71static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev); 72static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream, 73 dispatch_operation_t operation, dispatch_data_t data); 74static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk, 75 dispatch_operation_t operation, dispatch_data_t data); 76static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream, 77 dispatch_io_t channel); 78static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk, 79 dispatch_io_t channel); 80static void _dispatch_stream_source_handler(void *ctx); 81static void _dispatch_stream_queue_handler(void *ctx); 82static void _dispatch_stream_handler(void *ctx); 83static void _dispatch_disk_handler(void *ctx); 84static void _dispatch_disk_perform(void *ctxt); 85static void _dispatch_operation_advise(dispatch_operation_t op, 86 size_t chunk_size); 87static int _dispatch_operation_perform(dispatch_operation_t op); 88static void _dispatch_operation_deliver_data(dispatch_operation_t op, 89 dispatch_op_flags_t flags); 90 91// Macros to wrap syscalls which return -1 on error, and retry on EINTR 92#define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \ 93 switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \ 94 case EINTR: continue; \ 95 __VA_ARGS__ \ 96 } \ 97 break; \ 98 } while (1) 99#define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \ 100 _dispatch_io_syscall_switch_noerr(__err, __syscall, \ 101 case 0: break; \ 102 __VA_ARGS__ \ 103 ); \ 104 } while (0) 105#define _dispatch_io_syscall(__syscall) do { int __err; \ 106 _dispatch_io_syscall_switch(__err, __syscall); \ 107 } while (0) 108 109enum { 110 DISPATCH_OP_COMPLETE = 1, 111 DISPATCH_OP_DELIVER, 112 DISPATCH_OP_DELIVER_AND_COMPLETE, 113 DISPATCH_OP_COMPLETE_RESUME, 114 DISPATCH_OP_RESUME, 115 DISPATCH_OP_ERR, 116 DISPATCH_OP_FD_ERR, 117}; 118 119#define _dispatch_io_Block_copy(x) \ 120 ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x))) 121 122#pragma mark - 123#pragma mark dispatch_io_hashtables 124 125#if TARGET_OS_EMBEDDED 126#define DIO_HASH_SIZE 64u // must be a power of two 127#else 128#define DIO_HASH_SIZE 256u // must be a power of two 129#endif 130#define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1)) 131 132// Global hashtable of dev_t -> disk_s mappings 133DISPATCH_CACHELINE_ALIGN 134static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE]; 135// Global hashtable of fd -> fd_entry_s mappings 136DISPATCH_CACHELINE_ALIGN 137static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE]; 138 139static dispatch_once_t _dispatch_io_devs_lockq_pred; 140static dispatch_queue_t _dispatch_io_devs_lockq; 141static dispatch_queue_t _dispatch_io_fds_lockq; 142 143static void 144_dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED) 145{ 146 _dispatch_io_fds_lockq = dispatch_queue_create( 147 "com.apple.libdispatch-io.fd_lockq", NULL); 148 unsigned int i; 149 for (i = 0; i < DIO_HASH_SIZE; i++) { 150 TAILQ_INIT(&_dispatch_io_fds[i]); 151 } 152} 153 154static void 155_dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED) 156{ 157 _dispatch_io_devs_lockq = dispatch_queue_create( 158 "com.apple.libdispatch-io.dev_lockq", NULL); 159 unsigned int i; 160 for (i = 0; i < DIO_HASH_SIZE; i++) { 161 TAILQ_INIT(&_dispatch_io_devs[i]); 162 } 163} 164 165#pragma mark - 166#pragma mark dispatch_io_defaults 167 168enum { 169 DISPATCH_IOCNTL_CHUNK_PAGES = 1, 170 DISPATCH_IOCNTL_LOW_WATER_CHUNKS, 171 DISPATCH_IOCNTL_INITIAL_DELIVERY, 172 DISPATCH_IOCNTL_MAX_PENDING_IO_REQS, 173}; 174 175static struct dispatch_io_defaults_s { 176 size_t chunk_pages, low_water_chunks, max_pending_io_reqs; 177 bool initial_delivery; 178} dispatch_io_defaults = { 179 .chunk_pages = DIO_MAX_CHUNK_PAGES, 180 .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS, 181 .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS, 182}; 183 184#define _dispatch_iocntl_set_default(p, v) do { \ 185 dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \ 186 } while (0) 187 188void 189_dispatch_iocntl(uint32_t param, uint64_t value) 190{ 191 switch (param) { 192 case DISPATCH_IOCNTL_CHUNK_PAGES: 193 _dispatch_iocntl_set_default(chunk_pages, value); 194 break; 195 case DISPATCH_IOCNTL_LOW_WATER_CHUNKS: 196 _dispatch_iocntl_set_default(low_water_chunks, value); 197 break; 198 case DISPATCH_IOCNTL_INITIAL_DELIVERY: 199 _dispatch_iocntl_set_default(initial_delivery, value); 200 case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS: 201 _dispatch_iocntl_set_default(max_pending_io_reqs, value); 202 break; 203 } 204} 205 206#pragma mark - 207#pragma mark dispatch_io_t 208 209static dispatch_io_t 210_dispatch_io_create(dispatch_io_type_t type) 211{ 212 dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io), 213 sizeof(struct dispatch_io_s)); 214 channel->do_next = DISPATCH_OBJECT_LISTLESS; 215 channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, 216 true); 217 channel->params.type = type; 218 channel->params.high = SIZE_MAX; 219 channel->params.low = dispatch_io_defaults.low_water_chunks * 220 dispatch_io_defaults.chunk_pages * PAGE_SIZE; 221 channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq", 222 NULL); 223 return channel; 224} 225 226static void 227_dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry, 228 dispatch_queue_t queue, int err, void (^cleanup_handler)(int)) 229{ 230 // Enqueue the cleanup handler on the suspended close queue 231 if (cleanup_handler) { 232 _dispatch_retain(queue); 233 dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{ 234 dispatch_async(queue, ^{ 235 _dispatch_fd_debug("cleanup handler invoke", -1); 236 cleanup_handler(err); 237 }); 238 _dispatch_release(queue); 239 }); 240 } 241 if (fd_entry) { 242 channel->fd_entry = fd_entry; 243 dispatch_retain(fd_entry->barrier_queue); 244 dispatch_retain(fd_entry->barrier_group); 245 channel->barrier_queue = fd_entry->barrier_queue; 246 channel->barrier_group = fd_entry->barrier_group; 247 } else { 248 // Still need to create a barrier queue, since all operations go 249 // through it 250 channel->barrier_queue = dispatch_queue_create( 251 "com.apple.libdispatch-io.barrierq", NULL); 252 channel->barrier_group = dispatch_group_create(); 253 } 254} 255 256void 257_dispatch_io_dispose(dispatch_io_t channel) 258{ 259 _dispatch_object_debug(channel, "%s", __func__); 260 if (channel->fd_entry && 261 !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) { 262 if (channel->fd_entry->path_data) { 263 // This modification is safe since path_data->channel is checked 264 // only on close_queue (which is still suspended at this point) 265 channel->fd_entry->path_data->channel = NULL; 266 } 267 // Cleanup handlers will only run when all channels related to this 268 // fd are complete 269 _dispatch_fd_entry_release(channel->fd_entry); 270 } 271 if (channel->queue) { 272 dispatch_release(channel->queue); 273 } 274 if (channel->barrier_queue) { 275 dispatch_release(channel->barrier_queue); 276 } 277 if (channel->barrier_group) { 278 dispatch_release(channel->barrier_group); 279 } 280} 281 282static int 283_dispatch_io_validate_type(dispatch_io_t channel, mode_t mode) 284{ 285 int err = 0; 286 if (S_ISDIR(mode)) { 287 err = EISDIR; 288 } else if (channel->params.type == DISPATCH_IO_RANDOM && 289 (S_ISFIFO(mode) || S_ISSOCK(mode))) { 290 err = ESPIPE; 291 } 292 return err; 293} 294 295static int 296_dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel, 297 bool ignore_closed) 298{ 299 // On _any_ queue 300 int err; 301 if (op) { 302 channel = op->channel; 303 } 304 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { 305 if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) { 306 err = ECANCELED; 307 } else { 308 err = 0; 309 } 310 } else { 311 err = op ? op->fd_entry->err : channel->err; 312 } 313 return err; 314} 315 316#pragma mark - 317#pragma mark dispatch_io_channels 318 319dispatch_io_t 320dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd, 321 dispatch_queue_t queue, void (^cleanup_handler)(int)) 322{ 323 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { 324 return NULL; 325 } 326 _dispatch_fd_debug("io create", fd); 327 dispatch_io_t channel = _dispatch_io_create(type); 328 channel->fd = fd; 329 channel->fd_actual = fd; 330 dispatch_suspend(channel->queue); 331 _dispatch_retain(queue); 332 _dispatch_retain(channel); 333 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { 334 // On barrier queue 335 int err = fd_entry->err; 336 if (!err) { 337 err = _dispatch_io_validate_type(channel, fd_entry->stat.mode); 338 } 339 if (!err && type == DISPATCH_IO_RANDOM) { 340 off_t f_ptr; 341 _dispatch_io_syscall_switch_noerr(err, 342 f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR), 343 case 0: channel->f_ptr = f_ptr; break; 344 default: (void)dispatch_assume_zero(err); break; 345 ); 346 } 347 channel->err = err; 348 _dispatch_fd_entry_retain(fd_entry); 349 _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler); 350 dispatch_resume(channel->queue); 351 _dispatch_object_debug(channel, "%s", __func__); 352 _dispatch_release(channel); 353 _dispatch_release(queue); 354 }); 355 _dispatch_object_debug(channel, "%s", __func__); 356 return channel; 357} 358 359dispatch_io_t 360dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd, 361 dispatch_queue_t queue, void *context, 362 void (*cleanup_handler)(void *context, int error)) 363{ 364 return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL : 365 ^(int error){ cleanup_handler(context, error); }); 366} 367 368dispatch_io_t 369dispatch_io_create_with_path(dispatch_io_type_t type, const char *path, 370 int oflag, mode_t mode, dispatch_queue_t queue, 371 void (^cleanup_handler)(int error)) 372{ 373 if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) || 374 !(path && *path == '/')) { 375 return NULL; 376 } 377 size_t pathlen = strlen(path); 378 dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1); 379 if (!path_data) { 380 return NULL; 381 } 382 _dispatch_fd_debug("io create with path %s", -1, path); 383 dispatch_io_t channel = _dispatch_io_create(type); 384 channel->fd = -1; 385 channel->fd_actual = -1; 386 path_data->channel = channel; 387 path_data->oflag = oflag; 388 path_data->mode = mode; 389 path_data->pathlen = pathlen; 390 memcpy(path_data->path, path, pathlen + 1); 391 _dispatch_retain(queue); 392 _dispatch_retain(channel); 393 dispatch_async(channel->queue, ^{ 394 int err = 0; 395 struct stat st; 396 _dispatch_io_syscall_switch_noerr(err, 397 (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW || 398 (path_data->oflag & O_SYMLINK) == O_SYMLINK ? 399 lstat(path_data->path, &st) : stat(path_data->path, &st), 400 case 0: 401 err = _dispatch_io_validate_type(channel, st.st_mode); 402 break; 403 default: 404 if ((path_data->oflag & O_CREAT) && 405 (*(path_data->path + path_data->pathlen - 1) != '/')) { 406 // Check parent directory 407 char *c = strrchr(path_data->path, '/'); 408 dispatch_assert(c); 409 *c = 0; 410 int perr; 411 _dispatch_io_syscall_switch_noerr(perr, 412 stat(path_data->path, &st), 413 case 0: 414 // Since the parent directory exists, open() will 415 // create a regular file after the fd_entry has 416 // been filled in 417 st.st_mode = S_IFREG; 418 err = 0; 419 break; 420 ); 421 *c = '/'; 422 } 423 break; 424 ); 425 channel->err = err; 426 if (err) { 427 free(path_data); 428 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); 429 _dispatch_release(channel); 430 _dispatch_release(queue); 431 return; 432 } 433 dispatch_suspend(channel->queue); 434 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, 435 _dispatch_io_devs_lockq_init); 436 dispatch_async(_dispatch_io_devs_lockq, ^{ 437 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path( 438 path_data, st.st_dev, st.st_mode); 439 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); 440 dispatch_resume(channel->queue); 441 _dispatch_object_debug(channel, "%s", __func__); 442 _dispatch_release(channel); 443 _dispatch_release(queue); 444 }); 445 }); 446 _dispatch_object_debug(channel, "%s", __func__); 447 return channel; 448} 449 450dispatch_io_t 451dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path, 452 int oflag, mode_t mode, dispatch_queue_t queue, void *context, 453 void (*cleanup_handler)(void *context, int error)) 454{ 455 return dispatch_io_create_with_path(type, path, oflag, mode, queue, 456 !cleanup_handler ? NULL : 457 ^(int error){ cleanup_handler(context, error); }); 458} 459 460dispatch_io_t 461dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel, 462 dispatch_queue_t queue, void (^cleanup_handler)(int error)) 463{ 464 if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { 465 return NULL; 466 } 467 _dispatch_fd_debug("io create with io %p", -1, in_channel); 468 dispatch_io_t channel = _dispatch_io_create(type); 469 dispatch_suspend(channel->queue); 470 _dispatch_retain(queue); 471 _dispatch_retain(channel); 472 _dispatch_retain(in_channel); 473 dispatch_async(in_channel->queue, ^{ 474 int err0 = _dispatch_io_get_error(NULL, in_channel, false); 475 if (err0) { 476 channel->err = err0; 477 _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler); 478 dispatch_resume(channel->queue); 479 _dispatch_release(channel); 480 _dispatch_release(in_channel); 481 _dispatch_release(queue); 482 return; 483 } 484 dispatch_async(in_channel->barrier_queue, ^{ 485 int err = _dispatch_io_get_error(NULL, in_channel, false); 486 // If there is no error, the fd_entry for the in_channel is valid. 487 // Since we are running on in_channel's queue, the fd_entry has been 488 // fully resolved and will stay valid for the duration of this block 489 if (!err) { 490 err = in_channel->err; 491 if (!err) { 492 err = in_channel->fd_entry->err; 493 } 494 } 495 if (!err) { 496 err = _dispatch_io_validate_type(channel, 497 in_channel->fd_entry->stat.mode); 498 } 499 if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) { 500 off_t f_ptr; 501 _dispatch_io_syscall_switch_noerr(err, 502 f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR), 503 case 0: channel->f_ptr = f_ptr; break; 504 default: (void)dispatch_assume_zero(err); break; 505 ); 506 } 507 channel->err = err; 508 if (err) { 509 _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); 510 dispatch_resume(channel->queue); 511 _dispatch_release(channel); 512 _dispatch_release(in_channel); 513 _dispatch_release(queue); 514 return; 515 } 516 if (in_channel->fd == -1) { 517 // in_channel was created from path 518 channel->fd = -1; 519 channel->fd_actual = -1; 520 mode_t mode = in_channel->fd_entry->stat.mode; 521 dev_t dev = in_channel->fd_entry->stat.dev; 522 size_t path_data_len = sizeof(struct dispatch_io_path_data_s) + 523 in_channel->fd_entry->path_data->pathlen + 1; 524 dispatch_io_path_data_t path_data = malloc(path_data_len); 525 memcpy(path_data, in_channel->fd_entry->path_data, 526 path_data_len); 527 path_data->channel = channel; 528 // lockq_io_devs is known to already exist 529 dispatch_async(_dispatch_io_devs_lockq, ^{ 530 dispatch_fd_entry_t fd_entry; 531 fd_entry = _dispatch_fd_entry_create_with_path(path_data, 532 dev, mode); 533 _dispatch_io_init(channel, fd_entry, queue, 0, 534 cleanup_handler); 535 dispatch_resume(channel->queue); 536 _dispatch_release(channel); 537 _dispatch_release(queue); 538 }); 539 } else { 540 dispatch_fd_entry_t fd_entry = in_channel->fd_entry; 541 channel->fd = in_channel->fd; 542 channel->fd_actual = in_channel->fd_actual; 543 _dispatch_fd_entry_retain(fd_entry); 544 _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); 545 dispatch_resume(channel->queue); 546 _dispatch_release(channel); 547 _dispatch_release(queue); 548 } 549 _dispatch_release(in_channel); 550 _dispatch_object_debug(channel, "%s", __func__); 551 }); 552 }); 553 _dispatch_object_debug(channel, "%s", __func__); 554 return channel; 555} 556 557dispatch_io_t 558dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel, 559 dispatch_queue_t queue, void *context, 560 void (*cleanup_handler)(void *context, int error)) 561{ 562 return dispatch_io_create_with_io(type, in_channel, queue, 563 !cleanup_handler ? NULL : 564 ^(int error){ cleanup_handler(context, error); }); 565} 566 567#pragma mark - 568#pragma mark dispatch_io_accessors 569 570void 571dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water) 572{ 573 _dispatch_retain(channel); 574 dispatch_async(channel->queue, ^{ 575 _dispatch_fd_debug("io set high water", channel->fd); 576 if (channel->params.low > high_water) { 577 channel->params.low = high_water; 578 } 579 channel->params.high = high_water ? high_water : 1; 580 _dispatch_release(channel); 581 }); 582} 583 584void 585dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water) 586{ 587 _dispatch_retain(channel); 588 dispatch_async(channel->queue, ^{ 589 _dispatch_fd_debug("io set low water", channel->fd); 590 if (channel->params.high < low_water) { 591 channel->params.high = low_water ? low_water : 1; 592 } 593 channel->params.low = low_water; 594 _dispatch_release(channel); 595 }); 596} 597 598void 599dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval, 600 unsigned long flags) 601{ 602 _dispatch_retain(channel); 603 dispatch_async(channel->queue, ^{ 604 _dispatch_fd_debug("io set interval", channel->fd); 605 channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX; 606 channel->params.interval_flags = flags; 607 _dispatch_release(channel); 608 }); 609} 610 611void 612_dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq) 613{ 614 _dispatch_retain(dq); 615 _dispatch_retain(channel); 616 dispatch_async(channel->queue, ^{ 617 dispatch_queue_t prev_dq = channel->do_targetq; 618 channel->do_targetq = dq; 619 _dispatch_release(prev_dq); 620 _dispatch_object_debug(channel, "%s", __func__); 621 _dispatch_release(channel); 622 }); 623} 624 625dispatch_fd_t 626dispatch_io_get_descriptor(dispatch_io_t channel) 627{ 628 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { 629 return -1; 630 } 631 dispatch_fd_t fd = channel->fd_actual; 632 if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel && 633 !_dispatch_io_get_error(NULL, channel, false)) { 634 dispatch_fd_entry_t fd_entry = channel->fd_entry; 635 (void)_dispatch_fd_entry_open(fd_entry, channel); 636 } 637 return channel->fd_actual; 638} 639 640#pragma mark - 641#pragma mark dispatch_io_operations 642 643static void 644_dispatch_io_stop(dispatch_io_t channel) 645{ 646 _dispatch_fd_debug("io stop", channel->fd); 647 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed); 648 _dispatch_retain(channel); 649 dispatch_async(channel->queue, ^{ 650 dispatch_async(channel->barrier_queue, ^{ 651 _dispatch_object_debug(channel, "%s", __func__); 652 dispatch_fd_entry_t fd_entry = channel->fd_entry; 653 if (fd_entry) { 654 _dispatch_fd_debug("io stop cleanup", channel->fd); 655 _dispatch_fd_entry_cleanup_operations(fd_entry, channel); 656 if (!(channel->atomic_flags & DIO_CLOSED)) { 657 channel->fd_entry = NULL; 658 _dispatch_fd_entry_release(fd_entry); 659 } 660 } else if (channel->fd != -1) { 661 // Stop after close, need to check if fd_entry still exists 662 _dispatch_retain(channel); 663 dispatch_async(_dispatch_io_fds_lockq, ^{ 664 _dispatch_object_debug(channel, "%s", __func__); 665 _dispatch_fd_debug("io stop after close cleanup", 666 channel->fd); 667 dispatch_fd_entry_t fdi; 668 uintptr_t hash = DIO_HASH(channel->fd); 669 TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) { 670 if (fdi->fd == channel->fd) { 671 _dispatch_fd_entry_cleanup_operations(fdi, channel); 672 break; 673 } 674 } 675 _dispatch_release(channel); 676 }); 677 } 678 _dispatch_release(channel); 679 }); 680 }); 681} 682 683void 684dispatch_io_close(dispatch_io_t channel, unsigned long flags) 685{ 686 if (flags & DISPATCH_IO_STOP) { 687 // Don't stop an already stopped channel 688 if (channel->atomic_flags & DIO_STOPPED) { 689 return; 690 } 691 return _dispatch_io_stop(channel); 692 } 693 // Don't close an already closed or stopped channel 694 if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { 695 return; 696 } 697 _dispatch_retain(channel); 698 dispatch_async(channel->queue, ^{ 699 dispatch_async(channel->barrier_queue, ^{ 700 _dispatch_object_debug(channel, "%s", __func__); 701 _dispatch_fd_debug("io close", channel->fd); 702 if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) { 703 (void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED, 704 relaxed); 705 dispatch_fd_entry_t fd_entry = channel->fd_entry; 706 if (fd_entry) { 707 if (!fd_entry->path_data) { 708 channel->fd_entry = NULL; 709 } 710 _dispatch_fd_entry_release(fd_entry); 711 } 712 } 713 _dispatch_release(channel); 714 }); 715 }); 716} 717 718void 719dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier) 720{ 721 _dispatch_retain(channel); 722 dispatch_async(channel->queue, ^{ 723 dispatch_queue_t io_q = channel->do_targetq; 724 dispatch_queue_t barrier_queue = channel->barrier_queue; 725 dispatch_group_t barrier_group = channel->barrier_group; 726 dispatch_async(barrier_queue, ^{ 727 dispatch_suspend(barrier_queue); 728 dispatch_group_notify(barrier_group, io_q, ^{ 729 _dispatch_object_debug(channel, "%s", __func__); 730 _dispatch_thread_setspecific(dispatch_io_key, channel); 731 barrier(); 732 _dispatch_thread_setspecific(dispatch_io_key, NULL); 733 dispatch_resume(barrier_queue); 734 _dispatch_release(channel); 735 }); 736 }); 737 }); 738} 739 740void 741dispatch_io_barrier_f(dispatch_io_t channel, void *context, 742 dispatch_function_t barrier) 743{ 744 return dispatch_io_barrier(channel, ^{ barrier(context); }); 745} 746 747void 748dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length, 749 dispatch_queue_t queue, dispatch_io_handler_t handler) 750{ 751 _dispatch_retain(channel); 752 _dispatch_retain(queue); 753 dispatch_async(channel->queue, ^{ 754 dispatch_operation_t op; 755 op = _dispatch_operation_create(DOP_DIR_READ, channel, offset, 756 length, dispatch_data_empty, queue, handler); 757 if (op) { 758 dispatch_queue_t barrier_q = channel->barrier_queue; 759 dispatch_async(barrier_q, ^{ 760 _dispatch_operation_enqueue(op, DOP_DIR_READ, 761 dispatch_data_empty); 762 }); 763 } 764 _dispatch_release(channel); 765 _dispatch_release(queue); 766 }); 767} 768 769void 770dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length, 771 dispatch_queue_t queue, void *context, 772 dispatch_io_handler_function_t handler) 773{ 774 return dispatch_io_read(channel, offset, length, queue, 775 ^(bool done, dispatch_data_t d, int error){ 776 handler(context, done, d, error); 777 }); 778} 779 780void 781dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data, 782 dispatch_queue_t queue, dispatch_io_handler_t handler) 783{ 784 _dispatch_io_data_retain(data); 785 _dispatch_retain(channel); 786 _dispatch_retain(queue); 787 dispatch_async(channel->queue, ^{ 788 dispatch_operation_t op; 789 op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset, 790 dispatch_data_get_size(data), data, queue, handler); 791 if (op) { 792 dispatch_queue_t barrier_q = channel->barrier_queue; 793 dispatch_async(barrier_q, ^{ 794 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); 795 _dispatch_io_data_release(data); 796 }); 797 } else { 798 _dispatch_io_data_release(data); 799 } 800 _dispatch_release(channel); 801 _dispatch_release(queue); 802 }); 803} 804 805void 806dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data, 807 dispatch_queue_t queue, void *context, 808 dispatch_io_handler_function_t handler) 809{ 810 return dispatch_io_write(channel, offset, data, queue, 811 ^(bool done, dispatch_data_t d, int error){ 812 handler(context, done, d, error); 813 }); 814} 815 816void 817dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, 818 void (^handler)(dispatch_data_t, int)) 819{ 820 _dispatch_retain(queue); 821 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { 822 // On barrier queue 823 if (fd_entry->err) { 824 int err = fd_entry->err; 825 dispatch_async(queue, ^{ 826 _dispatch_fd_debug("convenience handler invoke", fd); 827 handler(dispatch_data_empty, err); 828 }); 829 _dispatch_release(queue); 830 return; 831 } 832 // Safe to access fd_entry on barrier queue 833 dispatch_io_t channel = fd_entry->convenience_channel; 834 if (!channel) { 835 channel = _dispatch_io_create(DISPATCH_IO_STREAM); 836 channel->fd = fd; 837 channel->fd_actual = fd; 838 channel->fd_entry = fd_entry; 839 dispatch_retain(fd_entry->barrier_queue); 840 dispatch_retain(fd_entry->barrier_group); 841 channel->barrier_queue = fd_entry->barrier_queue; 842 channel->barrier_group = fd_entry->barrier_group; 843 fd_entry->convenience_channel = channel; 844 } 845 __block dispatch_data_t deliver_data = dispatch_data_empty; 846 __block int err = 0; 847 dispatch_async(fd_entry->close_queue, ^{ 848 dispatch_async(queue, ^{ 849 _dispatch_fd_debug("convenience handler invoke", fd); 850 handler(deliver_data, err); 851 _dispatch_io_data_release(deliver_data); 852 }); 853 _dispatch_release(queue); 854 }); 855 dispatch_operation_t op = 856 _dispatch_operation_create(DOP_DIR_READ, channel, 0, 857 length, dispatch_data_empty, 858 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false), 859 ^(bool done, dispatch_data_t data, int error) { 860 if (data) { 861 data = dispatch_data_create_concat(deliver_data, data); 862 _dispatch_io_data_release(deliver_data); 863 deliver_data = data; 864 } 865 if (done) { 866 err = error; 867 } 868 }); 869 if (op) { 870 _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty); 871 } 872 }); 873} 874 875void 876dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, 877 void *context, void (*handler)(void *, dispatch_data_t, int)) 878{ 879 return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){ 880 handler(context, d, error); 881 }); 882} 883 884void 885dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, 886 void (^handler)(dispatch_data_t, int)) 887{ 888 _dispatch_io_data_retain(data); 889 _dispatch_retain(queue); 890 _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { 891 // On barrier queue 892 if (fd_entry->err) { 893 int err = fd_entry->err; 894 dispatch_async(queue, ^{ 895 _dispatch_fd_debug("convenience handler invoke", fd); 896 handler(NULL, err); 897 }); 898 _dispatch_release(queue); 899 return; 900 } 901 // Safe to access fd_entry on barrier queue 902 dispatch_io_t channel = fd_entry->convenience_channel; 903 if (!channel) { 904 channel = _dispatch_io_create(DISPATCH_IO_STREAM); 905 channel->fd = fd; 906 channel->fd_actual = fd; 907 channel->fd_entry = fd_entry; 908 dispatch_retain(fd_entry->barrier_queue); 909 dispatch_retain(fd_entry->barrier_group); 910 channel->barrier_queue = fd_entry->barrier_queue; 911 channel->barrier_group = fd_entry->barrier_group; 912 fd_entry->convenience_channel = channel; 913 } 914 __block dispatch_data_t deliver_data = NULL; 915 __block int err = 0; 916 dispatch_async(fd_entry->close_queue, ^{ 917 dispatch_async(queue, ^{ 918 _dispatch_fd_debug("convenience handler invoke", fd); 919 handler(deliver_data, err); 920 if (deliver_data) { 921 _dispatch_io_data_release(deliver_data); 922 } 923 }); 924 _dispatch_release(queue); 925 }); 926 dispatch_operation_t op = 927 _dispatch_operation_create(DOP_DIR_WRITE, channel, 0, 928 dispatch_data_get_size(data), data, 929 _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false), 930 ^(bool done, dispatch_data_t d, int error) { 931 if (done) { 932 if (d) { 933 _dispatch_io_data_retain(d); 934 deliver_data = d; 935 } 936 err = error; 937 } 938 }); 939 if (op) { 940 _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); 941 } 942 _dispatch_io_data_release(data); 943 }); 944} 945 946void 947dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, 948 void *context, void (*handler)(void *, dispatch_data_t, int)) 949{ 950 return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){ 951 handler(context, d, error); 952 }); 953} 954 955#pragma mark - 956#pragma mark dispatch_operation_t 957 958static dispatch_operation_t 959_dispatch_operation_create(dispatch_op_direction_t direction, 960 dispatch_io_t channel, off_t offset, size_t length, 961 dispatch_data_t data, dispatch_queue_t queue, 962 dispatch_io_handler_t handler) 963{ 964 // On channel queue 965 dispatch_assert(direction < DOP_DIR_MAX); 966 _dispatch_fd_debug("operation create", channel->fd); 967#if DISPATCH_IO_DEBUG 968 int fd = channel->fd; 969#endif 970 // Safe to call _dispatch_io_get_error() with channel->fd_entry since 971 // that can only be NULL if atomic_flags are set rdar://problem/8362514 972 int err = _dispatch_io_get_error(NULL, channel, false); 973 if (err || !length) { 974 _dispatch_io_data_retain(data); 975 _dispatch_retain(queue); 976 dispatch_async(channel->barrier_queue, ^{ 977 dispatch_async(queue, ^{ 978 dispatch_data_t d = data; 979 if (direction == DOP_DIR_READ && err) { 980 d = NULL; 981 } else if (direction == DOP_DIR_WRITE && !err) { 982 d = NULL; 983 } 984 _dispatch_fd_debug("IO handler invoke", fd); 985 handler(true, d, err); 986 _dispatch_io_data_release(data); 987 }); 988 _dispatch_release(queue); 989 }); 990 return NULL; 991 } 992 dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation), 993 sizeof(struct dispatch_operation_s)); 994 op->do_next = DISPATCH_OBJECT_LISTLESS; 995 op->do_xref_cnt = -1; // operation object is not exposed externally 996 op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL); 997 op->op_q->do_targetq = queue; 998 _dispatch_retain(queue); 999 op->active = false; 1000 op->direction = direction; 1001 op->offset = offset + channel->f_ptr; 1002 op->length = length; 1003 op->handler = _dispatch_io_Block_copy(handler); 1004 _dispatch_retain(channel); 1005 op->channel = channel; 1006 op->params = channel->params; 1007 // Take a snapshot of the priority of the channel queue. The actual I/O 1008 // for this operation will be performed at this priority 1009 dispatch_queue_t targetq = op->channel->do_targetq; 1010 while (fastpath(targetq->do_targetq)) { 1011 targetq = targetq->do_targetq; 1012 } 1013 op->do_targetq = targetq; 1014 _dispatch_object_debug(op, "%s", __func__); 1015 return op; 1016} 1017 1018void 1019_dispatch_operation_dispose(dispatch_operation_t op) 1020{ 1021 _dispatch_object_debug(op, "%s", __func__); 1022 // Deliver the data if there's any 1023 if (op->fd_entry) { 1024 _dispatch_operation_deliver_data(op, DOP_DONE); 1025 dispatch_group_leave(op->fd_entry->barrier_group); 1026 _dispatch_fd_entry_release(op->fd_entry); 1027 } 1028 if (op->channel) { 1029 _dispatch_release(op->channel); 1030 } 1031 if (op->timer) { 1032 dispatch_release(op->timer); 1033 } 1034 // For write operations, op->buf is owned by op->buf_data 1035 if (op->buf && op->direction == DOP_DIR_READ) { 1036 free(op->buf); 1037 } 1038 if (op->buf_data) { 1039 _dispatch_io_data_release(op->buf_data); 1040 } 1041 if (op->data) { 1042 _dispatch_io_data_release(op->data); 1043 } 1044 if (op->op_q) { 1045 dispatch_release(op->op_q); 1046 } 1047 Block_release(op->handler); 1048} 1049 1050static void 1051_dispatch_operation_enqueue(dispatch_operation_t op, 1052 dispatch_op_direction_t direction, dispatch_data_t data) 1053{ 1054 // Called from the barrier queue 1055 _dispatch_io_data_retain(data); 1056 // If channel is closed or stopped, then call the handler immediately 1057 int err = _dispatch_io_get_error(NULL, op->channel, false); 1058 if (err) { 1059 dispatch_io_handler_t handler = op->handler; 1060 dispatch_async(op->op_q, ^{ 1061 dispatch_data_t d = data; 1062 if (direction == DOP_DIR_READ && err) { 1063 d = NULL; 1064 } else if (direction == DOP_DIR_WRITE && !err) { 1065 d = NULL; 1066 } 1067 handler(true, d, err); 1068 _dispatch_io_data_release(data); 1069 }); 1070 _dispatch_release(op); 1071 return; 1072 } 1073 // Finish operation init 1074 op->fd_entry = op->channel->fd_entry; 1075 _dispatch_fd_entry_retain(op->fd_entry); 1076 dispatch_group_enter(op->fd_entry->barrier_group); 1077 dispatch_disk_t disk = op->fd_entry->disk; 1078 if (!disk) { 1079 dispatch_stream_t stream = op->fd_entry->streams[direction]; 1080 dispatch_async(stream->dq, ^{ 1081 _dispatch_stream_enqueue_operation(stream, op, data); 1082 _dispatch_io_data_release(data); 1083 }); 1084 } else { 1085 dispatch_async(disk->pick_queue, ^{ 1086 _dispatch_disk_enqueue_operation(disk, op, data); 1087 _dispatch_io_data_release(data); 1088 }); 1089 } 1090} 1091 1092static bool 1093_dispatch_operation_should_enqueue(dispatch_operation_t op, 1094 dispatch_queue_t tq, dispatch_data_t data) 1095{ 1096 // On stream queue or disk queue 1097 _dispatch_fd_debug("enqueue operation", op->fd_entry->fd); 1098 _dispatch_io_data_retain(data); 1099 op->data = data; 1100 int err = _dispatch_io_get_error(op, NULL, true); 1101 if (err) { 1102 op->err = err; 1103 // Final release 1104 _dispatch_release(op); 1105 return false; 1106 } 1107 if (op->params.interval) { 1108 dispatch_resume(_dispatch_operation_timer(tq, op)); 1109 } 1110 return true; 1111} 1112 1113static dispatch_source_t 1114_dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op) 1115{ 1116 // On stream queue or pick queue 1117 if (op->timer) { 1118 return op->timer; 1119 } 1120 dispatch_source_t timer = dispatch_source_create( 1121 DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq); 1122 dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW, 1123 (int64_t)op->params.interval), op->params.interval, 0); 1124 dispatch_source_set_event_handler(timer, ^{ 1125 // On stream queue or pick queue 1126 if (dispatch_source_testcancel(timer)) { 1127 // Do nothing. The operation has already completed 1128 return; 1129 } 1130 dispatch_op_flags_t flags = DOP_DEFAULT; 1131 if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) { 1132 // Deliver even if there is less data than the low-water mark 1133 flags |= DOP_DELIVER; 1134 } 1135 // If the operation is active, dont deliver data 1136 if ((op->active) && (flags & DOP_DELIVER)) { 1137 op->flags = flags; 1138 } else { 1139 _dispatch_operation_deliver_data(op, flags); 1140 } 1141 }); 1142 op->timer = timer; 1143 return op->timer; 1144} 1145 1146#pragma mark - 1147#pragma mark dispatch_fd_entry_t 1148 1149#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD 1150static void 1151_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) 1152{ 1153 guardid_t guard = fd_entry; 1154 const unsigned int guard_flags = GUARD_CLOSE; 1155 int err, fd_flags = 0; 1156 _dispatch_io_syscall_switch_noerr(err, 1157 change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags, 1158 &fd_flags), 1159 case 0: 1160 fd_entry->guard_flags = guard_flags; 1161 fd_entry->orig_fd_flags = fd_flags; 1162 break; 1163 case EPERM: break; 1164 default: (void)dispatch_assume_zero(err); break; 1165 ); 1166} 1167 1168static void 1169_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) 1170{ 1171 if (!fd_entry->guard_flags) { 1172 return; 1173 } 1174 guardid_t guard = fd_entry; 1175 int err, fd_flags = fd_entry->orig_fd_flags; 1176 _dispatch_io_syscall_switch(err, 1177 change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0, 1178 &fd_flags), 1179 default: (void)dispatch_assume_zero(err); break; 1180 ); 1181} 1182#else 1183static inline void 1184_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; } 1185static inline void 1186_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; } 1187#endif // DISPATCH_USE_GUARDED_FD 1188 1189static inline int 1190_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path, 1191 int oflag, mode_t mode) { 1192#if DISPATCH_USE_GUARDED_FD 1193 guardid_t guard = (uintptr_t)fd_entry; 1194 const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP | 1195 GUARD_SOCKET_IPC | GUARD_FILEPORT; 1196 int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC, 1197 mode); 1198 if (fd != -1) { 1199 fd_entry->guard_flags = guard_flags; 1200 return fd; 1201 } 1202 errno = 0; 1203#endif 1204 return open(path, oflag, mode); 1205 (void)fd_entry; 1206} 1207 1208static inline int 1209_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) { 1210#if DISPATCH_USE_GUARDED_FD 1211 if (fd_entry->guard_flags) { 1212 guardid_t guard = (uintptr_t)fd_entry; 1213 return guarded_close_np(fd, &guard); 1214 } else 1215#endif 1216 { 1217 return close(fd); 1218 } 1219 (void)fd_entry; 1220} 1221 1222static inline void 1223_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) { 1224 dispatch_suspend(fd_entry->close_queue); 1225} 1226 1227static inline void 1228_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) { 1229 dispatch_resume(fd_entry->close_queue); 1230} 1231 1232static void 1233_dispatch_fd_entry_init_async(dispatch_fd_t fd, 1234 dispatch_fd_entry_init_callback_t completion_callback) 1235{ 1236 static dispatch_once_t _dispatch_io_fds_lockq_pred; 1237 dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL, 1238 _dispatch_io_fds_lockq_init); 1239 dispatch_async(_dispatch_io_fds_lockq, ^{ 1240 _dispatch_fd_debug("fd entry init", fd); 1241 dispatch_fd_entry_t fd_entry = NULL; 1242 // Check to see if there is an existing entry for the given fd 1243 uintptr_t hash = DIO_HASH(fd); 1244 TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) { 1245 if (fd_entry->fd == fd) { 1246 // Retain the fd_entry to ensure it cannot go away until the 1247 // stat() has completed 1248 _dispatch_fd_entry_retain(fd_entry); 1249 break; 1250 } 1251 } 1252 if (!fd_entry) { 1253 // If we did not find an existing entry, create one 1254 fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash); 1255 } 1256 dispatch_async(fd_entry->barrier_queue, ^{ 1257 _dispatch_fd_debug("fd entry init completion", fd); 1258 completion_callback(fd_entry); 1259 // stat() is complete, release reference to fd_entry 1260 _dispatch_fd_entry_release(fd_entry); 1261 }); 1262 }); 1263} 1264 1265static dispatch_fd_entry_t 1266_dispatch_fd_entry_create(dispatch_queue_t q) 1267{ 1268 dispatch_fd_entry_t fd_entry; 1269 fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s)); 1270 fd_entry->close_queue = dispatch_queue_create( 1271 "com.apple.libdispatch-io.closeq", NULL); 1272 // Use target queue to ensure that no concurrent lookups are going on when 1273 // the close queue is running 1274 fd_entry->close_queue->do_targetq = q; 1275 _dispatch_retain(q); 1276 // Suspend the cleanup queue until closing 1277 _dispatch_fd_entry_retain(fd_entry); 1278 return fd_entry; 1279} 1280 1281static dispatch_fd_entry_t 1282_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash) 1283{ 1284 // On fds lock queue 1285 _dispatch_fd_debug("fd entry create", fd); 1286 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( 1287 _dispatch_io_fds_lockq); 1288 fd_entry->fd = fd; 1289 TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list); 1290 fd_entry->barrier_queue = dispatch_queue_create( 1291 "com.apple.libdispatch-io.barrierq", NULL); 1292 fd_entry->barrier_group = dispatch_group_create(); 1293 dispatch_async(fd_entry->barrier_queue, ^{ 1294 _dispatch_fd_debug("fd entry stat", fd); 1295 int err, orig_flags, orig_nosigpipe = -1; 1296 struct stat st; 1297 _dispatch_io_syscall_switch(err, 1298 fstat(fd, &st), 1299 default: fd_entry->err = err; return; 1300 ); 1301 fd_entry->stat.dev = st.st_dev; 1302 fd_entry->stat.mode = st.st_mode; 1303 _dispatch_fd_entry_guard(fd_entry); 1304 _dispatch_io_syscall_switch(err, 1305 orig_flags = fcntl(fd, F_GETFL), 1306 default: (void)dispatch_assume_zero(err); break; 1307 ); 1308#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 1309 if (S_ISFIFO(st.st_mode)) { 1310 _dispatch_io_syscall_switch(err, 1311 orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE), 1312 default: (void)dispatch_assume_zero(err); break; 1313 ); 1314 if (orig_nosigpipe != -1) { 1315 _dispatch_io_syscall_switch(err, 1316 orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1), 1317 default: 1318 orig_nosigpipe = -1; 1319 (void)dispatch_assume_zero(err); 1320 break; 1321 ); 1322 } 1323 } 1324#endif 1325 if (S_ISREG(st.st_mode)) { 1326 if (orig_flags != -1) { 1327 _dispatch_io_syscall_switch(err, 1328 fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK), 1329 default: 1330 orig_flags = -1; 1331 (void)dispatch_assume_zero(err); 1332 break; 1333 ); 1334 } 1335 int32_t dev = major(st.st_dev); 1336 // We have to get the disk on the global dev queue. The 1337 // barrier queue cannot continue until that is complete 1338 dispatch_suspend(fd_entry->barrier_queue); 1339 dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, 1340 _dispatch_io_devs_lockq_init); 1341 dispatch_async(_dispatch_io_devs_lockq, ^{ 1342 _dispatch_disk_init(fd_entry, dev); 1343 dispatch_resume(fd_entry->barrier_queue); 1344 }); 1345 } else { 1346 if (orig_flags != -1) { 1347 _dispatch_io_syscall_switch(err, 1348 fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK), 1349 default: 1350 orig_flags = -1; 1351 (void)dispatch_assume_zero(err); 1352 break; 1353 ); 1354 } 1355 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue( 1356 _DISPATCH_QOS_CLASS_DEFAULT, false)); 1357 } 1358 fd_entry->orig_flags = orig_flags; 1359 fd_entry->orig_nosigpipe = orig_nosigpipe; 1360 }); 1361 // This is the first item run when the close queue is resumed, indicating 1362 // that all channels associated with this entry have been closed and that 1363 // all operations associated with this entry have been freed 1364 dispatch_async(fd_entry->close_queue, ^{ 1365 if (!fd_entry->disk) { 1366 _dispatch_fd_debug("close queue fd_entry cleanup", fd); 1367 dispatch_op_direction_t dir; 1368 for (dir = 0; dir < DOP_DIR_MAX; dir++) { 1369 _dispatch_stream_dispose(fd_entry, dir); 1370 } 1371 } else { 1372 dispatch_disk_t disk = fd_entry->disk; 1373 dispatch_async(_dispatch_io_devs_lockq, ^{ 1374 _dispatch_release(disk); 1375 }); 1376 } 1377 // Remove this entry from the global fd list 1378 TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list); 1379 }); 1380 // If there was a source associated with this stream, disposing of the 1381 // source cancels it and suspends the close queue. Freeing the fd_entry 1382 // structure must happen after the source cancel handler has finished 1383 dispatch_async(fd_entry->close_queue, ^{ 1384 _dispatch_fd_debug("close queue release", fd); 1385 dispatch_release(fd_entry->close_queue); 1386 _dispatch_fd_debug("barrier queue release", fd); 1387 dispatch_release(fd_entry->barrier_queue); 1388 _dispatch_fd_debug("barrier group release", fd); 1389 dispatch_release(fd_entry->barrier_group); 1390 if (fd_entry->orig_flags != -1) { 1391 _dispatch_io_syscall( 1392 fcntl(fd, F_SETFL, fd_entry->orig_flags) 1393 ); 1394 } 1395#if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 1396 if (fd_entry->orig_nosigpipe != -1) { 1397 _dispatch_io_syscall( 1398 fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe) 1399 ); 1400 } 1401#endif 1402 _dispatch_fd_entry_unguard(fd_entry); 1403 if (fd_entry->convenience_channel) { 1404 fd_entry->convenience_channel->fd_entry = NULL; 1405 dispatch_release(fd_entry->convenience_channel); 1406 } 1407 free(fd_entry); 1408 }); 1409 return fd_entry; 1410} 1411 1412static dispatch_fd_entry_t 1413_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data, 1414 dev_t dev, mode_t mode) 1415{ 1416 // On devs lock queue 1417 _dispatch_fd_debug("fd entry create with path %s", -1, path_data->path); 1418 dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( 1419 path_data->channel->queue); 1420 if (S_ISREG(mode)) { 1421 _dispatch_disk_init(fd_entry, major(dev)); 1422 } else { 1423 _dispatch_stream_init(fd_entry, _dispatch_get_root_queue( 1424 _DISPATCH_QOS_CLASS_DEFAULT, false)); 1425 } 1426 fd_entry->fd = -1; 1427 fd_entry->orig_flags = -1; 1428 fd_entry->path_data = path_data; 1429 fd_entry->stat.dev = dev; 1430 fd_entry->stat.mode = mode; 1431 fd_entry->barrier_queue = dispatch_queue_create( 1432 "com.apple.libdispatch-io.barrierq", NULL); 1433 fd_entry->barrier_group = dispatch_group_create(); 1434 // This is the first item run when the close queue is resumed, indicating 1435 // that the channel associated with this entry has been closed and that 1436 // all operations associated with this entry have been freed 1437 dispatch_async(fd_entry->close_queue, ^{ 1438 _dispatch_fd_debug("close queue fd_entry cleanup", -1); 1439 if (!fd_entry->disk) { 1440 dispatch_op_direction_t dir; 1441 for (dir = 0; dir < DOP_DIR_MAX; dir++) { 1442 _dispatch_stream_dispose(fd_entry, dir); 1443 } 1444 } 1445 if (fd_entry->fd != -1) { 1446 _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd); 1447 } 1448 if (fd_entry->path_data->channel) { 1449 // If associated channel has not been released yet, mark it as 1450 // no longer having an fd_entry (for stop after close). 1451 // It is safe to modify channel since we are on close_queue with 1452 // target queue the channel queue 1453 fd_entry->path_data->channel->fd_entry = NULL; 1454 } 1455 }); 1456 dispatch_async(fd_entry->close_queue, ^{ 1457 _dispatch_fd_debug("close queue release", -1); 1458 dispatch_release(fd_entry->close_queue); 1459 dispatch_release(fd_entry->barrier_queue); 1460 dispatch_release(fd_entry->barrier_group); 1461 free(fd_entry->path_data); 1462 free(fd_entry); 1463 }); 1464 return fd_entry; 1465} 1466 1467static int 1468_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel) 1469{ 1470 if (!(fd_entry->fd == -1 && fd_entry->path_data)) { 1471 return 0; 1472 } 1473 if (fd_entry->err) { 1474 return fd_entry->err; 1475 } 1476 int fd = -1; 1477 int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK : 1478 fd_entry->path_data->oflag | O_NONBLOCK; 1479open: 1480 fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path, 1481 oflag, fd_entry->path_data->mode); 1482 if (fd == -1) { 1483 int err = errno; 1484 if (err == EINTR) { 1485 goto open; 1486 } 1487 (void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed); 1488 return err; 1489 } 1490 if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) { 1491 // Lost the race with another open 1492 _dispatch_fd_entry_guarded_close(fd_entry, fd); 1493 } else { 1494 channel->fd_actual = fd; 1495 } 1496 _dispatch_object_debug(channel, "%s", __func__); 1497 return 0; 1498} 1499 1500static void 1501_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, 1502 dispatch_io_t channel) 1503{ 1504 if (fd_entry->disk) { 1505 if (channel) { 1506 _dispatch_retain(channel); 1507 } 1508 _dispatch_fd_entry_retain(fd_entry); 1509 dispatch_async(fd_entry->disk->pick_queue, ^{ 1510 _dispatch_disk_cleanup_operations(fd_entry->disk, channel); 1511 _dispatch_fd_entry_release(fd_entry); 1512 if (channel) { 1513 _dispatch_release(channel); 1514 } 1515 }); 1516 } else { 1517 dispatch_op_direction_t direction; 1518 for (direction = 0; direction < DOP_DIR_MAX; direction++) { 1519 dispatch_stream_t stream = fd_entry->streams[direction]; 1520 if (!stream) { 1521 continue; 1522 } 1523 if (channel) { 1524 _dispatch_retain(channel); 1525 } 1526 _dispatch_fd_entry_retain(fd_entry); 1527 dispatch_async(stream->dq, ^{ 1528 _dispatch_stream_cleanup_operations(stream, channel); 1529 _dispatch_fd_entry_release(fd_entry); 1530 if (channel) { 1531 _dispatch_release(channel); 1532 } 1533 }); 1534 } 1535 } 1536} 1537 1538#pragma mark - 1539#pragma mark dispatch_stream_t/dispatch_disk_t 1540 1541static void 1542_dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq) 1543{ 1544 dispatch_op_direction_t direction; 1545 for (direction = 0; direction < DOP_DIR_MAX; direction++) { 1546 dispatch_stream_t stream; 1547 stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s)); 1548 stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq", 1549 NULL); 1550 dispatch_set_context(stream->dq, stream); 1551 _dispatch_retain(tq); 1552 stream->dq->do_targetq = tq; 1553 TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]); 1554 TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]); 1555 fd_entry->streams[direction] = stream; 1556 } 1557} 1558 1559static void 1560_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, 1561 dispatch_op_direction_t direction) 1562{ 1563 // On close queue 1564 dispatch_stream_t stream = fd_entry->streams[direction]; 1565 if (!stream) { 1566 return; 1567 } 1568 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); 1569 dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])); 1570 if (stream->source) { 1571 // Balanced by source cancel handler: 1572 _dispatch_fd_entry_retain(fd_entry); 1573 dispatch_source_cancel(stream->source); 1574 dispatch_resume(stream->source); 1575 dispatch_release(stream->source); 1576 } 1577 dispatch_set_context(stream->dq, NULL); 1578 dispatch_release(stream->dq); 1579 free(stream); 1580} 1581 1582static void 1583_dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev) 1584{ 1585 // On devs lock queue 1586 dispatch_disk_t disk; 1587 // Check to see if there is an existing entry for the given device 1588 uintptr_t hash = DIO_HASH(dev); 1589 TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) { 1590 if (disk->dev == dev) { 1591 _dispatch_retain(disk); 1592 goto out; 1593 } 1594 } 1595 // Otherwise create a new entry 1596 size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs; 1597 disk = _dispatch_alloc(DISPATCH_VTABLE(disk), 1598 sizeof(struct dispatch_disk_s) + 1599 (pending_reqs_depth * sizeof(dispatch_operation_t))); 1600 disk->do_next = DISPATCH_OBJECT_LISTLESS; 1601 disk->do_xref_cnt = -1; 1602 disk->advise_list_depth = pending_reqs_depth; 1603 disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, 1604 false); 1605 disk->dev = dev; 1606 TAILQ_INIT(&disk->operations); 1607 disk->cur_rq = TAILQ_FIRST(&disk->operations); 1608 char label[45]; 1609 snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev); 1610 disk->pick_queue = dispatch_queue_create(label, NULL); 1611 TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list); 1612out: 1613 fd_entry->disk = disk; 1614 TAILQ_INIT(&fd_entry->stream_ops); 1615} 1616 1617void 1618_dispatch_disk_dispose(dispatch_disk_t disk) 1619{ 1620 uintptr_t hash = DIO_HASH(disk->dev); 1621 TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list); 1622 dispatch_assert(TAILQ_EMPTY(&disk->operations)); 1623 size_t i; 1624 for (i=0; i<disk->advise_list_depth; ++i) { 1625 dispatch_assert(!disk->advise_list[i]); 1626 } 1627 dispatch_release(disk->pick_queue); 1628} 1629 1630#pragma mark - 1631#pragma mark dispatch_stream_operations/dispatch_disk_operations 1632 1633static inline bool 1634_dispatch_stream_operation_avail(dispatch_stream_t stream) 1635{ 1636 return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) || 1637 !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); 1638} 1639 1640static void 1641_dispatch_stream_enqueue_operation(dispatch_stream_t stream, 1642 dispatch_operation_t op, dispatch_data_t data) 1643{ 1644 if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) { 1645 return; 1646 } 1647 _dispatch_object_debug(op, "%s", __func__); 1648 bool no_ops = !_dispatch_stream_operation_avail(stream); 1649 TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list); 1650 if (no_ops) { 1651 dispatch_async_f(stream->dq, stream->dq, 1652 _dispatch_stream_queue_handler); 1653 } 1654} 1655 1656static void 1657_dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op, 1658 dispatch_data_t data) 1659{ 1660 if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) { 1661 return; 1662 } 1663 _dispatch_object_debug(op, "%s", __func__); 1664 if (op->params.type == DISPATCH_IO_STREAM) { 1665 if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) { 1666 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list); 1667 } 1668 TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list); 1669 } else { 1670 TAILQ_INSERT_TAIL(&disk->operations, op, operation_list); 1671 } 1672 _dispatch_disk_handler(disk); 1673} 1674 1675static void 1676_dispatch_stream_complete_operation(dispatch_stream_t stream, 1677 dispatch_operation_t op) 1678{ 1679 // On stream queue 1680 _dispatch_object_debug(op, "%s", __func__); 1681 _dispatch_fd_debug("complete operation", op->fd_entry->fd); 1682 TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list); 1683 if (op == stream->op) { 1684 stream->op = NULL; 1685 } 1686 if (op->timer) { 1687 dispatch_source_cancel(op->timer); 1688 } 1689 // Final release will deliver any pending data 1690 _dispatch_release(op); 1691} 1692 1693static void 1694_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op) 1695{ 1696 // On pick queue 1697 _dispatch_object_debug(op, "%s", __func__); 1698 _dispatch_fd_debug("complete operation", op->fd_entry->fd); 1699 // Current request is always the last op returned 1700 if (disk->cur_rq == op) { 1701 disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s, 1702 operation_list); 1703 } 1704 if (op->params.type == DISPATCH_IO_STREAM) { 1705 // Check if there are other pending stream operations behind it 1706 dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list); 1707 TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list); 1708 if (op_next) { 1709 TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list); 1710 } 1711 } 1712 TAILQ_REMOVE(&disk->operations, op, operation_list); 1713 if (op->timer) { 1714 dispatch_source_cancel(op->timer); 1715 } 1716 // Final release will deliver any pending data 1717 _dispatch_release(op); 1718} 1719 1720static dispatch_operation_t 1721_dispatch_stream_pick_next_operation(dispatch_stream_t stream, 1722 dispatch_operation_t op) 1723{ 1724 // On stream queue 1725 if (!op) { 1726 // On the first run through, pick the first operation 1727 if (!_dispatch_stream_operation_avail(stream)) { 1728 return op; 1729 } 1730 if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) { 1731 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]); 1732 } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) { 1733 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); 1734 } 1735 return op; 1736 } 1737 if (op->params.type == DISPATCH_IO_STREAM) { 1738 // Stream operations need to be serialized so continue the current 1739 // operation until it is finished 1740 return op; 1741 } 1742 // Get the next random operation (round-robin) 1743 if (op->params.type == DISPATCH_IO_RANDOM) { 1744 op = TAILQ_NEXT(op, operation_list); 1745 if (!op) { 1746 op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); 1747 } 1748 return op; 1749 } 1750 return NULL; 1751} 1752 1753static dispatch_operation_t 1754_dispatch_disk_pick_next_operation(dispatch_disk_t disk) 1755{ 1756 // On pick queue 1757 dispatch_operation_t op; 1758 if (!TAILQ_EMPTY(&disk->operations)) { 1759 if (disk->cur_rq == NULL) { 1760 op = TAILQ_FIRST(&disk->operations); 1761 } else { 1762 op = disk->cur_rq; 1763 do { 1764 op = TAILQ_NEXT(op, operation_list); 1765 if (!op) { 1766 op = TAILQ_FIRST(&disk->operations); 1767 } 1768 // TODO: more involved picking algorithm rdar://problem/8780312 1769 } while (op->active && op != disk->cur_rq); 1770 } 1771 if (!op->active) { 1772 disk->cur_rq = op; 1773 return op; 1774 } 1775 } 1776 return NULL; 1777} 1778 1779static void 1780_dispatch_stream_cleanup_operations(dispatch_stream_t stream, 1781 dispatch_io_t channel) 1782{ 1783 // On stream queue 1784 dispatch_operation_t op, tmp; 1785 typeof(*stream->operations) *operations; 1786 operations = &stream->operations[DISPATCH_IO_RANDOM]; 1787 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { 1788 if (!channel || op->channel == channel) { 1789 _dispatch_stream_complete_operation(stream, op); 1790 } 1791 } 1792 operations = &stream->operations[DISPATCH_IO_STREAM]; 1793 TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { 1794 if (!channel || op->channel == channel) { 1795 _dispatch_stream_complete_operation(stream, op); 1796 } 1797 } 1798 if (stream->source_running && !_dispatch_stream_operation_avail(stream)) { 1799 dispatch_suspend(stream->source); 1800 stream->source_running = false; 1801 } 1802} 1803 1804static void 1805_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel) 1806{ 1807 // On pick queue 1808 dispatch_operation_t op, tmp; 1809 TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) { 1810 if (!channel || op->channel == channel) { 1811 _dispatch_disk_complete_operation(disk, op); 1812 } 1813 } 1814} 1815 1816#pragma mark - 1817#pragma mark dispatch_stream_handler/dispatch_disk_handler 1818 1819static dispatch_source_t 1820_dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op) 1821{ 1822 // On stream queue 1823 if (stream->source) { 1824 return stream->source; 1825 } 1826 dispatch_fd_t fd = op->fd_entry->fd; 1827 _dispatch_fd_debug("stream source create", fd); 1828 dispatch_source_t source = NULL; 1829 if (op->direction == DOP_DIR_READ) { 1830 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, 1831 (uintptr_t)fd, 0, stream->dq); 1832 } else if (op->direction == DOP_DIR_WRITE) { 1833 source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, 1834 (uintptr_t)fd, 0, stream->dq); 1835 } else { 1836 dispatch_assert(op->direction < DOP_DIR_MAX); 1837 return NULL; 1838 } 1839 dispatch_set_context(source, stream); 1840 dispatch_source_set_event_handler_f(source, 1841 _dispatch_stream_source_handler); 1842 // Close queue must not run user cleanup handlers until sources are fully 1843 // unregistered 1844 dispatch_queue_t close_queue = op->fd_entry->close_queue; 1845 dispatch_source_set_cancel_handler(source, ^{ 1846 _dispatch_fd_debug("stream source cancel", fd); 1847 dispatch_resume(close_queue); 1848 }); 1849 stream->source = source; 1850 return stream->source; 1851} 1852 1853static void 1854_dispatch_stream_source_handler(void *ctx) 1855{ 1856 // On stream queue 1857 dispatch_stream_t stream = (dispatch_stream_t)ctx; 1858 dispatch_suspend(stream->source); 1859 stream->source_running = false; 1860 return _dispatch_stream_handler(stream); 1861} 1862 1863static void 1864_dispatch_stream_queue_handler(void *ctx) 1865{ 1866 // On stream queue 1867 dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx); 1868 if (!stream) { 1869 // _dispatch_stream_dispose has been called 1870 return; 1871 } 1872 return _dispatch_stream_handler(stream); 1873} 1874 1875static void 1876_dispatch_stream_handler(void *ctx) 1877{ 1878 // On stream queue 1879 dispatch_stream_t stream = (dispatch_stream_t)ctx; 1880 dispatch_operation_t op; 1881pick: 1882 op = _dispatch_stream_pick_next_operation(stream, stream->op); 1883 if (!op) { 1884 _dispatch_debug("no operation found: stream %p", stream); 1885 return; 1886 } 1887 int err = _dispatch_io_get_error(op, NULL, true); 1888 if (err) { 1889 op->err = err; 1890 _dispatch_stream_complete_operation(stream, op); 1891 goto pick; 1892 } 1893 stream->op = op; 1894 _dispatch_fd_debug("stream handler", op->fd_entry->fd); 1895 dispatch_fd_entry_t fd_entry = op->fd_entry; 1896 _dispatch_fd_entry_retain(fd_entry); 1897 // For performance analysis 1898 if (!op->total && dispatch_io_defaults.initial_delivery) { 1899 // Empty delivery to signal the start of the operation 1900 _dispatch_fd_debug("initial delivery", op->fd_entry->fd); 1901 _dispatch_operation_deliver_data(op, DOP_DELIVER); 1902 } 1903 // TODO: perform on the operation target queue to get correct priority 1904 int result = _dispatch_operation_perform(op); 1905 dispatch_op_flags_t flags = ~0u; 1906 switch (result) { 1907 case DISPATCH_OP_DELIVER: 1908 flags = DOP_DEFAULT; 1909 // Fall through 1910 case DISPATCH_OP_DELIVER_AND_COMPLETE: 1911 flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY : 1912 DOP_DEFAULT; 1913 _dispatch_operation_deliver_data(op, flags); 1914 // Fall through 1915 case DISPATCH_OP_COMPLETE: 1916 if (flags != DOP_DEFAULT) { 1917 _dispatch_stream_complete_operation(stream, op); 1918 } 1919 if (_dispatch_stream_operation_avail(stream)) { 1920 dispatch_async_f(stream->dq, stream->dq, 1921 _dispatch_stream_queue_handler); 1922 } 1923 break; 1924 case DISPATCH_OP_COMPLETE_RESUME: 1925 _dispatch_stream_complete_operation(stream, op); 1926 // Fall through 1927 case DISPATCH_OP_RESUME: 1928 if (_dispatch_stream_operation_avail(stream)) { 1929 stream->source_running = true; 1930 dispatch_resume(_dispatch_stream_source(stream, op)); 1931 } 1932 break; 1933 case DISPATCH_OP_ERR: 1934 _dispatch_stream_cleanup_operations(stream, op->channel); 1935 break; 1936 case DISPATCH_OP_FD_ERR: 1937 _dispatch_fd_entry_retain(fd_entry); 1938 dispatch_async(fd_entry->barrier_queue, ^{ 1939 _dispatch_fd_entry_cleanup_operations(fd_entry, NULL); 1940 _dispatch_fd_entry_release(fd_entry); 1941 }); 1942 break; 1943 default: 1944 break; 1945 } 1946 _dispatch_fd_entry_release(fd_entry); 1947 return; 1948} 1949 1950static void 1951_dispatch_disk_handler(void *ctx) 1952{ 1953 // On pick queue 1954 dispatch_disk_t disk = (dispatch_disk_t)ctx; 1955 if (disk->io_active) { 1956 return; 1957 } 1958 _dispatch_fd_debug("disk handler", -1); 1959 dispatch_operation_t op; 1960 size_t i = disk->free_idx, j = disk->req_idx; 1961 if (j <= i) { 1962 j += disk->advise_list_depth; 1963 } 1964 while (i <= j) { 1965 if ((!disk->advise_list[i%disk->advise_list_depth]) && 1966 (op = _dispatch_disk_pick_next_operation(disk))) { 1967 int err = _dispatch_io_get_error(op, NULL, true); 1968 if (err) { 1969 op->err = err; 1970 _dispatch_disk_complete_operation(disk, op); 1971 continue; 1972 } 1973 _dispatch_retain(op); 1974 disk->advise_list[i%disk->advise_list_depth] = op; 1975 op->active = true; 1976 _dispatch_object_debug(op, "%s", __func__); 1977 } else { 1978 // No more operations to get 1979 break; 1980 } 1981 i++; 1982 } 1983 disk->free_idx = (i%disk->advise_list_depth); 1984 op = disk->advise_list[disk->req_idx]; 1985 if (op) { 1986 disk->io_active = true; 1987 dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform); 1988 } 1989} 1990 1991static void 1992_dispatch_disk_perform(void *ctxt) 1993{ 1994 dispatch_disk_t disk = ctxt; 1995 size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE; 1996 _dispatch_fd_debug("disk perform", -1); 1997 dispatch_operation_t op; 1998 size_t i = disk->advise_idx, j = disk->free_idx; 1999 if (j <= i) { 2000 j += disk->advise_list_depth; 2001 } 2002 do { 2003 op = disk->advise_list[i%disk->advise_list_depth]; 2004 if (!op) { 2005 // Nothing more to advise, must be at free_idx 2006 dispatch_assert(i%disk->advise_list_depth == disk->free_idx); 2007 break; 2008 } 2009 if (op->direction == DOP_DIR_WRITE) { 2010 // TODO: preallocate writes ? rdar://problem/9032172 2011 continue; 2012 } 2013 if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry, 2014 op->channel)) { 2015 continue; 2016 } 2017 // For performance analysis 2018 if (!op->total && dispatch_io_defaults.initial_delivery) { 2019 // Empty delivery to signal the start of the operation 2020 _dispatch_fd_debug("initial delivery", op->fd_entry->fd); 2021 _dispatch_operation_deliver_data(op, DOP_DELIVER); 2022 } 2023 // Advise two chunks if the list only has one element and this is the 2024 // first advise on the operation 2025 if ((j-i) == 1 && !disk->advise_list[disk->free_idx] && 2026 !op->advise_offset) { 2027 chunk_size *= 2; 2028 } 2029 _dispatch_operation_advise(op, chunk_size); 2030 } while (++i < j); 2031 disk->advise_idx = i%disk->advise_list_depth; 2032 op = disk->advise_list[disk->req_idx]; 2033 int result = _dispatch_operation_perform(op); 2034 disk->advise_list[disk->req_idx] = NULL; 2035 disk->req_idx = (++disk->req_idx)%disk->advise_list_depth; 2036 dispatch_async(disk->pick_queue, ^{ 2037 switch (result) { 2038 case DISPATCH_OP_DELIVER: 2039 _dispatch_operation_deliver_data(op, DOP_DEFAULT); 2040 break; 2041 case DISPATCH_OP_COMPLETE: 2042 _dispatch_disk_complete_operation(disk, op); 2043 break; 2044 case DISPATCH_OP_DELIVER_AND_COMPLETE: 2045 _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY); 2046 _dispatch_disk_complete_operation(disk, op); 2047 break; 2048 case DISPATCH_OP_ERR: 2049 _dispatch_disk_cleanup_operations(disk, op->channel); 2050 break; 2051 case DISPATCH_OP_FD_ERR: 2052 _dispatch_disk_cleanup_operations(disk, NULL); 2053 break; 2054 default: 2055 dispatch_assert(result); 2056 break; 2057 } 2058 op->active = false; 2059 disk->io_active = false; 2060 _dispatch_disk_handler(disk); 2061 // Balancing the retain in _dispatch_disk_handler. Note that op must be 2062 // released at the very end, since it might hold the last reference to 2063 // the disk 2064 _dispatch_release(op); 2065 }); 2066} 2067 2068#pragma mark - 2069#pragma mark dispatch_operation_perform 2070 2071static void 2072_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size) 2073{ 2074 int err; 2075 struct radvisory advise; 2076 // No point in issuing a read advise for the next chunk if we are already 2077 // a chunk ahead from reading the bytes 2078 if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) + 2079 chunk_size + PAGE_SIZE)) { 2080 return; 2081 } 2082 _dispatch_object_debug(op, "%s", __func__); 2083 advise.ra_count = (int)chunk_size; 2084 if (!op->advise_offset) { 2085 op->advise_offset = op->offset; 2086 // If this is the first time through, align the advised range to a 2087 // page boundary 2088 size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE; 2089 advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0); 2090 } 2091 advise.ra_offset = op->advise_offset; 2092 op->advise_offset += advise.ra_count; 2093 _dispatch_io_syscall_switch(err, 2094 fcntl(op->fd_entry->fd, F_RDADVISE, &advise), 2095 case EFBIG: break; // advised past the end of the file rdar://10415691 2096 case ENOTSUP: break; // not all FS support radvise rdar://13484629 2097 // TODO: set disk status on error 2098 default: (void)dispatch_assume_zero(err); break; 2099 ); 2100} 2101 2102static int 2103_dispatch_operation_perform(dispatch_operation_t op) 2104{ 2105 int err = _dispatch_io_get_error(op, NULL, true); 2106 if (err) { 2107 goto error; 2108 } 2109 _dispatch_object_debug(op, "%s", __func__); 2110 if (!op->buf) { 2111 size_t max_buf_siz = op->params.high; 2112 size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE; 2113 if (op->direction == DOP_DIR_READ) { 2114 // If necessary, create a buffer for the ongoing operation, large 2115 // enough to fit chunk_pages but at most high-water 2116 size_t data_siz = dispatch_data_get_size(op->data); 2117 if (data_siz) { 2118 dispatch_assert(data_siz < max_buf_siz); 2119 max_buf_siz -= data_siz; 2120 } 2121 if (max_buf_siz > chunk_siz) { 2122 max_buf_siz = chunk_siz; 2123 } 2124 if (op->length < SIZE_MAX) { 2125 op->buf_siz = op->length - op->total; 2126 if (op->buf_siz > max_buf_siz) { 2127 op->buf_siz = max_buf_siz; 2128 } 2129 } else { 2130 op->buf_siz = max_buf_siz; 2131 } 2132 op->buf = valloc(op->buf_siz); 2133 _dispatch_fd_debug("buffer allocated", op->fd_entry->fd); 2134 } else if (op->direction == DOP_DIR_WRITE) { 2135 // Always write the first data piece, if that is smaller than a 2136 // chunk, accumulate further data pieces until chunk size is reached 2137 if (chunk_siz > max_buf_siz) { 2138 chunk_siz = max_buf_siz; 2139 } 2140 op->buf_siz = 0; 2141 dispatch_data_apply(op->data, 2142 ^(dispatch_data_t region DISPATCH_UNUSED, 2143 size_t offset DISPATCH_UNUSED, 2144 const void* buf DISPATCH_UNUSED, size_t len) { 2145 size_t siz = op->buf_siz + len; 2146 if (!op->buf_siz || siz <= chunk_siz) { 2147 op->buf_siz = siz; 2148 } 2149 return (bool)(siz < chunk_siz); 2150 }); 2151 if (op->buf_siz > max_buf_siz) { 2152 op->buf_siz = max_buf_siz; 2153 } 2154 dispatch_data_t d; 2155 d = dispatch_data_create_subrange(op->data, 0, op->buf_siz); 2156 op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf, 2157 NULL); 2158 _dispatch_io_data_release(d); 2159 _dispatch_fd_debug("buffer mapped", op->fd_entry->fd); 2160 } 2161 } 2162 if (op->fd_entry->fd == -1) { 2163 err = _dispatch_fd_entry_open(op->fd_entry, op->channel); 2164 if (err) { 2165 goto error; 2166 } 2167 } 2168 void *buf = op->buf + op->buf_len; 2169 size_t len = op->buf_siz - op->buf_len; 2170 off_t off = (off_t)((size_t)op->offset + op->total); 2171 ssize_t processed = -1; 2172syscall: 2173 if (op->direction == DOP_DIR_READ) { 2174 if (op->params.type == DISPATCH_IO_STREAM) { 2175 processed = read(op->fd_entry->fd, buf, len); 2176 } else if (op->params.type == DISPATCH_IO_RANDOM) { 2177 processed = pread(op->fd_entry->fd, buf, len, off); 2178 } 2179 } else if (op->direction == DOP_DIR_WRITE) { 2180 if (op->params.type == DISPATCH_IO_STREAM) { 2181 processed = write(op->fd_entry->fd, buf, len); 2182 } else if (op->params.type == DISPATCH_IO_RANDOM) { 2183 processed = pwrite(op->fd_entry->fd, buf, len, off); 2184 } 2185 } 2186 // Encountered an error on the file descriptor 2187 if (processed == -1) { 2188 err = errno; 2189 if (err == EINTR) { 2190 goto syscall; 2191 } 2192 goto error; 2193 } 2194 // EOF is indicated by two handler invocations 2195 if (processed == 0) { 2196 _dispatch_fd_debug("EOF", op->fd_entry->fd); 2197 return DISPATCH_OP_DELIVER_AND_COMPLETE; 2198 } 2199 op->buf_len += (size_t)processed; 2200 op->total += (size_t)processed; 2201 if (op->total == op->length) { 2202 // Finished processing all the bytes requested by the operation 2203 return DISPATCH_OP_COMPLETE; 2204 } else { 2205 // Deliver data only if we satisfy the filters 2206 return DISPATCH_OP_DELIVER; 2207 } 2208error: 2209 if (err == EAGAIN) { 2210 // For disk based files with blocking I/O we should never get EAGAIN 2211 dispatch_assert(!op->fd_entry->disk); 2212 _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err); 2213 if (op->direction == DOP_DIR_READ && op->total && 2214 op->channel == op->fd_entry->convenience_channel) { 2215 // Convenience read with available data completes on EAGAIN 2216 return DISPATCH_OP_COMPLETE_RESUME; 2217 } 2218 return DISPATCH_OP_RESUME; 2219 } 2220 op->err = err; 2221 switch (err) { 2222 case ECANCELED: 2223 return DISPATCH_OP_ERR; 2224 case EBADF: 2225 (void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed); 2226 return DISPATCH_OP_FD_ERR; 2227 default: 2228 return DISPATCH_OP_COMPLETE; 2229 } 2230} 2231 2232static void 2233_dispatch_operation_deliver_data(dispatch_operation_t op, 2234 dispatch_op_flags_t flags) 2235{ 2236 // Either called from stream resp. pick queue or when op is finalized 2237 dispatch_data_t data = NULL; 2238 int err = 0; 2239 size_t undelivered = op->undelivered + op->buf_len; 2240 bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) || 2241 (op->flags & DOP_DELIVER); 2242 op->flags = DOP_DEFAULT; 2243 if (!deliver) { 2244 // Don't deliver data until low water mark has been reached 2245 if (undelivered >= op->params.low) { 2246 deliver = true; 2247 } else if (op->buf_len < op->buf_siz) { 2248 // Request buffer is not yet used up 2249 _dispatch_fd_debug("buffer data", op->fd_entry->fd); 2250 return; 2251 } 2252 } else { 2253 err = op->err; 2254 if (!err && (op->channel->atomic_flags & DIO_STOPPED)) { 2255 err = ECANCELED; 2256 op->err = err; 2257 } 2258 } 2259 // Deliver data or buffer used up 2260 if (op->direction == DOP_DIR_READ) { 2261 if (op->buf_len) { 2262 void *buf = op->buf; 2263 data = dispatch_data_create(buf, op->buf_len, NULL, 2264 DISPATCH_DATA_DESTRUCTOR_FREE); 2265 op->buf = NULL; 2266 op->buf_len = 0; 2267 dispatch_data_t d = dispatch_data_create_concat(op->data, data); 2268 _dispatch_io_data_release(op->data); 2269 _dispatch_io_data_release(data); 2270 data = d; 2271 } else { 2272 data = op->data; 2273 } 2274 op->data = deliver ? dispatch_data_empty : data; 2275 } else if (op->direction == DOP_DIR_WRITE) { 2276 if (deliver) { 2277 data = dispatch_data_create_subrange(op->data, op->buf_len, 2278 op->length); 2279 } 2280 if (op->buf_data && op->buf_len == op->buf_siz) { 2281 _dispatch_io_data_release(op->buf_data); 2282 op->buf_data = NULL; 2283 op->buf = NULL; 2284 op->buf_len = 0; 2285 // Trim newly written buffer from head of unwritten data 2286 dispatch_data_t d; 2287 if (deliver) { 2288 _dispatch_io_data_retain(data); 2289 d = data; 2290 } else { 2291 d = dispatch_data_create_subrange(op->data, op->buf_siz, 2292 op->length); 2293 } 2294 _dispatch_io_data_release(op->data); 2295 op->data = d; 2296 } 2297 } else { 2298 dispatch_assert(op->direction < DOP_DIR_MAX); 2299 return; 2300 } 2301 if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) { 2302 op->undelivered = undelivered; 2303 _dispatch_fd_debug("buffer data", op->fd_entry->fd); 2304 return; 2305 } 2306 op->undelivered = 0; 2307 _dispatch_object_debug(op, "%s", __func__); 2308 _dispatch_fd_debug("deliver data", op->fd_entry->fd); 2309 dispatch_op_direction_t direction = op->direction; 2310 dispatch_io_handler_t handler = op->handler; 2311#if DISPATCH_IO_DEBUG 2312 int fd = op->fd_entry->fd; 2313#endif 2314 dispatch_fd_entry_t fd_entry = op->fd_entry; 2315 _dispatch_fd_entry_retain(fd_entry); 2316 dispatch_io_t channel = op->channel; 2317 _dispatch_retain(channel); 2318 // Note that data delivery may occur after the operation is freed 2319 dispatch_async(op->op_q, ^{ 2320 bool done = (flags & DOP_DONE); 2321 dispatch_data_t d = data; 2322 if (done) { 2323 if (direction == DOP_DIR_READ && err) { 2324 if (dispatch_data_get_size(d)) { 2325 _dispatch_fd_debug("IO handler invoke", fd); 2326 handler(false, d, 0); 2327 } 2328 d = NULL; 2329 } else if (direction == DOP_DIR_WRITE && !err) { 2330 d = NULL; 2331 } 2332 } 2333 _dispatch_fd_debug("IO handler invoke", fd); 2334 handler(done, d, err); 2335 _dispatch_release(channel); 2336 _dispatch_fd_entry_release(fd_entry); 2337 _dispatch_io_data_release(data); 2338 }); 2339} 2340 2341#pragma mark - 2342#pragma mark dispatch_io_debug 2343 2344static size_t 2345_dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz) 2346{ 2347 dispatch_queue_t target = channel->do_targetq; 2348 return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, " 2349 "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = " 2350 "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ", 2351 channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random", 2352 channel->fd_actual, channel->atomic_flags & DIO_STOPPED ? 2353 "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "", 2354 channel->fd_entry, channel->queue, target && target->dq_label ? 2355 target->dq_label : "", target, channel->barrier_queue, 2356 channel->barrier_group, channel->err, channel->params.low, 2357 channel->params.high, channel->params.interval_flags & 2358 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", 2359 channel->params.interval); 2360} 2361 2362size_t 2363_dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz) 2364{ 2365 size_t offset = 0; 2366 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 2367 dx_kind(channel), channel); 2368 offset += _dispatch_object_debug_attr(channel, &buf[offset], 2369 bufsiz - offset); 2370 offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset); 2371 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 2372 return offset; 2373} 2374 2375static size_t 2376_dispatch_operation_debug_attr(dispatch_operation_t op, char* buf, 2377 size_t bufsiz) 2378{ 2379 dispatch_queue_t target = op->do_targetq; 2380 dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL; 2381 return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, " 2382 "channel = %p, queue = %p -> %s[%p], target = %s[%p], " 2383 "offset = %lld, length = %zu, done = %zu, undelivered = %zu, " 2384 "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, " 2385 "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ? 2386 "stream" : "random", op->direction == DOP_DIR_READ ? "read" : 2387 "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry, 2388 op->channel, op->op_q, oqtarget && oqtarget->dq_label ? 2389 oqtarget->dq_label : "", oqtarget, target && target->dq_label ? 2390 target->dq_label : "", target, op->offset, op->length, op->total, 2391 op->undelivered + op->buf_len, op->flags, op->err, op->params.low, 2392 op->params.high, op->params.interval_flags & 2393 DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval); 2394} 2395 2396size_t 2397_dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz) 2398{ 2399 size_t offset = 0; 2400 offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", 2401 dx_kind(op), op); 2402 offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset); 2403 offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset); 2404 offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); 2405 return offset; 2406} 2407