sys_pipe.c revision 16416
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.16 1996/06/12 05:07:32 gpalmer Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does everything that pipes normally do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the user's write buffer is smaller than
 * PIPE_MINDIRECT, the "normal" pipe buffering is done.  If the buffer is
 * between PIPE_MINDIRECT and PIPE_SIZE in size, it is fully mapped and
 * wired into the kernel, and the receiving process can copy it directly
 * from the pages in the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
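
/*
 * Illustrative userland view of the two paths (a sketch, not part of
 * this file's logic; "pfd" is a hypothetical write-side descriptor and
 * the real PIPE_MINDIRECT threshold lives in <sys/pipe.h>):
 *
 *	char small[512], big[64 * 1024];
 *
 *	write(pfd, small, sizeof small);    buffered through the kernel copy
 *	write(pfd, big, sizeof big);        may use the direct, page-wired
 *					    transfer path
 */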

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be kind of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is more or less a soft limit,
 * but is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;
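
/*
 * amountpipekva tracks, in bytes, all pipe kva currently allocated.
 * pipewrite() declines new direct writes once the total passes
 * LIMITPIPEKVA, and pipe_destroy_write_buffer() releases a pipe's
 * direct-write kva early once the total exceeds MAXPIPEKVA.
 */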

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}
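
/*
 * For reference, a minimal userland sketch of the call above; retval[0]
 * and retval[1] surface as fd[0] (read end) and fd[1] (write end):
 *
 *	int fd[2];
 *	char c;
 *
 *	if (pipe(fd) == -1)
 *		err(1, "pipe");
 *	(void) write(fd[1], "x", 1);
 *	(void) read(fd[0], &c, 1);
 */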

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
		    catch ? (PRIBIO | PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
	return;
}
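
/*
 * The PIPE_LOCK/PIPE_LWANT pair above is a classic sleep-lock protocol:
 * contenders mark LWANT and tsleep() on the pipe address, pipeunlock()
 * clears the lock and wakes them all, and each waker re-tests the LOCK
 * bit before claiming it.  With "catch" set the sleep is interruptible,
 * so callers must be prepared for a nonzero (signal) return.
 */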

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO | PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();

		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
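
/*
 * Note that pipe_read() above only posts a select wakeup once at least
 * PIPE_BUF bytes of buffer space have become free, which matches the
 * writability criterion used in pipe_select() below.
 */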

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i++) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}
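
/*
 * Worked example for the wiring loop above (assuming PAGE_SIZE == 4096):
 * a 4200-byte iovec at user address 0x1ffc spans trunc_page(0x1ffc) =
 * 0x1000 through round_page(0x1ffc + 4200) = 0x4000, i.e. three pages,
 * even though the payload is only just over one page long.  This is why
 * the kva window is sized at pipe_buffer.size + PAGE_SIZE.
 */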

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
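
/*
 * Direct write life cycle, as implemented above:
 *   1. wait for any earlier direct write to drain and for the byte
 *	buffer to empty (the PIPE_DIRECTW and pipe_buffer.cnt loops);
 *   2. wire the user pages and set PIPE_DIRECTW;
 *   3. sleep until the reader consumes pipe_map.cnt and clears
 *	PIPE_DIRECTW, or until EOF or a signal intervenes;
 *   4. if a signal cut the transfer short, pipe_clone_write_buffer()
 *	snapshots the unread bytes into the pageable buffer so the
 *	wired user pages can be released without losing data.
 */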
#endif

static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;
	int orig_resid;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((wpipe->pipe_state & PIPE_NBIO) == 0 &&
			(amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO | PCATCH, "pipbww", 0);
			if (error)
				break;
		}
		if (error)
			break;

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;

			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO + 1) | PCATCH,
			    "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();

		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}
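
/*
 * A note on PIPE_BUF atomicity, as enforced above: when the original
 * request is <= PIPE_BUF and the free space cannot hold all of it,
 * "space" is forced to zero so the writer sleeps rather than leaving
 * a partial write behind.  Larger writes may drain piecemeal.
 */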

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO) ? 1 : 0);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}
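
/*
 * Illustrative FIONREAD use from userland ("fd" being a pipe read end):
 *
 *	int nbytes;
 *
 *	if (ioctl(fd, FIONREAD, &nbytes) == 0)
 *		printf("%d bytes available\n", nbytes);
 *
 * During a direct write this reports pipe_map.cnt, the unread remainder
 * of the wired transfer, rather than the byte-buffer count.
 */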

static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}
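
/*
 * Note the FWRITE test above: a pipe selects writable only when at
 * least PIPE_BUF bytes are free (so a write of up to PIPE_BUF cannot
 * block after select() reports ready) or when the write would fail
 * immediately because the read side is gone.
 */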

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {
		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif