sys_pipe.c revision 14177
154371Ssemenu/*
254371Ssemenu * Copyright (c) 1996 John S. Dyson
354371Ssemenu * All rights reserved.
454371Ssemenu *
554371Ssemenu * Redistribution and use in source and binary forms, with or without
654371Ssemenu * modification, are permitted provided that the following conditions
754371Ssemenu * are met:
854371Ssemenu * 1. Redistributions of source code must retain the above copyright
954371Ssemenu *    notice immediately at the beginning of the file, without modification,
1054371Ssemenu *    this list of conditions, and the following disclaimer.
1154371Ssemenu * 2. Redistributions in binary form must reproduce the above copyright
1254371Ssemenu *    notice, this list of conditions and the following disclaimer in the
1354371Ssemenu *    documentation and/or other materials provided with the distribution.
1454371Ssemenu * 3. Absolutely no warranty of function or purpose is made by the author
1554371Ssemenu *    John S. Dyson.
1654371Ssemenu * 4. Modifications may be freely made to this file if the above conditions
1754371Ssemenu *    are met.
1854371Ssemenu *
1954371Ssemenu * $Id: sys_pipe.c,v 1.12 1996/02/17 14:47:16 peter Exp $
2054371Ssemenu */
2154371Ssemenu
2254371Ssemenu#ifndef OLD_PIPE
2354371Ssemenu
2454371Ssemenu/*
2554371Ssemenu * This file contains a high-performance replacement for the socket-based
2654371Ssemenu * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
2754371Ssemenu * all features of sockets, but does do everything that pipes normally
2854371Ssemenu * do.
2954371Ssemenu */
3054371Ssemenu
3154371Ssemenu/*
3254371Ssemenu * This code has two modes of operation, a small write mode and a large
3354371Ssemenu * write mode.  The small write mode acts like conventional pipes with
3454371Ssemenu * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
3554371Ssemenu * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
3654371Ssemenu * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
3754371Ssemenu * the receiving process can copy it directly from the pages in the sending
3860041Sphk * process.
3954371Ssemenu *
4054371Ssemenu * If the sending process receives a signal, it is possible that it will
4154371Ssemenu * go away, and certainly its address space can change, because control
4254371Ssemenu * is returned back to the user-mode side.  In that case, the pipe code
43137040Sphk * arranges to copy the buffer supplied by the user process, to a pageable
44137040Sphk * kernel buffer, and the receiving process will grab the data from the
45137040Sphk * pageable kernel buffer.  Since signals don't happen all that often,
4654371Ssemenu * the copy operation is normally eliminated.
4754371Ssemenu *
4854371Ssemenu * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
4954371Ssemenu * happen for small transfers so that the system will not spend all of
5054371Ssemenu * its time context switching.  PIPE_SIZE is constrained by the
5154371Ssemenu * amount of kernel virtual memory.
5254371Ssemenu */
5354371Ssemenu
5454371Ssemenu#include <sys/param.h>
5554371Ssemenu#include <sys/systm.h>
5654371Ssemenu#include <sys/proc.h>
5754371Ssemenu#include <sys/file.h>
5883384Sjhb#include <sys/protosw.h>
5986927Sjhb#include <sys/stat.h>
6086927Sjhb#include <sys/filedesc.h>
6192727Salfred#include <sys/malloc.h>
62138487Sphk#include <sys/ioctl.h>
6354371Ssemenu#include <sys/stat.h>
64116271Sphk#include <sys/select.h>
65116271Sphk#include <sys/signalvar.h>
66138487Sphk#include <sys/errno.h>
67138487Sphk#include <sys/queue.h>
68116271Sphk#include <sys/vmmeter.h>
69116271Sphk#include <sys/kernel.h>
70116271Sphk#include <sys/sysproto.h>
71116271Sphk#include <sys/pipe.h>
72116271Sphk
7354371Ssemenu#include <vm/vm.h>
74138487Sphk#include <vm/vm_prot.h>
75138487Sphk#include <vm/vm_param.h>
76138487Sphk#include <vm/lock.h>
77138487Sphk#include <vm/vm_object.h>
78138487Sphk#include <vm/vm_kern.h>
79138487Sphk#include <vm/vm_extern.h>
80138487Sphk#include <vm/pmap.h>
81138487Sphk#include <vm/vm_map.h>
82138487Sphk#include <vm/vm_page.h>
83138487Sphk
84138487Sphk/*
85138487Sphk * Use this define if you want to disable *fancy* VM things.  Expect an
86138487Sphk * approx 30% decrease in transfer rate.  This could be useful for
87138487Sphk * NetBSD or OpenBSD.
88138487Sphk */
89138487Sphk/* #define PIPE_NODIRECT */
90138487Sphk
91138487Sphk/*
92138487Sphk * interfaces to the outside world
93138487Sphk */
94138487Sphkstatic int pipe_read __P((struct file *fp, struct uio *uio,
95138487Sphk		struct ucred *cred));
96138487Sphkstatic int pipe_write __P((struct file *fp, struct uio *uio,
97138487Sphk		struct ucred *cred));
98138487Sphkstatic int pipe_close __P((struct file *fp, struct proc *p));
99138487Sphkstatic int pipe_select __P((struct file *fp, int which, struct proc *p));
100138487Sphkstatic int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));
101138487Sphk
102138487Sphkstatic struct fileops pipeops =
103138487Sphk    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };
104138487Sphk
105138487Sphk/*
106138487Sphk * Default pipe buffer size(s), this can be kind-of large now because pipe
107138487Sphk * space is pageable.  The pipe code will try to maintain locality of
10854371Ssemenu * reference for performance reasons, so small amounts of outstanding I/O
10986931Sjhb * will not wipe the cache.
11054371Ssemenu */
111138487Sphk#define MINPIPESIZE (PIPE_SIZE/3)
11254371Ssemenu#define MAXPIPESIZE (2*PIPE_SIZE/3)
11354371Ssemenu
114132902Sphk/*
115138487Sphk * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
116138487Sphk * is there so that on large systems, we don't exhaust it.
11754371Ssemenu */
118132902Sphk#define MAXPIPEKVA (8*1024*1024)
11954371Ssemenu
12054371Ssemenu/*
12196755Strhodes * Limit for direct transfers, we cannot, of course limit
12254371Ssemenu * the amount of kva for pipes in general though.
12354371Ssemenu */
124138487Sphk#define LIMITPIPEKVA (16*1024*1024)
125138487Sphkint amountpipekva;
12654371Ssemenu
127138487Sphkstatic void pipeclose __P((struct pipe *cpipe));
128138487Sphkstatic void pipebufferinit __P((struct pipe *cpipe));
129138487Sphkstatic void pipeinit __P((struct pipe *cpipe));
13054371Ssemenustatic __inline int pipelock __P((struct pipe *cpipe, int catch));
13154371Ssemenustatic __inline void pipeunlock __P((struct pipe *cpipe));
13254371Ssemenustatic __inline void pipeselwakeup __P((struct pipe *cpipe));
13354371Ssemenu#ifndef PIPE_NODIRECT
13454371Ssemenustatic int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
13554371Ssemenustatic void pipe_destroy_write_buffer __P((struct pipe *wpipe));
136132902Sphkstatic int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
13754371Ssemenustatic void pipe_clone_write_buffer __P((struct pipe *wpipe));
13854371Ssemenustatic void pipe_mark_pages_clean __P((struct pipe *cpipe));
13954371Ssemenu#endif
140138487Sphkstatic int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
141138487Sphkstatic void pipespace __P((struct pipe *cpipe));
142138487Sphk
143138487Sphk/*
144138487Sphk * The pipe system call for the DTYPE_PIPE type of pipes
14554371Ssemenu */
146138487Sphk
14754371Ssemenu/* ARGSUSED */
148132902Sphkint
14954371Ssemenupipe(p, uap, retval)
15054371Ssemenu	struct proc *p;
15154371Ssemenu	struct pipe_args /* {
15254371Ssemenu		int	dummy;
15354371Ssemenu	} */ *uap;
15454371Ssemenu	int retval[];
15554371Ssemenu{
15654371Ssemenu	register struct filedesc *fdp = p->p_fd;
15754371Ssemenu	struct file *rf, *wf;
15854371Ssemenu	struct pipe *rpipe, *wpipe;
15954371Ssemenu	int fd, error;
16054371Ssemenu
16154371Ssemenu	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
16254371Ssemenu	pipeinit(rpipe);
16354371Ssemenu	rpipe->pipe_state |= PIPE_DIRECTOK;
164138487Sphk	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
165132902Sphk	pipeinit(wpipe);
16654371Ssemenu	wpipe->pipe_state |= PIPE_DIRECTOK;
16754371Ssemenu
16854371Ssemenu	error = falloc(p, &rf, &fd);
16954371Ssemenu	if (error)
17054371Ssemenu		goto free2;
171132902Sphk	retval[0] = fd;
17254371Ssemenu	rf->f_flag = FREAD | FWRITE;
17355756Sphk	rf->f_type = DTYPE_PIPE;
17454371Ssemenu	rf->f_ops = &pipeops;
17554371Ssemenu	rf->f_data = (caddr_t)rpipe;
17654371Ssemenu	error = falloc(p, &wf, &fd);
17754371Ssemenu	if (error)
17854371Ssemenu		goto free3;
17954371Ssemenu	wf->f_flag = FREAD | FWRITE;
18054371Ssemenu	wf->f_type = DTYPE_PIPE;
18154371Ssemenu	wf->f_ops = &pipeops;
18254371Ssemenu	wf->f_data = (caddr_t)wpipe;
18354371Ssemenu	retval[1] = fd;
18454371Ssemenu
18573286Sadrian	rpipe->pipe_peer = wpipe;
186132902Sphk	wpipe->pipe_peer = rpipe;
18773286Sadrian
18873286Sadrian	return (0);
18973286Sadrianfree3:
19054371Ssemenu	ffree(rf);
19154371Ssemenu	fdp->fd_ofiles[retval[0]] = 0;
192138487Sphkfree2:
19354371Ssemenu	(void)pipeclose(wpipe);
194138487Sphkfree1:
19554371Ssemenu	(void)pipeclose(rpipe);
19654371Ssemenu	return (error);
19754371Ssemenu}
19854371Ssemenu
19954371Ssemenu/*
20054371Ssemenu * Allocate kva for pipe circular buffer, the space is pageable
20154371Ssemenu */
20254371Ssemenustatic void
20354371Ssemenupipespace(cpipe)
20454371Ssemenu	struct pipe *cpipe;
20554371Ssemenu{
20654371Ssemenu	int npages, error;
207132902Sphk
20854371Ssemenu	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
20954371Ssemenu	/*
21054371Ssemenu	 * Create an object, I don't like the idea of paging to/from
21154371Ssemenu	 * kernel_object.
21254371Ssemenu	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
21354371Ssemenu	 */
21454371Ssemenu	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
21554371Ssemenu	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);
21654371Ssemenu
217138487Sphk	/*
21854371Ssemenu	 * Insert the object into the kernel map, and allocate kva for it.
21954371Ssemenu	 * The map entry is, by default, pageable.
22086931Sjhb	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
22154371Ssemenu	 */
222138487Sphk	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
22354371Ssemenu		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
22454371Ssemenu		cpipe->pipe_buffer.size, 1,
22554371Ssemenu		VM_PROT_ALL, VM_PROT_ALL, 0);
22654371Ssemenu
22754371Ssemenu	if (error != KERN_SUCCESS)
228130585Sphk		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
229137040Sphk	amountpipekva += cpipe->pipe_buffer.size;
230137040Sphk}
23154371Ssemenu
232137478Sphk/*
233137478Sphk * initialize and allocate VM and memory for pipe
23454371Ssemenu */
23554371Ssemenustatic void
23686931Sjhbpipeinit(cpipe)
237137040Sphk	struct pipe *cpipe;
238137040Sphk{
239137040Sphk	int s;
240137040Sphk
241137040Sphk	cpipe->pipe_buffer.in = 0;
242137040Sphk	cpipe->pipe_buffer.out = 0;
24386931Sjhb	cpipe->pipe_buffer.cnt = 0;
24454371Ssemenu	cpipe->pipe_buffer.size = PIPE_SIZE;
24554371Ssemenu	/* Buffer kva gets dynamically allocated */
24654371Ssemenu	cpipe->pipe_buffer.buffer = NULL;
247137040Sphk
248137040Sphk	cpipe->pipe_state = 0;
249137040Sphk	cpipe->pipe_peer = NULL;
250137040Sphk	cpipe->pipe_busy = 0;
25154371Ssemenu	s = splhigh();
25254371Ssemenu	cpipe->pipe_ctime = time;
25354371Ssemenu	cpipe->pipe_atime = time;
254111119Simp	cpipe->pipe_mtime = time;
25554371Ssemenu	splx(s);
256137040Sphk	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
257137040Sphk
258137040Sphk#ifndef PIPE_NODIRECT
25954371Ssemenu	/*
26054371Ssemenu	 * pipe data structure initializations to support direct pipe I/O
26154371Ssemenu	 */
26254371Ssemenu	cpipe->pipe_map.cnt = 0;
26354371Ssemenu	cpipe->pipe_map.kva = 0;
26454371Ssemenu	cpipe->pipe_map.pos = 0;
26554371Ssemenu	cpipe->pipe_map.npages = 0;
26654371Ssemenu#endif
26754371Ssemenu}
26854371Ssemenu
26954371Ssemenu
27054371Ssemenu/*
27154371Ssemenu * lock a pipe for I/O, blocking other access
27254371Ssemenu */
27354371Ssemenustatic __inline int
27454371Ssemenupipelock(cpipe, catch)
27554371Ssemenu	struct pipe *cpipe;
27654371Ssemenu	int catch;
27754371Ssemenu{
27854371Ssemenu	int error;
27954371Ssemenu	while (cpipe->pipe_state & PIPE_LOCK) {
28054371Ssemenu		cpipe->pipe_state |= PIPE_LWANT;
28154371Ssemenu		if (error = tsleep( cpipe,
28254371Ssemenu			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
28354371Ssemenu			return error;
28454371Ssemenu		}
28554371Ssemenu	}
28654371Ssemenu	cpipe->pipe_state |= PIPE_LOCK;
28754371Ssemenu	return 0;
28854371Ssemenu}
28954371Ssemenu
29054371Ssemenu/*
29154371Ssemenu * unlock a pipe I/O lock
292138487Sphk */
293138487Sphkstatic __inline void
294138487Sphkpipeunlock(cpipe)
295138487Sphk	struct pipe *cpipe;
296138487Sphk{
297138487Sphk	cpipe->pipe_state &= ~PIPE_LOCK;
29854371Ssemenu	if (cpipe->pipe_state & PIPE_LWANT) {
29954371Ssemenu		cpipe->pipe_state &= ~PIPE_LWANT;
30054371Ssemenu		wakeup(cpipe);
30154371Ssemenu	}
30254371Ssemenu	return;
303138487Sphk}
30454371Ssemenu
30554371Ssemenustatic __inline void
30654371Ssemenupipeselwakeup(cpipe)
30754371Ssemenu	struct pipe *cpipe;
30854371Ssemenu{
309132023Salfred	if (cpipe->pipe_state & PIPE_SEL) {
31054371Ssemenu		cpipe->pipe_state &= ~PIPE_SEL;
31154371Ssemenu		selwakeup(&cpipe->pipe_sel);
31254371Ssemenu	}
31354371Ssemenu}
31454371Ssemenu
#ifndef PIPE_NODIRECT
#if 0
/*
 * Mark each resident, un-busy page backing the pipe buffer object as
 * clean so the pageout daemon need not write it.  Currently compiled
 * out (#if 0) but left for reference.
 */
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif
33454371Ssemenu
33554371Ssemenu/* ARGSUSED */
33686931Sjhbstatic int
33754371Ssemenupipe_read(fp, uio, cred)
33854371Ssemenu	struct file *fp;
33954371Ssemenu	struct uio *uio;
34054371Ssemenu	struct ucred *cred;
34154371Ssemenu{
34254371Ssemenu
34354371Ssemenu	struct pipe *rpipe = (struct pipe *) fp->f_data;
34454371Ssemenu	int error = 0;
34554371Ssemenu	int nread = 0;
34654371Ssemenu	int size;
34754371Ssemenu
34854371Ssemenu	++rpipe->pipe_busy;
34954371Ssemenu	while (uio->uio_resid) {
35054371Ssemenu		/*
351132023Salfred		 * normal pipe buffer receive
35254371Ssemenu		 */
35354371Ssemenu		if (rpipe->pipe_buffer.cnt > 0) {
35454371Ssemenu			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
35554371Ssemenu			if (size > rpipe->pipe_buffer.cnt)
35654371Ssemenu				size = rpipe->pipe_buffer.cnt;
357140220Sphk			if (size > uio->uio_resid)
358140822Sphk				size = uio->uio_resid;
35954371Ssemenu			if ((error = pipelock(rpipe,1)) == 0) {
36054371Ssemenu				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
36154371Ssemenu					size, uio);
36254371Ssemenu				pipeunlock(rpipe);
36354371Ssemenu			}
36454371Ssemenu			if (error) {
36554371Ssemenu				break;
36654371Ssemenu			}
36754371Ssemenu			rpipe->pipe_buffer.out += size;
36854371Ssemenu			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
36954371Ssemenu				rpipe->pipe_buffer.out = 0;
37054371Ssemenu
37154371Ssemenu			rpipe->pipe_buffer.cnt -= size;
37254371Ssemenu			nread += size;
37354371Ssemenu#ifndef PIPE_NODIRECT
374132023Salfred		/*
375132023Salfred		 * Direct copy, bypassing a kernel buffer.
37654371Ssemenu		 */
37754371Ssemenu		} else if ((size = rpipe->pipe_map.cnt) &&
37854371Ssemenu			(rpipe->pipe_state & PIPE_DIRECTW)) {
37954371Ssemenu			caddr_t va;
38054371Ssemenu			if (size > uio->uio_resid)
38192462Smckusick				size = uio->uio_resid;
38254371Ssemenu			if ((error = pipelock(rpipe,1)) == 0) {
38354371Ssemenu				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
38454371Ssemenu				error = uiomove(va, size, uio);
38554371Ssemenu				pipeunlock(rpipe);
38654371Ssemenu			}
38754371Ssemenu			if (error)
38854371Ssemenu				break;
38954371Ssemenu			nread += size;
39054371Ssemenu			rpipe->pipe_map.pos += size;
39154371Ssemenu			rpipe->pipe_map.cnt -= size;
39254371Ssemenu			if (rpipe->pipe_map.cnt == 0) {
39354371Ssemenu				rpipe->pipe_state &= ~PIPE_DIRECTW;
39486931Sjhb				wakeup(rpipe);
39554371Ssemenu			}
39654371Ssemenu#endif
39754371Ssemenu		} else {
39854371Ssemenu			/*
39954371Ssemenu			 * detect EOF condition
40054371Ssemenu			 */
40154371Ssemenu			if (rpipe->pipe_state & PIPE_EOF) {
40254371Ssemenu				break;
40354371Ssemenu			}
40454371Ssemenu			/*
40554371Ssemenu			 * If the "write-side" has been blocked, wake it up now.
40654371Ssemenu			 */
40754371Ssemenu			if (rpipe->pipe_state & PIPE_WANTW) {
40854371Ssemenu				rpipe->pipe_state &= ~PIPE_WANTW;
40954371Ssemenu				wakeup(rpipe);
41054371Ssemenu			}
41154371Ssemenu			if (nread > 0)
41254371Ssemenu				break;
41354371Ssemenu			if (rpipe->pipe_state & PIPE_NBIO) {
41454371Ssemenu				error = EAGAIN;
41554371Ssemenu				break;
41654371Ssemenu			}
41754371Ssemenu
41854371Ssemenu			/*
41954371Ssemenu			 * If there is no more to read in the pipe, reset
42054371Ssemenu			 * its pointers to the beginning.  This improves
42154371Ssemenu			 * cache hit stats.
42254371Ssemenu			 */
42354371Ssemenu
42492462Smckusick			if ((error = pipelock(rpipe,1)) == 0) {
42554371Ssemenu				if (rpipe->pipe_buffer.cnt == 0) {
42654371Ssemenu					rpipe->pipe_buffer.in = 0;
42754371Ssemenu					rpipe->pipe_buffer.out = 0;
42854371Ssemenu				}
42954371Ssemenu				pipeunlock(rpipe);
43054371Ssemenu			} else {
43154371Ssemenu				break;
43254371Ssemenu			}
43354371Ssemenu
43454371Ssemenu			if (rpipe->pipe_state & PIPE_WANTW) {
43554371Ssemenu				rpipe->pipe_state &= ~PIPE_WANTW;
43654371Ssemenu				wakeup(rpipe);
43754371Ssemenu			}
43854371Ssemenu
43954371Ssemenu			rpipe->pipe_state |= PIPE_WANTR;
44054371Ssemenu			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
44154371Ssemenu				break;
44254371Ssemenu			}
44354371Ssemenu		}
44454371Ssemenu	}
44554371Ssemenu
44654371Ssemenu	if (error == 0) {
44754371Ssemenu		int s = splhigh();
44854371Ssemenu		rpipe->pipe_atime = time;
44954371Ssemenu		splx(s);
45054371Ssemenu	}
45154371Ssemenu
45254371Ssemenu	--rpipe->pipe_busy;
45354371Ssemenu	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
45454371Ssemenu		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
45592462Smckusick		wakeup(rpipe);
45654371Ssemenu	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
45754371Ssemenu		/*
45854371Ssemenu		 * If there is no more to read in the pipe, reset
45954371Ssemenu		 * its pointers to the beginning.  This improves
46054371Ssemenu		 * cache hit stats.
46154371Ssemenu		 */
46254371Ssemenu		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
46354371Ssemenu			if (rpipe->pipe_buffer.cnt == 0) {
46454371Ssemenu#if 0
46554371Ssemenu				pipe_mark_pages_clean(rpipe);
466143692Sphk#endif
467143619Sphk				rpipe->pipe_buffer.in = 0;
468143588Sphk				rpipe->pipe_buffer.out = 0;
469143588Sphk			}
47054371Ssemenu			pipeunlock(rpipe);
47154371Ssemenu		}
47254371Ssemenu
47354371Ssemenu		/*
47454371Ssemenu		 * If the "write-side" has been blocked, wake it up now.
47554371Ssemenu		 */
47654371Ssemenu		if (rpipe->pipe_state & PIPE_WANTW) {
47754371Ssemenu			rpipe->pipe_state &= ~PIPE_WANTW;
47854371Ssemenu			wakeup(rpipe);
47954371Ssemenu		}
48054371Ssemenu	}
48154371Ssemenu
48254371Ssemenu	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > PIPE_BUF)
48354371Ssemenu		pipeselwakeup(rpipe);
48454371Ssemenu
48554371Ssemenu	return error;
48654371Ssemenu}
487111119Simp
48854371Ssemenu#ifndef PIPE_NODIRECT
489138290Sphk/*
49054371Ssemenu * Map the sending processes' buffer into kernel space and wire it.
49154371Ssemenu * This is similar to a physical write operation.
49254371Ssemenu */
49354371Ssemenustatic int
49454371Ssemenupipe_build_write_buffer(wpipe, uio)
49554371Ssemenu	struct pipe *wpipe;
49654371Ssemenu	struct uio *uio;
49754371Ssemenu{
49854371Ssemenu	int size;
49954371Ssemenu	int i;
50054371Ssemenu	vm_offset_t addr, endaddr, paddr;
501101308Sjeff
50254371Ssemenu	size = uio->uio_iov->iov_len;
50371576Sjasone	if (size > wpipe->pipe_buffer.size)
50493818Sjhb		size = wpipe->pipe_buffer.size;
50554371Ssemenu
50654371Ssemenu	endaddr = round_page(uio->uio_iov->iov_base + size);
50754371Ssemenu	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
50854371Ssemenu		addr < endaddr;
50954371Ssemenu		addr += PAGE_SIZE, i+=1) {
51054371Ssemenu
51154371Ssemenu		vm_page_t m;
51254371Ssemenu
51354371Ssemenu		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
51454371Ssemenu		paddr = pmap_kextract(addr);
51554371Ssemenu		if (!paddr) {
516143692Sphk			int j;
517143663Sphk			for(j=0;j<i;j++)
51854371Ssemenu				vm_page_unwire(wpipe->pipe_map.ms[j]);
51954371Ssemenu			return EFAULT;
52054371Ssemenu		}
52154371Ssemenu
52254371Ssemenu		m = PHYS_TO_VM_PAGE(paddr);
52354371Ssemenu		vm_page_wire(m);
52454371Ssemenu		wpipe->pipe_map.ms[i] = m;
52554371Ssemenu	}
52654371Ssemenu
52754371Ssemenu/*
52854371Ssemenu * set up the control block
52954371Ssemenu */
53054371Ssemenu	wpipe->pipe_map.npages = i;
53154371Ssemenu	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
53254371Ssemenu	wpipe->pipe_map.cnt = size;
53354371Ssemenu
53454371Ssemenu/*
53554371Ssemenu * and map the buffer
53654371Ssemenu */
53754371Ssemenu	if (wpipe->pipe_map.kva == 0) {
53854371Ssemenu		/*
53954371Ssemenu		 * We need to allocate space for an extra page because the
54054371Ssemenu		 * address range might (will) span pages at times.
54154371Ssemenu		 */
54254371Ssemenu		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
54354371Ssemenu			wpipe->pipe_buffer.size + PAGE_SIZE);
544116271Sphk		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
545138487Sphk	}
546138487Sphk	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
547116271Sphk		wpipe->pipe_map.npages);
548116271Sphk
549116271Sphk/*
550116271Sphk * and update the uio data
551116271Sphk */
55254371Ssemenu
55354371Ssemenu	uio->uio_iov->iov_len -= size;
554	uio->uio_iov->iov_base += size;
555	if (uio->uio_iov->iov_len == 0)
556		uio->uio_iov++;
557	uio->uio_resid -= size;
558	uio->uio_offset += size;
559	return 0;
560}
561
562/*
563 * unmap and unwire the process buffer
564 */
565static void
566pipe_destroy_write_buffer(wpipe)
567struct pipe *wpipe;
568{
569	int i;
570	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
571
572	if (wpipe->pipe_map.kva) {
573		if (amountpipekva > MAXPIPEKVA) {
574			vm_offset_t kva = wpipe->pipe_map.kva;
575			wpipe->pipe_map.kva = 0;
576			kmem_free(kernel_map, kva,
577				wpipe->pipe_buffer.size + PAGE_SIZE);
578			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
579		}
580	}
581	for (i=0;i<wpipe->pipe_map.npages;i++)
582		vm_page_unwire(wpipe->pipe_map.ms[i]);
583}
584
585/*
586 * In the case of a signal, the writing process might go away.  This
587 * code copies the data into the circular buffer so that the source
588 * pages can be freed without loss of data.
589 */
590static void
591pipe_clone_write_buffer(wpipe)
592struct pipe *wpipe;
593{
594	int size;
595	int pos;
596
597	size = wpipe->pipe_map.cnt;
598	pos = wpipe->pipe_map.pos;
599	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
600			(caddr_t) wpipe->pipe_buffer.buffer,
601			size);
602
603	wpipe->pipe_buffer.in = size;
604	wpipe->pipe_buffer.out = 0;
605	wpipe->pipe_buffer.cnt = size;
606	wpipe->pipe_state &= ~PIPE_DIRECTW;
607
608	pipe_destroy_write_buffer(wpipe);
609}
610
611/*
612 * This implements the pipe buffer write mechanism.  Note that only
613 * a direct write OR a normal pipe write can be pending at any given time.
614 * If there are any characters in the pipe buffer, the direct write will
615 * be deferred until the receiving process grabs all of the bytes from
616 * the pipe buffer.  Then the direct mapping write is set-up.
617 */
618static int
619pipe_direct_write(wpipe, uio)
620	struct pipe *wpipe;
621	struct uio *uio;
622{
623	int error;
624retry:
625	while (wpipe->pipe_state & PIPE_DIRECTW) {
626		if ( wpipe->pipe_state & PIPE_WANTR) {
627			wpipe->pipe_state &= ~PIPE_WANTR;
628			wakeup(wpipe);
629		}
630		wpipe->pipe_state |= PIPE_WANTW;
631		error = tsleep(wpipe,
632				PRIBIO|PCATCH, "pipdww", 0);
633		if (error || (wpipe->pipe_state & PIPE_EOF))
634			goto error1;
635	}
636	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
637	if (wpipe->pipe_buffer.cnt > 0) {
638		if ( wpipe->pipe_state & PIPE_WANTR) {
639			wpipe->pipe_state &= ~PIPE_WANTR;
640			wakeup(wpipe);
641		}
642
643		wpipe->pipe_state |= PIPE_WANTW;
644		error = tsleep(wpipe,
645				PRIBIO|PCATCH, "pipdwc", 0);
646		if (error || (wpipe->pipe_state & PIPE_EOF)) {
647			if (error == 0)
648				error = EPIPE;
649			goto error1;
650		}
651		goto retry;
652	}
653
654	wpipe->pipe_state |= PIPE_DIRECTW;
655
656	error = pipe_build_write_buffer(wpipe, uio);
657	if (error) {
658		wpipe->pipe_state &= ~PIPE_DIRECTW;
659		goto error1;
660	}
661
662	error = 0;
663	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
664		if (wpipe->pipe_state & PIPE_EOF) {
665			pipelock(wpipe, 0);
666			pipe_destroy_write_buffer(wpipe);
667			pipeunlock(wpipe);
668			pipeselwakeup(wpipe);
669			wakeup(wpipe);
670			return EPIPE;
671		}
672		if (wpipe->pipe_state & PIPE_WANTR) {
673			wpipe->pipe_state &= ~PIPE_WANTR;
674			wakeup(wpipe);
675		}
676		pipeselwakeup(wpipe);
677		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
678	}
679
680	pipelock(wpipe,0);
681	if (wpipe->pipe_state & PIPE_DIRECTW) {
682		/*
683		 * this bit of trickery substitutes a kernel buffer for
684		 * the process that might be going away.
685		 */
686		pipe_clone_write_buffer(wpipe);
687	} else {
688		pipe_destroy_write_buffer(wpipe);
689	}
690	pipeunlock(wpipe);
691	return error;
692
693error1:
694	wakeup(wpipe);
695	return error;
696}
697#endif
698
699static __inline int
700pipewrite(wpipe, uio, nbio)
701	struct pipe *wpipe;
702	struct uio *uio;
703	int nbio;
704{
705	int error = 0;
706	int orig_resid;
707
708	/*
709	 * detect loss of pipe read side, issue SIGPIPE if lost.
710	 */
711	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
712		return EPIPE;
713	}
714
715	if( wpipe->pipe_buffer.buffer == NULL) {
716		if ((error = pipelock(wpipe,1)) == 0) {
717			pipespace(wpipe);
718			pipeunlock(wpipe);
719		} else {
720			return error;
721		}
722	}
723
724	++wpipe->pipe_busy;
725	orig_resid = uio->uio_resid;
726	while (uio->uio_resid) {
727		int space;
728#ifndef PIPE_NODIRECT
729		/*
730		 * If the transfer is large, we can gain performance if
731		 * we do process-to-process copies directly.
732		 */
733		if ((amountpipekva < LIMITPIPEKVA) &&
734			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
735			error = pipe_direct_write( wpipe, uio);
736			if (error) {
737				break;
738			}
739			continue;
740		}
741#endif
742
743		/*
744		 * Pipe buffered writes cannot be coincidental with
745		 * direct writes.  We wait until the currently executing
746		 * direct write is completed before we start filling the
747		 * pipe buffer.
748		 */
749	retrywrite:
750		while (wpipe->pipe_state & PIPE_DIRECTW) {
751			if (wpipe->pipe_state & PIPE_WANTR) {
752				wpipe->pipe_state &= ~PIPE_WANTR;
753				wakeup(wpipe);
754			}
755			error = tsleep(wpipe,
756					PRIBIO|PCATCH, "pipbww", 0);
757			if (error)
758				break;
759		}
760
761		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
762		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
763			space = 0;
764
765		/*
766		 * We must afford contiguous writes on buffers of size
767		 * PIPE_BUF or less.
768		 */
769		if (space > 0) {
770			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
771			if (size > space)
772				size = space;
773			if (size > uio->uio_resid)
774				size = uio->uio_resid;
775			if ((error = pipelock(wpipe,1)) == 0) {
776				/*
777				 * It is possible for a direct write to
778				 * slip in on us... handle it here...
779				 */
780				if (wpipe->pipe_state & PIPE_DIRECTW) {
781					pipeunlock(wpipe);
782					goto retrywrite;
783				}
784				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
785					size, uio);
786				pipeunlock(wpipe);
787			}
788			if (error)
789				break;
790
791			wpipe->pipe_buffer.in += size;
792			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
793				wpipe->pipe_buffer.in = 0;
794
795			wpipe->pipe_buffer.cnt += size;
796		} else {
797			/*
798			 * If the "read-side" has been blocked, wake it up now.
799			 */
800			if (wpipe->pipe_state & PIPE_WANTR) {
801				wpipe->pipe_state &= ~PIPE_WANTR;
802				wakeup(wpipe);
803			}
804
805			/*
806			 * don't block on non-blocking I/O
807			 */
808			if (nbio) {
809				error = EAGAIN;
810				break;
811			}
812
813			/*
814			 * We have no more space and have something to offer,
815			 * wake up selects.
816			 */
817			pipeselwakeup(wpipe);
818
819			wpipe->pipe_state |= PIPE_WANTW;
820			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
821				break;
822			}
823			/*
824			 * If read side wants to go away, we just issue a signal
825			 * to ourselves.
826			 */
827			if (wpipe->pipe_state & PIPE_EOF) {
828				error = EPIPE;
829				break;
830			}
831		}
832	}
833
834	if ((wpipe->pipe_busy == 0) &&
835		(wpipe->pipe_state & PIPE_WANT)) {
836		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
837		wakeup(wpipe);
838	} else if (wpipe->pipe_buffer.cnt > 0) {
839		/*
840		 * If we have put any characters in the buffer, we wake up
841		 * the reader.
842		 */
843		if (wpipe->pipe_state & PIPE_WANTR) {
844			wpipe->pipe_state &= ~PIPE_WANTR;
845			wakeup(wpipe);
846		}
847	}
848
849	/*
850	 * Don't return EPIPE if I/O was successful
851	 */
852	if ((wpipe->pipe_buffer.cnt == 0) &&
853		(uio->uio_resid == 0) &&
854		(error == EPIPE))
855		error = 0;
856
857	if (error = 0) {
858		int s = splhigh();
859		wpipe->pipe_mtime = time;
860		splx(s);
861	}
862	/*
863	 * We have something to offer,
864	 * wake up select.
865	 */
866	if (wpipe->pipe_buffer.cnt)
867		pipeselwakeup(wpipe);
868
869	--wpipe->pipe_busy;
870	return error;
871}
872
873/* ARGSUSED */
874static int
875pipe_write(fp, uio, cred)
876	struct file *fp;
877	struct uio *uio;
878	struct ucred *cred;
879{
880	struct pipe *rpipe = (struct pipe *) fp->f_data;
881	struct pipe *wpipe = rpipe->pipe_peer;
882	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
883}
884
885/*
886 * we implement a very minimal set of ioctls for compatibility with sockets.
887 */
888int
889pipe_ioctl(fp, cmd, data, p)
890	struct file *fp;
891	int cmd;
892	register caddr_t data;
893	struct proc *p;
894{
895	register struct pipe *mpipe = (struct pipe *)fp->f_data;
896
897	switch (cmd) {
898
899	case FIONBIO:
900		if (*(int *)data)
901			mpipe->pipe_state |= PIPE_NBIO;
902		else
903			mpipe->pipe_state &= ~PIPE_NBIO;
904		return (0);
905
906	case FIOASYNC:
907		if (*(int *)data) {
908			mpipe->pipe_state |= PIPE_ASYNC;
909		} else {
910			mpipe->pipe_state &= ~PIPE_ASYNC;
911		}
912		return (0);
913
914	case FIONREAD:
915		if (mpipe->pipe_state & PIPE_DIRECTW)
916			*(int *)data = mpipe->pipe_map.cnt;
917		else
918			*(int *)data = mpipe->pipe_buffer.cnt;
919		return (0);
920
921	case SIOCSPGRP:
922		mpipe->pipe_pgid = *(int *)data;
923		return (0);
924
925	case SIOCGPGRP:
926		*(int *)data = mpipe->pipe_pgid;
927		return (0);
928
929	}
930	return ENOSYS;
931}
932
933int
934pipe_select(fp, which, p)
935	struct file *fp;
936	int which;
937	struct proc *p;
938{
939	register struct pipe *rpipe = (struct pipe *)fp->f_data;
940	struct pipe *wpipe;
941
942	wpipe = rpipe->pipe_peer;
943	switch (which) {
944
945	case FREAD:
946		if ( (rpipe->pipe_state & PIPE_DIRECTW) ||
947			(rpipe->pipe_buffer.cnt > 0) ||
948			(rpipe->pipe_state & PIPE_EOF)) {
949			return (1);
950		}
951		selrecord(p, &rpipe->pipe_sel);
952		rpipe->pipe_state |= PIPE_SEL;
953		break;
954
955	case FWRITE:
956		if ((wpipe == NULL) ||
957			(wpipe->pipe_state & PIPE_EOF) ||
958			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
959			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
960			return (1);
961		}
962		selrecord(p, &wpipe->pipe_sel);
963		wpipe->pipe_state |= PIPE_SEL;
964		break;
965
966	case 0:
967		if ((rpipe->pipe_state & PIPE_EOF) ||
968			(wpipe == NULL) ||
969			(wpipe->pipe_state & PIPE_EOF)) {
970			return (1);
971		}
972
973		selrecord(p, &rpipe->pipe_sel);
974		rpipe->pipe_state |= PIPE_SEL;
975		break;
976	}
977	return (0);
978}
979
980int
981pipe_stat(pipe, ub)
982	register struct pipe *pipe;
983	register struct stat *ub;
984{
985	bzero((caddr_t)ub, sizeof (*ub));
986	ub->st_mode = S_IFSOCK;
987	ub->st_blksize = pipe->pipe_buffer.size;
988	ub->st_size = pipe->pipe_buffer.cnt;
989	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
990	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
991	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
992	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
993	return 0;
994}
995
996/* ARGSUSED */
997static int
998pipe_close(fp, p)
999	struct file *fp;
1000	struct proc *p;
1001{
1002	int error = 0;
1003	struct pipe *cpipe = (struct pipe *)fp->f_data;
1004	pipeclose(cpipe);
1005	fp->f_data = NULL;
1006	return 0;
1007}
1008
1009/*
1010 * shutdown the pipe
1011 */
1012static void
1013pipeclose(cpipe)
1014	struct pipe *cpipe;
1015{
1016	struct pipe *ppipe;
1017	if (cpipe) {
1018
1019		pipeselwakeup(cpipe);
1020
1021		/*
1022		 * If the other side is blocked, wake it up saying that
1023		 * we want to close it down.
1024		 */
1025		while (cpipe->pipe_busy) {
1026			wakeup(cpipe);
1027			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
1028			tsleep(cpipe, PRIBIO, "pipecl", 0);
1029		}
1030
1031		/*
1032		 * Disconnect from peer
1033		 */
1034		if (ppipe = cpipe->pipe_peer) {
1035			pipeselwakeup(ppipe);
1036
1037			ppipe->pipe_state |= PIPE_EOF;
1038			wakeup(ppipe);
1039			ppipe->pipe_peer = NULL;
1040		}
1041
1042		/*
1043		 * free resources
1044		 */
1045		if (cpipe->pipe_buffer.buffer) {
1046			amountpipekva -= cpipe->pipe_buffer.size;
1047			kmem_free(kernel_map,
1048				(vm_offset_t)cpipe->pipe_buffer.buffer,
1049				cpipe->pipe_buffer.size);
1050		}
1051#ifndef PIPE_NODIRECT
1052		if (cpipe->pipe_map.kva) {
1053			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1054			kmem_free(kernel_map,
1055				cpipe->pipe_map.kva,
1056				cpipe->pipe_buffer.size + PAGE_SIZE);
1057		}
1058#endif
1059		free(cpipe, M_TEMP);
1060	}
1061}
1062#endif
1063