/* sys_pipe.c -- CVS annotate residue: revision 14177 */
154371Ssemenu/* 254371Ssemenu * Copyright (c) 1996 John S. Dyson 354371Ssemenu * All rights reserved. 454371Ssemenu * 554371Ssemenu * Redistribution and use in source and binary forms, with or without 654371Ssemenu * modification, are permitted provided that the following conditions 754371Ssemenu * are met: 854371Ssemenu * 1. Redistributions of source code must retain the above copyright 954371Ssemenu * notice immediately at the beginning of the file, without modification, 1054371Ssemenu * this list of conditions, and the following disclaimer. 1154371Ssemenu * 2. Redistributions in binary form must reproduce the above copyright 1254371Ssemenu * notice, this list of conditions and the following disclaimer in the 1354371Ssemenu * documentation and/or other materials provided with the distribution. 1454371Ssemenu * 3. Absolutely no warranty of function or purpose is made by the author 1554371Ssemenu * John S. Dyson. 1654371Ssemenu * 4. Modifications may be freely made to this file if the above conditions 1754371Ssemenu * are met. 1854371Ssemenu * 1954371Ssemenu * $Id: sys_pipe.c,v 1.12 1996/02/17 14:47:16 peter Exp $ 2054371Ssemenu */ 2154371Ssemenu 2254371Ssemenu#ifndef OLD_PIPE 2354371Ssemenu 2454371Ssemenu/* 2554371Ssemenu * This file contains a high-performance replacement for the socket-based 2654371Ssemenu * pipes scheme originally used in FreeBSD/4.4Lite. It does not support 2754371Ssemenu * all features of sockets, but does do everything that pipes normally 2854371Ssemenu * do. 2954371Ssemenu */ 3054371Ssemenu 3154371Ssemenu/* 3254371Ssemenu * This code has two modes of operation, a small write mode and a large 3354371Ssemenu * write mode. The small write mode acts like conventional pipes with 3454371Ssemenu * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the 3554371Ssemenu * "normal" pipe buffering is done. 
If the buffer is between PIPE_MINDIRECT 3654371Ssemenu * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and 3754371Ssemenu * the receiving process can copy it directly from the pages in the sending 3860041Sphk * process. 3954371Ssemenu * 4054371Ssemenu * If the sending process receives a signal, it is possible that it will 4154371Ssemenu * go away, and certainly its address space can change, because control 4254371Ssemenu * is returned back to the user-mode side. In that case, the pipe code 43137040Sphk * arranges to copy the buffer supplied by the user process, to a pageable 44137040Sphk * kernel buffer, and the receiving process will grab the data from the 45137040Sphk * pageable kernel buffer. Since signals don't happen all that often, 4654371Ssemenu * the copy operation is normally eliminated. 4754371Ssemenu * 4854371Ssemenu * The constant PIPE_MINDIRECT is chosen to make sure that buffering will 4954371Ssemenu * happen for small transfers so that the system will not spend all of 5054371Ssemenu * its time context switching. PIPE_SIZE is constrained by the 5154371Ssemenu * amount of kernel virtual memory. 
5254371Ssemenu */ 5354371Ssemenu 5454371Ssemenu#include <sys/param.h> 5554371Ssemenu#include <sys/systm.h> 5654371Ssemenu#include <sys/proc.h> 5754371Ssemenu#include <sys/file.h> 5883384Sjhb#include <sys/protosw.h> 5986927Sjhb#include <sys/stat.h> 6086927Sjhb#include <sys/filedesc.h> 6192727Salfred#include <sys/malloc.h> 62138487Sphk#include <sys/ioctl.h> 6354371Ssemenu#include <sys/stat.h> 64116271Sphk#include <sys/select.h> 65116271Sphk#include <sys/signalvar.h> 66138487Sphk#include <sys/errno.h> 67138487Sphk#include <sys/queue.h> 68116271Sphk#include <sys/vmmeter.h> 69116271Sphk#include <sys/kernel.h> 70116271Sphk#include <sys/sysproto.h> 71116271Sphk#include <sys/pipe.h> 72116271Sphk 7354371Ssemenu#include <vm/vm.h> 74138487Sphk#include <vm/vm_prot.h> 75138487Sphk#include <vm/vm_param.h> 76138487Sphk#include <vm/lock.h> 77138487Sphk#include <vm/vm_object.h> 78138487Sphk#include <vm/vm_kern.h> 79138487Sphk#include <vm/vm_extern.h> 80138487Sphk#include <vm/pmap.h> 81138487Sphk#include <vm/vm_map.h> 82138487Sphk#include <vm/vm_page.h> 83138487Sphk 84138487Sphk/* 85138487Sphk * Use this define if you want to disable *fancy* VM things. Expect an 86138487Sphk * approx 30% decrease in transfer rate. This could be useful for 87138487Sphk * NetBSD or OpenBSD. 
88138487Sphk */ 89138487Sphk/* #define PIPE_NODIRECT */ 90138487Sphk 91138487Sphk/* 92138487Sphk * interfaces to the outside world 93138487Sphk */ 94138487Sphkstatic int pipe_read __P((struct file *fp, struct uio *uio, 95138487Sphk struct ucred *cred)); 96138487Sphkstatic int pipe_write __P((struct file *fp, struct uio *uio, 97138487Sphk struct ucred *cred)); 98138487Sphkstatic int pipe_close __P((struct file *fp, struct proc *p)); 99138487Sphkstatic int pipe_select __P((struct file *fp, int which, struct proc *p)); 100138487Sphkstatic int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p)); 101138487Sphk 102138487Sphkstatic struct fileops pipeops = 103138487Sphk { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close }; 104138487Sphk 105138487Sphk/* 106138487Sphk * Default pipe buffer size(s), this can be kind-of large now because pipe 107138487Sphk * space is pageable. The pipe code will try to maintain locality of 10854371Ssemenu * reference for performance reasons, so small amounts of outstanding I/O 10986931Sjhb * will not wipe the cache. 11054371Ssemenu */ 111138487Sphk#define MINPIPESIZE (PIPE_SIZE/3) 11254371Ssemenu#define MAXPIPESIZE (2*PIPE_SIZE/3) 11354371Ssemenu 114132902Sphk/* 115138487Sphk * Maximum amount of kva for pipes -- this is kind-of a soft limit, but 116138487Sphk * is there so that on large systems, we don't exhaust it. 11754371Ssemenu */ 118132902Sphk#define MAXPIPEKVA (8*1024*1024) 11954371Ssemenu 12054371Ssemenu/* 12196755Strhodes * Limit for direct transfers, we cannot, of course limit 12254371Ssemenu * the amount of kva for pipes in general though. 
12354371Ssemenu */ 124138487Sphk#define LIMITPIPEKVA (16*1024*1024) 125138487Sphkint amountpipekva; 12654371Ssemenu 127138487Sphkstatic void pipeclose __P((struct pipe *cpipe)); 128138487Sphkstatic void pipebufferinit __P((struct pipe *cpipe)); 129138487Sphkstatic void pipeinit __P((struct pipe *cpipe)); 13054371Ssemenustatic __inline int pipelock __P((struct pipe *cpipe, int catch)); 13154371Ssemenustatic __inline void pipeunlock __P((struct pipe *cpipe)); 13254371Ssemenustatic __inline void pipeselwakeup __P((struct pipe *cpipe)); 13354371Ssemenu#ifndef PIPE_NODIRECT 13454371Ssemenustatic int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio)); 13554371Ssemenustatic void pipe_destroy_write_buffer __P((struct pipe *wpipe)); 136132902Sphkstatic int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio)); 13754371Ssemenustatic void pipe_clone_write_buffer __P((struct pipe *wpipe)); 13854371Ssemenustatic void pipe_mark_pages_clean __P((struct pipe *cpipe)); 13954371Ssemenu#endif 140138487Sphkstatic int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio)); 141138487Sphkstatic void pipespace __P((struct pipe *cpipe)); 142138487Sphk 143138487Sphk/* 144138487Sphk * The pipe system call for the DTYPE_PIPE type of pipes 14554371Ssemenu */ 146138487Sphk 14754371Ssemenu/* ARGSUSED */ 148132902Sphkint 14954371Ssemenupipe(p, uap, retval) 15054371Ssemenu struct proc *p; 15154371Ssemenu struct pipe_args /* { 15254371Ssemenu int dummy; 15354371Ssemenu } */ *uap; 15454371Ssemenu int retval[]; 15554371Ssemenu{ 15654371Ssemenu register struct filedesc *fdp = p->p_fd; 15754371Ssemenu struct file *rf, *wf; 15854371Ssemenu struct pipe *rpipe, *wpipe; 15954371Ssemenu int fd, error; 16054371Ssemenu 16154371Ssemenu rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK); 16254371Ssemenu pipeinit(rpipe); 16354371Ssemenu rpipe->pipe_state |= PIPE_DIRECTOK; 164138487Sphk wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK); 165132902Sphk pipeinit(wpipe); 
16654371Ssemenu wpipe->pipe_state |= PIPE_DIRECTOK; 16754371Ssemenu 16854371Ssemenu error = falloc(p, &rf, &fd); 16954371Ssemenu if (error) 17054371Ssemenu goto free2; 171132902Sphk retval[0] = fd; 17254371Ssemenu rf->f_flag = FREAD | FWRITE; 17355756Sphk rf->f_type = DTYPE_PIPE; 17454371Ssemenu rf->f_ops = &pipeops; 17554371Ssemenu rf->f_data = (caddr_t)rpipe; 17654371Ssemenu error = falloc(p, &wf, &fd); 17754371Ssemenu if (error) 17854371Ssemenu goto free3; 17954371Ssemenu wf->f_flag = FREAD | FWRITE; 18054371Ssemenu wf->f_type = DTYPE_PIPE; 18154371Ssemenu wf->f_ops = &pipeops; 18254371Ssemenu wf->f_data = (caddr_t)wpipe; 18354371Ssemenu retval[1] = fd; 18454371Ssemenu 18573286Sadrian rpipe->pipe_peer = wpipe; 186132902Sphk wpipe->pipe_peer = rpipe; 18773286Sadrian 18873286Sadrian return (0); 18973286Sadrianfree3: 19054371Ssemenu ffree(rf); 19154371Ssemenu fdp->fd_ofiles[retval[0]] = 0; 192138487Sphkfree2: 19354371Ssemenu (void)pipeclose(wpipe); 194138487Sphkfree1: 19554371Ssemenu (void)pipeclose(rpipe); 19654371Ssemenu return (error); 19754371Ssemenu} 19854371Ssemenu 19954371Ssemenu/* 20054371Ssemenu * Allocate kva for pipe circular buffer, the space is pageable 20154371Ssemenu */ 20254371Ssemenustatic void 20354371Ssemenupipespace(cpipe) 20454371Ssemenu struct pipe *cpipe; 20554371Ssemenu{ 20654371Ssemenu int npages, error; 207132902Sphk 20854371Ssemenu npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE; 20954371Ssemenu /* 21054371Ssemenu * Create an object, I don't like the idea of paging to/from 21154371Ssemenu * kernel_object. 21254371Ssemenu * XXX -- minor change needed here for NetBSD/OpenBSD VM systems. 21354371Ssemenu */ 21454371Ssemenu cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages); 21554371Ssemenu cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map); 21654371Ssemenu 217138487Sphk /* 21854371Ssemenu * Insert the object into the kernel map, and allocate kva for it. 
21954371Ssemenu * The map entry is, by default, pageable. 22086931Sjhb * XXX -- minor change needed here for NetBSD/OpenBSD VM systems. 22154371Ssemenu */ 222138487Sphk error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0, 22354371Ssemenu (vm_offset_t *) &cpipe->pipe_buffer.buffer, 22454371Ssemenu cpipe->pipe_buffer.size, 1, 22554371Ssemenu VM_PROT_ALL, VM_PROT_ALL, 0); 22654371Ssemenu 22754371Ssemenu if (error != KERN_SUCCESS) 228130585Sphk panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error); 229137040Sphk amountpipekva += cpipe->pipe_buffer.size; 230137040Sphk} 23154371Ssemenu 232137478Sphk/* 233137478Sphk * initialize and allocate VM and memory for pipe 23454371Ssemenu */ 23554371Ssemenustatic void 23686931Sjhbpipeinit(cpipe) 237137040Sphk struct pipe *cpipe; 238137040Sphk{ 239137040Sphk int s; 240137040Sphk 241137040Sphk cpipe->pipe_buffer.in = 0; 242137040Sphk cpipe->pipe_buffer.out = 0; 24386931Sjhb cpipe->pipe_buffer.cnt = 0; 24454371Ssemenu cpipe->pipe_buffer.size = PIPE_SIZE; 24554371Ssemenu /* Buffer kva gets dynamically allocated */ 24654371Ssemenu cpipe->pipe_buffer.buffer = NULL; 247137040Sphk 248137040Sphk cpipe->pipe_state = 0; 249137040Sphk cpipe->pipe_peer = NULL; 250137040Sphk cpipe->pipe_busy = 0; 25154371Ssemenu s = splhigh(); 25254371Ssemenu cpipe->pipe_ctime = time; 25354371Ssemenu cpipe->pipe_atime = time; 254111119Simp cpipe->pipe_mtime = time; 25554371Ssemenu splx(s); 256137040Sphk bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel); 257137040Sphk 258137040Sphk#ifndef PIPE_NODIRECT 25954371Ssemenu /* 26054371Ssemenu * pipe data structure initializations to support direct pipe I/O 26154371Ssemenu */ 26254371Ssemenu cpipe->pipe_map.cnt = 0; 26354371Ssemenu cpipe->pipe_map.kva = 0; 26454371Ssemenu cpipe->pipe_map.pos = 0; 26554371Ssemenu cpipe->pipe_map.npages = 0; 26654371Ssemenu#endif 26754371Ssemenu} 26854371Ssemenu 26954371Ssemenu 27054371Ssemenu/* 27154371Ssemenu * lock a pipe for I/O, blocking other access 
27254371Ssemenu */ 27354371Ssemenustatic __inline int 27454371Ssemenupipelock(cpipe, catch) 27554371Ssemenu struct pipe *cpipe; 27654371Ssemenu int catch; 27754371Ssemenu{ 27854371Ssemenu int error; 27954371Ssemenu while (cpipe->pipe_state & PIPE_LOCK) { 28054371Ssemenu cpipe->pipe_state |= PIPE_LWANT; 28154371Ssemenu if (error = tsleep( cpipe, 28254371Ssemenu catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) { 28354371Ssemenu return error; 28454371Ssemenu } 28554371Ssemenu } 28654371Ssemenu cpipe->pipe_state |= PIPE_LOCK; 28754371Ssemenu return 0; 28854371Ssemenu} 28954371Ssemenu 29054371Ssemenu/* 29154371Ssemenu * unlock a pipe I/O lock 292138487Sphk */ 293138487Sphkstatic __inline void 294138487Sphkpipeunlock(cpipe) 295138487Sphk struct pipe *cpipe; 296138487Sphk{ 297138487Sphk cpipe->pipe_state &= ~PIPE_LOCK; 29854371Ssemenu if (cpipe->pipe_state & PIPE_LWANT) { 29954371Ssemenu cpipe->pipe_state &= ~PIPE_LWANT; 30054371Ssemenu wakeup(cpipe); 30154371Ssemenu } 30254371Ssemenu return; 303138487Sphk} 30454371Ssemenu 30554371Ssemenustatic __inline void 30654371Ssemenupipeselwakeup(cpipe) 30754371Ssemenu struct pipe *cpipe; 30854371Ssemenu{ 309132023Salfred if (cpipe->pipe_state & PIPE_SEL) { 31054371Ssemenu cpipe->pipe_state &= ~PIPE_SEL; 31154371Ssemenu selwakeup(&cpipe->pipe_sel); 31254371Ssemenu } 31354371Ssemenu} 31454371Ssemenu 31554371Ssemenu#ifndef PIPE_NODIRECT 31654371Ssemenu#if 0 31754371Ssemenustatic void 31854371Ssemenupipe_mark_pages_clean(cpipe) 31954371Ssemenu struct pipe *cpipe; 32054371Ssemenu{ 32154371Ssemenu vm_size_t off; 32254371Ssemenu vm_page_t m; 32354371Ssemenu 32454371Ssemenu for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) { 32554371Ssemenu m = vm_page_lookup(cpipe->pipe_buffer.object, off); 32654371Ssemenu if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) { 32754371Ssemenu m->dirty = 0; 328140822Sphk pmap_clear_modify(VM_PAGE_TO_PHYS(m)); 32954371Ssemenu } 33054371Ssemenu } 33154371Ssemenu} 33254371Ssemenu#endif 
#endif

/*
 * Read from a pipe.  Drains, in priority order: (1) the kernel circular
 * buffer, (2) a pending direct-write mapping (PIPE_DIRECTW), otherwise
 * sleeps for data unless EOF, non-blocking, or some bytes were already
 * delivered.
 */
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			/* Limit the copy to the contiguous run up to the
			 * end of the circular buffer. */
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				/* Mapping fully drained: release the
				 * sleeping direct writer. */
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			/* Partial read already satisfied: return it rather
			 * than sleeping for more. */
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		/* A closer is waiting in pipeclose() for us to drain out. */
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	/* Room for at least one atomic (PIPE_BUF) write: notify selects. */
	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending processes' buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 * Returns 0 on success, EFAULT if the user address range cannot be
 * faulted in; on failure all pages wired so far are unwired.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	/* Fault in and wire each user page backing the iovec. */
	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			/* Unwind: unwire everything wired so far. */
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
struct pipe *wpipe;
{
	int i;
	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	/* Release the kva only when the system-wide soft limit is
	 * exceeded; otherwise keep it cached for the next direct write. */
	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
589 */ 590static void 591pipe_clone_write_buffer(wpipe) 592struct pipe *wpipe; 593{ 594 int size; 595 int pos; 596 597 size = wpipe->pipe_map.cnt; 598 pos = wpipe->pipe_map.pos; 599 bcopy((caddr_t) wpipe->pipe_map.kva+pos, 600 (caddr_t) wpipe->pipe_buffer.buffer, 601 size); 602 603 wpipe->pipe_buffer.in = size; 604 wpipe->pipe_buffer.out = 0; 605 wpipe->pipe_buffer.cnt = size; 606 wpipe->pipe_state &= ~PIPE_DIRECTW; 607 608 pipe_destroy_write_buffer(wpipe); 609} 610 611/* 612 * This implements the pipe buffer write mechanism. Note that only 613 * a direct write OR a normal pipe write can be pending at any given time. 614 * If there are any characters in the pipe buffer, the direct write will 615 * be deferred until the receiving process grabs all of the bytes from 616 * the pipe buffer. Then the direct mapping write is set-up. 617 */ 618static int 619pipe_direct_write(wpipe, uio) 620 struct pipe *wpipe; 621 struct uio *uio; 622{ 623 int error; 624retry: 625 while (wpipe->pipe_state & PIPE_DIRECTW) { 626 if ( wpipe->pipe_state & PIPE_WANTR) { 627 wpipe->pipe_state &= ~PIPE_WANTR; 628 wakeup(wpipe); 629 } 630 wpipe->pipe_state |= PIPE_WANTW; 631 error = tsleep(wpipe, 632 PRIBIO|PCATCH, "pipdww", 0); 633 if (error || (wpipe->pipe_state & PIPE_EOF)) 634 goto error1; 635 } 636 wpipe->pipe_map.cnt = 0; /* transfer not ready yet */ 637 if (wpipe->pipe_buffer.cnt > 0) { 638 if ( wpipe->pipe_state & PIPE_WANTR) { 639 wpipe->pipe_state &= ~PIPE_WANTR; 640 wakeup(wpipe); 641 } 642 643 wpipe->pipe_state |= PIPE_WANTW; 644 error = tsleep(wpipe, 645 PRIBIO|PCATCH, "pipdwc", 0); 646 if (error || (wpipe->pipe_state & PIPE_EOF)) { 647 if (error == 0) 648 error = EPIPE; 649 goto error1; 650 } 651 goto retry; 652 } 653 654 wpipe->pipe_state |= PIPE_DIRECTW; 655 656 error = pipe_build_write_buffer(wpipe, uio); 657 if (error) { 658 wpipe->pipe_state &= ~PIPE_DIRECTW; 659 goto error1; 660 } 661 662 error = 0; 663 while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) { 664 if 
(wpipe->pipe_state & PIPE_EOF) { 665 pipelock(wpipe, 0); 666 pipe_destroy_write_buffer(wpipe); 667 pipeunlock(wpipe); 668 pipeselwakeup(wpipe); 669 wakeup(wpipe); 670 return EPIPE; 671 } 672 if (wpipe->pipe_state & PIPE_WANTR) { 673 wpipe->pipe_state &= ~PIPE_WANTR; 674 wakeup(wpipe); 675 } 676 pipeselwakeup(wpipe); 677 error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0); 678 } 679 680 pipelock(wpipe,0); 681 if (wpipe->pipe_state & PIPE_DIRECTW) { 682 /* 683 * this bit of trickery substitutes a kernel buffer for 684 * the process that might be going away. 685 */ 686 pipe_clone_write_buffer(wpipe); 687 } else { 688 pipe_destroy_write_buffer(wpipe); 689 } 690 pipeunlock(wpipe); 691 return error; 692 693error1: 694 wakeup(wpipe); 695 return error; 696} 697#endif 698 699static __inline int 700pipewrite(wpipe, uio, nbio) 701 struct pipe *wpipe; 702 struct uio *uio; 703 int nbio; 704{ 705 int error = 0; 706 int orig_resid; 707 708 /* 709 * detect loss of pipe read side, issue SIGPIPE if lost. 710 */ 711 if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) { 712 return EPIPE; 713 } 714 715 if( wpipe->pipe_buffer.buffer == NULL) { 716 if ((error = pipelock(wpipe,1)) == 0) { 717 pipespace(wpipe); 718 pipeunlock(wpipe); 719 } else { 720 return error; 721 } 722 } 723 724 ++wpipe->pipe_busy; 725 orig_resid = uio->uio_resid; 726 while (uio->uio_resid) { 727 int space; 728#ifndef PIPE_NODIRECT 729 /* 730 * If the transfer is large, we can gain performance if 731 * we do process-to-process copies directly. 732 */ 733 if ((amountpipekva < LIMITPIPEKVA) && 734 (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) { 735 error = pipe_direct_write( wpipe, uio); 736 if (error) { 737 break; 738 } 739 continue; 740 } 741#endif 742 743 /* 744 * Pipe buffered writes cannot be coincidental with 745 * direct writes. We wait until the currently executing 746 * direct write is completed before we start filling the 747 * pipe buffer. 
748 */ 749 retrywrite: 750 while (wpipe->pipe_state & PIPE_DIRECTW) { 751 if (wpipe->pipe_state & PIPE_WANTR) { 752 wpipe->pipe_state &= ~PIPE_WANTR; 753 wakeup(wpipe); 754 } 755 error = tsleep(wpipe, 756 PRIBIO|PCATCH, "pipbww", 0); 757 if (error) 758 break; 759 } 760 761 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; 762 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) 763 space = 0; 764 765 /* 766 * We must afford contiguous writes on buffers of size 767 * PIPE_BUF or less. 768 */ 769 if (space > 0) { 770 int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in; 771 if (size > space) 772 size = space; 773 if (size > uio->uio_resid) 774 size = uio->uio_resid; 775 if ((error = pipelock(wpipe,1)) == 0) { 776 /* 777 * It is possible for a direct write to 778 * slip in on us... handle it here... 779 */ 780 if (wpipe->pipe_state & PIPE_DIRECTW) { 781 pipeunlock(wpipe); 782 goto retrywrite; 783 } 784 error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in], 785 size, uio); 786 pipeunlock(wpipe); 787 } 788 if (error) 789 break; 790 791 wpipe->pipe_buffer.in += size; 792 if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size) 793 wpipe->pipe_buffer.in = 0; 794 795 wpipe->pipe_buffer.cnt += size; 796 } else { 797 /* 798 * If the "read-side" has been blocked, wake it up now. 799 */ 800 if (wpipe->pipe_state & PIPE_WANTR) { 801 wpipe->pipe_state &= ~PIPE_WANTR; 802 wakeup(wpipe); 803 } 804 805 /* 806 * don't block on non-blocking I/O 807 */ 808 if (nbio) { 809 error = EAGAIN; 810 break; 811 } 812 813 /* 814 * We have no more space and have something to offer, 815 * wake up selects. 816 */ 817 pipeselwakeup(wpipe); 818 819 wpipe->pipe_state |= PIPE_WANTW; 820 if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) { 821 break; 822 } 823 /* 824 * If read side wants to go away, we just issue a signal 825 * to ourselves. 
826 */ 827 if (wpipe->pipe_state & PIPE_EOF) { 828 error = EPIPE; 829 break; 830 } 831 } 832 } 833 834 if ((wpipe->pipe_busy == 0) && 835 (wpipe->pipe_state & PIPE_WANT)) { 836 wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR); 837 wakeup(wpipe); 838 } else if (wpipe->pipe_buffer.cnt > 0) { 839 /* 840 * If we have put any characters in the buffer, we wake up 841 * the reader. 842 */ 843 if (wpipe->pipe_state & PIPE_WANTR) { 844 wpipe->pipe_state &= ~PIPE_WANTR; 845 wakeup(wpipe); 846 } 847 } 848 849 /* 850 * Don't return EPIPE if I/O was successful 851 */ 852 if ((wpipe->pipe_buffer.cnt == 0) && 853 (uio->uio_resid == 0) && 854 (error == EPIPE)) 855 error = 0; 856 857 if (error = 0) { 858 int s = splhigh(); 859 wpipe->pipe_mtime = time; 860 splx(s); 861 } 862 /* 863 * We have something to offer, 864 * wake up select. 865 */ 866 if (wpipe->pipe_buffer.cnt) 867 pipeselwakeup(wpipe); 868 869 --wpipe->pipe_busy; 870 return error; 871} 872 873/* ARGSUSED */ 874static int 875pipe_write(fp, uio, cred) 876 struct file *fp; 877 struct uio *uio; 878 struct ucred *cred; 879{ 880 struct pipe *rpipe = (struct pipe *) fp->f_data; 881 struct pipe *wpipe = rpipe->pipe_peer; 882 return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0); 883} 884 885/* 886 * we implement a very minimal set of ioctls for compatibility with sockets. 
887 */ 888int 889pipe_ioctl(fp, cmd, data, p) 890 struct file *fp; 891 int cmd; 892 register caddr_t data; 893 struct proc *p; 894{ 895 register struct pipe *mpipe = (struct pipe *)fp->f_data; 896 897 switch (cmd) { 898 899 case FIONBIO: 900 if (*(int *)data) 901 mpipe->pipe_state |= PIPE_NBIO; 902 else 903 mpipe->pipe_state &= ~PIPE_NBIO; 904 return (0); 905 906 case FIOASYNC: 907 if (*(int *)data) { 908 mpipe->pipe_state |= PIPE_ASYNC; 909 } else { 910 mpipe->pipe_state &= ~PIPE_ASYNC; 911 } 912 return (0); 913 914 case FIONREAD: 915 if (mpipe->pipe_state & PIPE_DIRECTW) 916 *(int *)data = mpipe->pipe_map.cnt; 917 else 918 *(int *)data = mpipe->pipe_buffer.cnt; 919 return (0); 920 921 case SIOCSPGRP: 922 mpipe->pipe_pgid = *(int *)data; 923 return (0); 924 925 case SIOCGPGRP: 926 *(int *)data = mpipe->pipe_pgid; 927 return (0); 928 929 } 930 return ENOSYS; 931} 932 933int 934pipe_select(fp, which, p) 935 struct file *fp; 936 int which; 937 struct proc *p; 938{ 939 register struct pipe *rpipe = (struct pipe *)fp->f_data; 940 struct pipe *wpipe; 941 942 wpipe = rpipe->pipe_peer; 943 switch (which) { 944 945 case FREAD: 946 if ( (rpipe->pipe_state & PIPE_DIRECTW) || 947 (rpipe->pipe_buffer.cnt > 0) || 948 (rpipe->pipe_state & PIPE_EOF)) { 949 return (1); 950 } 951 selrecord(p, &rpipe->pipe_sel); 952 rpipe->pipe_state |= PIPE_SEL; 953 break; 954 955 case FWRITE: 956 if ((wpipe == NULL) || 957 (wpipe->pipe_state & PIPE_EOF) || 958 (((wpipe->pipe_state & PIPE_DIRECTW) == 0) && 959 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) { 960 return (1); 961 } 962 selrecord(p, &wpipe->pipe_sel); 963 wpipe->pipe_state |= PIPE_SEL; 964 break; 965 966 case 0: 967 if ((rpipe->pipe_state & PIPE_EOF) || 968 (wpipe == NULL) || 969 (wpipe->pipe_state & PIPE_EOF)) { 970 return (1); 971 } 972 973 selrecord(p, &rpipe->pipe_sel); 974 rpipe->pipe_state |= PIPE_SEL; 975 break; 976 } 977 return (0); 978} 979 980int 981pipe_stat(pipe, ub) 982 register struct pipe *pipe; 983 
register struct stat *ub; 984{ 985 bzero((caddr_t)ub, sizeof (*ub)); 986 ub->st_mode = S_IFSOCK; 987 ub->st_blksize = pipe->pipe_buffer.size; 988 ub->st_size = pipe->pipe_buffer.cnt; 989 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; 990 TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec); 991 TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec); 992 TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec); 993 return 0; 994} 995 996/* ARGSUSED */ 997static int 998pipe_close(fp, p) 999 struct file *fp; 1000 struct proc *p; 1001{ 1002 int error = 0; 1003 struct pipe *cpipe = (struct pipe *)fp->f_data; 1004 pipeclose(cpipe); 1005 fp->f_data = NULL; 1006 return 0; 1007} 1008 1009/* 1010 * shutdown the pipe 1011 */ 1012static void 1013pipeclose(cpipe) 1014 struct pipe *cpipe; 1015{ 1016 struct pipe *ppipe; 1017 if (cpipe) { 1018 1019 pipeselwakeup(cpipe); 1020 1021 /* 1022 * If the other side is blocked, wake it up saying that 1023 * we want to close it down. 1024 */ 1025 while (cpipe->pipe_busy) { 1026 wakeup(cpipe); 1027 cpipe->pipe_state |= PIPE_WANT|PIPE_EOF; 1028 tsleep(cpipe, PRIBIO, "pipecl", 0); 1029 } 1030 1031 /* 1032 * Disconnect from peer 1033 */ 1034 if (ppipe = cpipe->pipe_peer) { 1035 pipeselwakeup(ppipe); 1036 1037 ppipe->pipe_state |= PIPE_EOF; 1038 wakeup(ppipe); 1039 ppipe->pipe_peer = NULL; 1040 } 1041 1042 /* 1043 * free resources 1044 */ 1045 if (cpipe->pipe_buffer.buffer) { 1046 amountpipekva -= cpipe->pipe_buffer.size; 1047 kmem_free(kernel_map, 1048 (vm_offset_t)cpipe->pipe_buffer.buffer, 1049 cpipe->pipe_buffer.size); 1050 } 1051#ifndef PIPE_NODIRECT 1052 if (cpipe->pipe_map.kva) { 1053 amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE; 1054 kmem_free(kernel_map, 1055 cpipe->pipe_map.kva, 1056 cpipe->pipe_buffer.size + PAGE_SIZE); 1057 } 1058#endif 1059 free(cpipe, M_TEMP); 1060 } 1061} 1062#endif 1063