sys_pipe.c revision 24206
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.26 1997/03/23 03:36:24 bde Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the user buffer is smaller than PIPE_MINDIRECT,
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
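
/*
 * Illustrative sketch of the two modes from the user-mode side (a hedged
 * example, not compiled as part of the kernel; the actual threshold is
 * PIPE_MINDIRECT in <sys/pipe.h>, and "pfd" is a hypothetical descriptor
 * pair from pipe(2)):
 *
 *	char big[16 * 1024];	(assumed to be >= PIPE_MINDIRECT here)
 *
 *	write(pfd[1], big, sizeof(big));	blocking write this large
 *						is a candidate for the
 *						direct (page-wiring) path
 *	write(pfd[1], "x", 1);			always uses the kernel
 *						buffer
 */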

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is a soft limit, but
 * is there so that we don't exhaust it on large systems.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes.
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof(*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof(*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}
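
/*
 * Example usage from user mode (a minimal sketch, not part of the kernel
 * build): pipe(2) returns the read descriptor in fds[0] and the write
 * descriptor in fds[1].
 *
 *	#include <unistd.h>
 *
 *	int fds[2];
 *	char buf[5];
 *
 *	if (pipe(fds) == 0) {
 *		(void) write(fds[1], "hello", sizeof(buf));
 *		(void) read(fds[0], buf, sizeof(buf));
 *	}
 */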

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * Initialize and allocate VM and memory for pipe.
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	gettime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_pgid = NO_PID;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
			catch ? (PRIBIO | PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
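
/*
 * Typical usage of the I/O lock (a sketch of the pattern used by
 * pipe_read() and pipe_write() below): hold the lock across uiomove()
 * so that the buffer pointers stay stable over a potential sleep.
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		error = uiomove(..., size, uio);
 *		pipeunlock(cpipe);
 *	}
 */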

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	struct proc *p;

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if (cpipe->pipe_state & PIPE_ASYNC) {
		if (cpipe->pipe_pgid < 0)
			gsignal(-cpipe->pipe_pgid, SIGIO);
		else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
			psignal(p, SIGIO);
	}
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO | PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0)
		gettime(&rpipe->pipe_atime);

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
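
/*
 * The read-side semantics above, seen from user mode (a hedged sketch):
 * a read on an empty pipe blocks unless O_NONBLOCK is set, in which case
 * it fails with EAGAIN; once the write side is closed and the buffer is
 * drained, read returns 0 (EOF).
 *
 *	ssize_t n;
 *
 *	n = read(fds[0], buf, sizeof(buf));
 *	if (n == 0)
 *		... write side closed, EOF ...
 *	else if (n == -1 && errno == EAGAIN)
 *		... pipe empty and O_NONBLOCK set ...
 */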

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (and usually will) span page
		 * boundaries.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO | PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This sets the maximum transfer to a single
			 * segment of the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO + 1) | PCATCH,
			    "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		gettime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}
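
/*
 * The PIPE_BUF rule enforced above, seen from user mode (a sketch; POSIX
 * guarantees this atomicity for pipes in general, and "pfd" is a
 * hypothetical descriptor pair): writes of at most PIPE_BUF bytes are
 * never interleaved with those of other writers.
 *
 *	char rec[PIPE_BUF];	fixed-size records from several writers
 *
 *	write(pfd[1], rec, sizeof(rec));	atomic: readers see whole
 *						records, never fragments
 */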

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case TIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}
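
/*
 * Example from user mode (a minimal sketch): FIONREAD reports how many
 * bytes are immediately available for reading, whichever of the two
 * write mechanisms queued them.
 *
 *	#include <sys/ioctl.h>
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		... nbytes may be read without blocking ...
 */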

static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}
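
/*
 * Example from user mode (a sketch using select(2)): wait until the read
 * side of the pipe has data or EOF to report.
 *
 *	#include <sys/select.h>
 *
 *	fd_set rset;
 *
 *	FD_ZERO(&rset);
 *	FD_SET(fds[0], &rset);
 *	if (select(fds[0] + 1, &rset, NULL, NULL, NULL) > 0)
 *		... read(fds[0], ...) will not block ...
 */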

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}
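
/*
 * Example from user mode (a sketch): fstat(2) on a pipe descriptor
 * reports S_IFIFO and, per the code above, the currently buffered byte
 * count in st_size.
 *
 *	#include <sys/stat.h>
 *
 *	struct stat sb;
 *
 *	if (fstat(fds[0], &sb) == 0 && S_ISFIFO(sb.st_mode))
 *		... sb.st_size bytes currently queued ...
 */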

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * Shut down the pipe.
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {
		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif /* !OLD_PIPE */