sys_pipe.c revision 91395
1/* 2 * Copyright (c) 1996 John S. Dyson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice immediately at the beginning of the file, without modification, 10 * this list of conditions, and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. Absolutely no warranty of function or purpose is made by the author 15 * John S. Dyson. 16 * 4. Modifications may be freely made to this file if the above conditions 17 * are met. 18 * 19 * $FreeBSD: head/sys/kern/sys_pipe.c 91395 2002-02-27 17:23:16Z alfred $ 20 */ 21 22/* 23 * This file contains a high-performance replacement for the socket-based 24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 25 * all features of sockets, but does do everything that pipes normally 26 * do. 27 */ 28 29/* 30 * This code has two modes of operation, a small write mode and a large 31 * write mode. The small write mode acts like conventional pipes with 32 * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the 33 * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT 34 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and 35 * the receiving process can copy it directly from the pages in the sending 36 * process. 37 * 38 * If the sending process receives a signal, it is possible that it will 39 * go away, and certainly its address space can change, because control 40 * is returned back to the user-mode side. 
In that case, the pipe code 41 * arranges to copy the buffer supplied by the user process, to a pageable 42 * kernel buffer, and the receiving process will grab the data from the 43 * pageable kernel buffer. Since signals don't happen all that often, 44 * the copy operation is normally eliminated. 45 * 46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will 47 * happen for small transfers so that the system will not spend all of 48 * its time context switching. PIPE_SIZE is constrained by the 49 * amount of kernel virtual memory. 50 */ 51 52#include <sys/param.h> 53#include <sys/systm.h> 54#include <sys/fcntl.h> 55#include <sys/file.h> 56#include <sys/filedesc.h> 57#include <sys/filio.h> 58#include <sys/kernel.h> 59#include <sys/lock.h> 60#include <sys/mutex.h> 61#include <sys/ttycom.h> 62#include <sys/stat.h> 63#include <sys/poll.h> 64#include <sys/selinfo.h> 65#include <sys/signalvar.h> 66#include <sys/sysproto.h> 67#include <sys/pipe.h> 68#include <sys/proc.h> 69#include <sys/vnode.h> 70#include <sys/uio.h> 71#include <sys/event.h> 72 73#include <vm/vm.h> 74#include <vm/vm_param.h> 75#include <vm/vm_object.h> 76#include <vm/vm_kern.h> 77#include <vm/vm_extern.h> 78#include <vm/pmap.h> 79#include <vm/vm_map.h> 80#include <vm/vm_page.h> 81#include <vm/vm_zone.h> 82 83/* 84 * Use this define if you want to disable *fancy* VM things. Expect an 85 * approx 30% decrease in transfer rate. This could be useful for 86 * NetBSD or OpenBSD. 
87 */ 88/* #define PIPE_NODIRECT */ 89 90/* 91 * interfaces to the outside world 92 */ 93static int pipe_read __P((struct file *fp, struct uio *uio, 94 struct ucred *cred, int flags, struct thread *td)); 95static int pipe_write __P((struct file *fp, struct uio *uio, 96 struct ucred *cred, int flags, struct thread *td)); 97static int pipe_close __P((struct file *fp, struct thread *td)); 98static int pipe_poll __P((struct file *fp, int events, struct ucred *cred, 99 struct thread *td)); 100static int pipe_kqfilter __P((struct file *fp, struct knote *kn)); 101static int pipe_stat __P((struct file *fp, struct stat *sb, struct thread *td)); 102static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct thread *td)); 103 104static struct fileops pipeops = { 105 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter, 106 pipe_stat, pipe_close 107}; 108 109static void filt_pipedetach(struct knote *kn); 110static int filt_piperead(struct knote *kn, long hint); 111static int filt_pipewrite(struct knote *kn, long hint); 112 113static struct filterops pipe_rfiltops = 114 { 1, NULL, filt_pipedetach, filt_piperead }; 115static struct filterops pipe_wfiltops = 116 { 1, NULL, filt_pipedetach, filt_pipewrite }; 117 118#define PIPE_GET_GIANT(pipe) \ 119 do { \ 120 PIPE_UNLOCK(wpipe); \ 121 mtx_lock(&Giant); \ 122 } while (0) 123 124#define PIPE_DROP_GIANT(pipe) \ 125 do { \ 126 mtx_unlock(&Giant); \ 127 PIPE_LOCK(wpipe); \ 128 } while (0) 129 130/* 131 * Default pipe buffer size(s), this can be kind-of large now because pipe 132 * space is pageable. The pipe code will try to maintain locality of 133 * reference for performance reasons, so small amounts of outstanding I/O 134 * will not wipe the cache. 135 */ 136#define MINPIPESIZE (PIPE_SIZE/3) 137#define MAXPIPESIZE (2*PIPE_SIZE/3) 138 139/* 140 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but 141 * is there so that on large systems, we don't exhaust it. 
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers, we cannot, of course limit
 * the amount of kva for pipes in general though.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
static int nbigpipe;		/* current count of BIG_PIPE_SIZE pipes */

static int amountpipekva;	/* total kva consumed by pipe buffers/maps */

static void pipeinit __P((void *dummy __unused));
static void pipeclose __P((struct pipe *cpipe));
static void pipe_free_kmem __P((struct pipe *cpipe));
static int pipe_create __P((struct pipe **cpipep));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static int pipespace __P((struct pipe *cpipe, int size));

static vm_zone_t pipe_zone;

SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);

/*
 * One-time boot initialization (via SYSINIT): create the allocation
 * zone that backs all struct pipe instances.
 */
static void
pipeinit(void *dummy __unused)
{

	pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * Returns the read-side descriptor in td_retval[0] and the write-side
 * descriptor in td_retval[1].  Both struct pipes share one pool mutex
 * (pipe_mtxp), assigned once for the pair at the end.
 */

/* ARGSUSED */
int
pipe(td, uap)
	struct thread *td;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
{
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		/* pipeclose() tolerates NULL, so partial failure is fine */
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(td, &rf, &fd);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	/* extra reference so rf survives a racing close() on the new fd */
	fhold(rf);
	td->td_retval[0] = fd;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	FILE_LOCK(rf);
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	FILE_UNLOCK(rf);
	error = falloc(td, &wf, &fd);
	if (error) {
		/*
		 * Undo the read-side fd only if it still points at rf;
		 * the first fdrop drops the table's reference, the second
		 * drops our fhold() reference (which closes rpipe).
		 */
		FILEDESC_LOCK(fdp);
		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
			fdp->fd_ofiles[td->td_retval[0]] = NULL;
			FILEDESC_UNLOCK(fdp);
			fdrop(rf, td);
		} else
			FILEDESC_UNLOCK(fdp);
		fdrop(rf, td);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	FILE_LOCK(wf);
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	FILE_UNLOCK(wf);
	td->td_retval[1] = fd;
	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	rpipe->pipe_mtxp = wpipe->pipe_mtxp = mtx_pool_alloc();
	fdrop(rf, td);

	return (0);
}

/*
 * Allocate kva for pipe circular buffer, the space is pageable
 * This routine will 'realloc' the size of a pipe safely, if it fails
 * it will retain the old buffer.
 * If it fails it will return ENOMEM.
 */
static int
pipespace(cpipe, size)
	struct pipe *cpipe;
	int size;
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	GIANT_REQUIRED;

	npages = round_page(size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	object = vm_object_allocate(OBJT_DEFAULT, npages);
	buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, object, 0,
		(vm_offset_t *) &buffer, size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS) {
		vm_object_deallocate(object);
		return (ENOMEM);
	}

	/* free old resources if we're resizing */
	pipe_free_kmem(cpipe);
	cpipe->pipe_buffer.object = object;
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	amountpipekva += cpipe->pipe_buffer.size;
	return (0);
}

/*
 * initialize and allocate VM and memory for pipe
 */
static int
pipe_create(cpipep)
	struct pipe **cpipep;
{
	struct pipe *cpipe;
	int error;

	*cpipep = zalloc(pipe_zone);
	if (*cpipep == NULL)
		return (ENOMEM);

	cpipe = *cpipep;

	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
	cpipe->pipe_buffer.object = NULL;
#ifndef PIPE_NODIRECT
	/*
	 * NOTE(review): kva is a vm_offset_t (integer); NULL and 0 are
	 * used interchangeably for it in this file.
	 */
	cpipe->pipe_map.kva = NULL;
#endif
	/*
	 * protect so pipeclose() doesn't follow a junk pointer
	 * if pipespace() fails.
	 */
	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif

	error = pipespace(cpipe, PIPE_SIZE);
	if (error)
		return (error);

	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (0);
}


/*
 * lock a pipe for I/O, blocking other access
 *
 * Must be entered with the pipe mutex held; sleeps (interruptibly if
 * "catch") until the long-term I/O lock (PIPE_LOCKFL) is available.
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
	while (cpipe->pipe_state & PIPE_LOCKFL) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = msleep(cpipe, PIPE_MTX(cpipe),
		    catch ? (PRIBIO | PCATCH) : PRIBIO,
		    "pipelk", 0);
		if (error != 0)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCKFL;
	return (0);
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{

	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
	cpipe->pipe_state &= ~PIPE_LOCKFL;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

/*
 * Notify select/poll sleepers, async (SIGIO) listeners, and kqueue
 * filters that this pipe's state has changed.
 */
static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred, flags, td)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	struct thread *td;
	int flags;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	u_int size;

	PIPE_LOCK(rpipe);
	++rpipe->pipe_busy;
	error = pipelock(rpipe, 1);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			/* copy at most up to the end of the circular buffer */
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			/* drop the pipe mutex across the (possibly faulting) copyout */
			PIPE_UNLOCK(rpipe);
			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			PIPE_LOCK(rpipe);
			if (error)
				break;

			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t	va;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			va = (caddr_t) rpipe->pipe_map.kva +
			    rpipe->pipe_map.pos;
			PIPE_UNLOCK(rpipe);
			error = uiomove(va, size, uio);
			PIPE_LOCK(rpipe);
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				/* direct write fully consumed; release the writer */
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.  We
			 * will either break out with an error or we will sleep and
			 * relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
				    PRIBIO | PCATCH,
				    "piperd", 0)) == 0)
					error = pipelock(rpipe, 1);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	/* XXX: should probably do this before getting any locks. */
	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	PIPE_UNLOCK(rpipe);
	return (error);
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending processes' buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	GIANT_REQUIRED;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	/* fault in and wire every user page backing the transfer */
	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
		vm_page_t m;

		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
		    (paddr = pmap_kextract(addr)) == 0) {
			int j;

			/* back out the pages wired so far */
			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
			return (EFAULT);
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos =
	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return (0);
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	GIANT_REQUIRED;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		/* release the kva itself only when over the soft limit */
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer, size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
	/* wait for any previously initiated direct write to drain */
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = msleep(wpipe, PIPE_MTX(wpipe),
		    PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	/* buffered data must be consumed before a direct write may start */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = msleep(wpipe, PIPE_MTX(wpipe),
		    PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	/* buffer wiring/mapping is VM work and must run under Giant */
	PIPE_GET_GIANT(wpipe);
	error = pipe_build_write_buffer(wpipe, uio);
	PIPE_DROP_GIANT(wpipe);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	/* wait for the reader to consume the mapped pages */
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			PIPE_GET_GIANT(wpipe);
			pipe_destroy_write_buffer(wpipe);
			PIPE_DROP_GIANT(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
		    "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return (error);

error1:
	/* interrupted/EOF before the transfer settled; rouse any sleepers */
	wakeup(wpipe);
	return (error);
}
#endif

static int
pipe_write(fp, uio, cred, flags, td)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	struct thread *td;
	int flags;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	PIPE_LOCK(rpipe);
	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		PIPE_UNLOCK(rpipe);
		return (EPIPE);
	}
	++wpipe->pipe_busy;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe,1)) == 0) {
			PIPE_GET_GIANT(rpipe);
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			PIPE_DROP_GIANT(rpipe);
			pipeunlock(wpipe);
		}
	}

	/*
	 * If an early error occured unbusy and return, waking up any pending
	 * readers.
	 */
	if (error) {
		--wpipe->pipe_busy;
		if ((wpipe->pipe_busy == 0) &&
		    (wpipe->pipe_state & PIPE_WANT)) {
			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
			wakeup(wpipe);
		}
		PIPE_UNLOCK(rpipe);
		return(error);
	}

	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));

	orig_resid = uio->uio_resid;

	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 *
		 * NOTE(review): the iov_len >= PIPE_MINDIRECT test below
		 * appears twice; the second occurrence is redundant.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
			(wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA)) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write( wpipe, uio);
			if (error)
				break;
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincidental with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
			    "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			if ((error = pipelock(wpipe,1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				PIPE_UNLOCK(rpipe);
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);
				PIPE_LOCK(rpipe);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					PIPE_UNLOCK(rpipe);
					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
					PIPE_LOCK(rpipe);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");

				}
				pipeunlock(wpipe);
			}
			if (error)
				break;

		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = msleep(wpipe, PIPE_MTX(rpipe),
			    PRIBIO | PCATCH, "pipewr", 0);
			if (error != 0)
				break;
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;

	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	PIPE_UNLOCK(rpipe);
	return (error);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
1090 */ 1091int 1092pipe_ioctl(fp, cmd, data, td) 1093 struct file *fp; 1094 u_long cmd; 1095 caddr_t data; 1096 struct thread *td; 1097{ 1098 struct pipe *mpipe = (struct pipe *)fp->f_data; 1099 1100 switch (cmd) { 1101 1102 case FIONBIO: 1103 return (0); 1104 1105 case FIOASYNC: 1106 PIPE_LOCK(mpipe); 1107 if (*(int *)data) { 1108 mpipe->pipe_state |= PIPE_ASYNC; 1109 } else { 1110 mpipe->pipe_state &= ~PIPE_ASYNC; 1111 } 1112 PIPE_UNLOCK(mpipe); 1113 return (0); 1114 1115 case FIONREAD: 1116 PIPE_LOCK(mpipe); 1117 if (mpipe->pipe_state & PIPE_DIRECTW) 1118 *(int *)data = mpipe->pipe_map.cnt; 1119 else 1120 *(int *)data = mpipe->pipe_buffer.cnt; 1121 PIPE_UNLOCK(mpipe); 1122 return (0); 1123 1124 case FIOSETOWN: 1125 return (fsetown(*(int *)data, &mpipe->pipe_sigio)); 1126 1127 case FIOGETOWN: 1128 *(int *)data = fgetown(mpipe->pipe_sigio); 1129 return (0); 1130 1131 /* This is deprecated, FIOSETOWN should be used instead. */ 1132 case TIOCSPGRP: 1133 return (fsetown(-(*(int *)data), &mpipe->pipe_sigio)); 1134 1135 /* This is deprecated, FIOGETOWN should be used instead. 
*/ 1136 case TIOCGPGRP: 1137 *(int *)data = -fgetown(mpipe->pipe_sigio); 1138 return (0); 1139 1140 } 1141 return (ENOTTY); 1142} 1143 1144int 1145pipe_poll(fp, events, cred, td) 1146 struct file *fp; 1147 int events; 1148 struct ucred *cred; 1149 struct thread *td; 1150{ 1151 struct pipe *rpipe = (struct pipe *)fp->f_data; 1152 struct pipe *wpipe; 1153 int revents = 0; 1154 1155 wpipe = rpipe->pipe_peer; 1156 PIPE_LOCK(rpipe); 1157 if (events & (POLLIN | POLLRDNORM)) 1158 if ((rpipe->pipe_state & PIPE_DIRECTW) || 1159 (rpipe->pipe_buffer.cnt > 0) || 1160 (rpipe->pipe_state & PIPE_EOF)) 1161 revents |= events & (POLLIN | POLLRDNORM); 1162 1163 if (events & (POLLOUT | POLLWRNORM)) 1164 if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) || 1165 (((wpipe->pipe_state & PIPE_DIRECTW) == 0) && 1166 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) 1167 revents |= events & (POLLOUT | POLLWRNORM); 1168 1169 if ((rpipe->pipe_state & PIPE_EOF) || 1170 (wpipe == NULL) || 1171 (wpipe->pipe_state & PIPE_EOF)) 1172 revents |= POLLHUP; 1173 1174 if (revents == 0) { 1175 if (events & (POLLIN | POLLRDNORM)) { 1176 selrecord(td, &rpipe->pipe_sel); 1177 rpipe->pipe_state |= PIPE_SEL; 1178 } 1179 1180 if (events & (POLLOUT | POLLWRNORM)) { 1181 selrecord(td, &wpipe->pipe_sel); 1182 wpipe->pipe_state |= PIPE_SEL; 1183 } 1184 } 1185 PIPE_UNLOCK(rpipe); 1186 1187 return (revents); 1188} 1189 1190static int 1191pipe_stat(fp, ub, td) 1192 struct file *fp; 1193 struct stat *ub; 1194 struct thread *td; 1195{ 1196 struct pipe *pipe = (struct pipe *)fp->f_data; 1197 1198 bzero((caddr_t)ub, sizeof(*ub)); 1199 ub->st_mode = S_IFIFO; 1200 ub->st_blksize = pipe->pipe_buffer.size; 1201 ub->st_size = pipe->pipe_buffer.cnt; 1202 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 1203 ub->st_atimespec = pipe->pipe_atime; 1204 ub->st_mtimespec = pipe->pipe_mtime; 1205 ub->st_ctimespec = pipe->pipe_ctime; 1206 ub->st_uid = fp->f_cred->cr_uid; 1207 ub->st_gid = 
fp->f_cred->cr_gid; 1208 /* 1209 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen. 1210 * XXX (st_dev, st_ino) should be unique. 1211 */ 1212 return (0); 1213} 1214 1215/* ARGSUSED */ 1216static int 1217pipe_close(fp, td) 1218 struct file *fp; 1219 struct thread *td; 1220{ 1221 struct pipe *cpipe = (struct pipe *)fp->f_data; 1222 1223 fp->f_ops = &badfileops; 1224 fp->f_data = NULL; 1225 funsetown(cpipe->pipe_sigio); 1226 pipeclose(cpipe); 1227 return (0); 1228} 1229 1230static void 1231pipe_free_kmem(cpipe) 1232 struct pipe *cpipe; 1233{ 1234 GIANT_REQUIRED; 1235 1236 if (cpipe->pipe_buffer.buffer != NULL) { 1237 if (cpipe->pipe_buffer.size > PIPE_SIZE) 1238 --nbigpipe; 1239 amountpipekva -= cpipe->pipe_buffer.size; 1240 kmem_free(kernel_map, 1241 (vm_offset_t)cpipe->pipe_buffer.buffer, 1242 cpipe->pipe_buffer.size); 1243 cpipe->pipe_buffer.buffer = NULL; 1244 } 1245#ifndef PIPE_NODIRECT 1246 if (cpipe->pipe_map.kva != NULL) { 1247 amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE; 1248 kmem_free(kernel_map, 1249 cpipe->pipe_map.kva, 1250 cpipe->pipe_buffer.size + PAGE_SIZE); 1251 cpipe->pipe_map.cnt = 0; 1252 cpipe->pipe_map.kva = 0; 1253 cpipe->pipe_map.pos = 0; 1254 cpipe->pipe_map.npages = 0; 1255 } 1256#endif 1257} 1258 1259/* 1260 * shutdown the pipe 1261 */ 1262static void 1263pipeclose(cpipe) 1264 struct pipe *cpipe; 1265{ 1266 struct pipe *ppipe; 1267 1268 if (cpipe) { 1269 PIPE_LOCK(cpipe); 1270 1271 pipeselwakeup(cpipe); 1272 1273 /* 1274 * If the other side is blocked, wake it up saying that 1275 * we want to close it down. 
1276 */ 1277 while (cpipe->pipe_busy) { 1278 wakeup(cpipe); 1279 cpipe->pipe_state |= PIPE_WANT | PIPE_EOF; 1280 msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0); 1281 } 1282 1283 /* 1284 * Disconnect from peer 1285 */ 1286 if ((ppipe = cpipe->pipe_peer) != NULL) { 1287 pipeselwakeup(ppipe); 1288 1289 ppipe->pipe_state |= PIPE_EOF; 1290 wakeup(ppipe); 1291 KNOTE(&ppipe->pipe_sel.si_note, 0); 1292 ppipe->pipe_peer = NULL; 1293 } 1294 /* 1295 * free resources 1296 */ 1297 PIPE_UNLOCK(cpipe); 1298 mtx_lock(&Giant); 1299 pipe_free_kmem(cpipe); 1300 zfree(pipe_zone, cpipe); 1301 mtx_unlock(&Giant); 1302 } 1303} 1304 1305/*ARGSUSED*/ 1306static int 1307pipe_kqfilter(struct file *fp, struct knote *kn) 1308{ 1309 struct pipe *cpipe; 1310 1311 cpipe = (struct pipe *)kn->kn_fp->f_data; 1312 switch (kn->kn_filter) { 1313 case EVFILT_READ: 1314 kn->kn_fop = &pipe_rfiltops; 1315 break; 1316 case EVFILT_WRITE: 1317 kn->kn_fop = &pipe_wfiltops; 1318 cpipe = cpipe->pipe_peer; 1319 break; 1320 default: 1321 return (1); 1322 } 1323 kn->kn_hook = (caddr_t)cpipe; 1324 1325 PIPE_LOCK(cpipe); 1326 SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext); 1327 PIPE_UNLOCK(cpipe); 1328 return (0); 1329} 1330 1331static void 1332filt_pipedetach(struct knote *kn) 1333{ 1334 struct pipe *cpipe = (struct pipe *)kn->kn_hook; 1335 1336 PIPE_LOCK(cpipe); 1337 SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext); 1338 PIPE_UNLOCK(cpipe); 1339} 1340 1341/*ARGSUSED*/ 1342static int 1343filt_piperead(struct knote *kn, long hint) 1344{ 1345 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 1346 struct pipe *wpipe = rpipe->pipe_peer; 1347 1348 PIPE_LOCK(rpipe); 1349 kn->kn_data = rpipe->pipe_buffer.cnt; 1350 if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW)) 1351 kn->kn_data = rpipe->pipe_map.cnt; 1352 1353 if ((rpipe->pipe_state & PIPE_EOF) || 1354 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 1355 kn->kn_flags |= EV_EOF; 1356 PIPE_UNLOCK(rpipe); 1357 return (1); 
1358 } 1359 PIPE_UNLOCK(rpipe); 1360 return (kn->kn_data > 0); 1361} 1362 1363/*ARGSUSED*/ 1364static int 1365filt_pipewrite(struct knote *kn, long hint) 1366{ 1367 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; 1368 struct pipe *wpipe = rpipe->pipe_peer; 1369 1370 PIPE_LOCK(rpipe); 1371 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) { 1372 kn->kn_data = 0; 1373 kn->kn_flags |= EV_EOF; 1374 PIPE_UNLOCK(rpipe); 1375 return (1); 1376 } 1377 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 1378 if (wpipe->pipe_state & PIPE_DIRECTW) 1379 kn->kn_data = 0; 1380 1381 PIPE_UNLOCK(rpipe); 1382 return (kn->kn_data >= PIPE_BUF); 1383} 1384