sys_pipe.c revision 60938
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: head/sys/kern/sys_pipe.c 60938 2000-05-26 02:09:24Z jake $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
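
/*
 * Illustrative sketch (userland, not kernel code): the two modes are
 * invisible to applications.  Assuming PIPE_MINDIRECT is 8192 as defined
 * in <sys/pipe.h>, a blocking write of at least that size would normally
 * take the direct page-mapped path, while a small write is simply copied
 * into the kernel buffer:
 *
 *	int fds[2];
 *	static char big[65536];
 *
 *	pipe(fds);
 *	write(fds[1], "hi", 2);			(buffered mode)
 *	write(fds[1], big, sizeof(big));	(direct mode)
 */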

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct proc *p));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct proc *p));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
		struct proc *p));
static int pipe_stat __P((struct file *fp, struct stat *sb, struct proc *p));
static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_stat, pipe_close };

static int	filt_pipeattach(struct knote *kn);
static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

struct filterops pipe_rwfiltops[] = {
	{ 1, filt_pipeattach, filt_pipedetach, filt_piperead },
	{ 1, filt_pipeattach, filt_pipedetach, filt_pipewrite },
};

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)
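
/*
 * Worked example, assuming PIPE_SIZE is 16384 as defined in <sys/pipe.h>:
 * MINPIPESIZE is then 5461 bytes.  pipe_read() only wakes a blocked
 * writer once the buffer has drained below MINPIPESIZE, so reader and
 * writer move data in large batches instead of context switching on
 * every byte.
 */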

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general, though.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
static int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

static vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof (struct pipe), 0, 0, 4);

	rpipe = zalloc(pipe_zone);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = zalloc(pipe_zone);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	p->p_retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	p->p_retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	fdp->fd_ofiles[p->p_retval[0]] = 0;
	ffree(rf);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}
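
/*
 * Userland usage sketch: p_retval[0] and p_retval[1] above become the two
 * descriptors returned by pipe(2).  Note that both file entries are opened
 * FREAD | FWRITE, so these pipes are bidirectional, unlike the traditional
 * 4.4Lite implementation:
 *
 *	int fds[2];
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	if (fork() == 0) {
 *		char buf[512];
 *
 *		read(fds[0], buf, sizeof(buf));	(child reads)
 *		_exit(0);
 *	}
 *	write(fds[1], "hello", 5);		(parent writes)
 */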

/*
 * Allocate kva for pipe circular buffer; the space is pageable
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
		    catch ? (PRIBIO|PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
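
/*
 * Typical usage, as in pipe_write() below: callers bracket buffer
 * manipulation with the lock pair, sleeping interruptibly when catch
 * is non-zero:
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... modify cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 */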

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred, flags, p)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	struct proc *p;
	int flags;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	error = pipelock(rpipe, 1);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t	va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
			error = uiomove(va, size, uio);
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining processing.
			 * We will either break out with an error or we will
			 * sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK)
				error = EAGAIN;
			else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe, 1);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
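
/*
 * Behavior sketch (userland view): with O_NONBLOCK set on the read end,
 * an empty pipe returns EAGAIN from the FNONBLOCK test above instead of
 * sleeping in "piperd":
 *
 *	fcntl(fds[0], F_SETFL, O_NONBLOCK);
 *	n = read(fds[0], buf, sizeof(buf));	(n == -1, errno == EAGAIN
 *						 when the pipe is empty)
 */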

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
		    (paddr = pmap_kextract(addr)) == 0) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static int
pipe_write(fp, uio, cred, flags, p)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	struct proc *p;
	int flags;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot coincide with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe, PRIBIO|PCATCH, "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			if ((error = pipelock(wpipe, 1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");
				}
				pipeunlock(wpipe);
			}
			if (error)
				break;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}
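
/*
 * Atomicity sketch: the "space = 0" test above guarantees that writes of
 * at most PIPE_BUF bytes (512 on FreeBSD) are never interleaved with
 * other writers.  Two processes each doing
 *
 *	write(fd, record, PIPE_BUF);
 *
 * concurrently will see their records appear contiguously in the pipe.
 */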

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	u_long cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case FIOSETOWN:
		return (fsetown(*(int *)data, &mpipe->pipe_sigio));

	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		return (0);

	/* This is deprecated, FIOSETOWN should be used instead. */
	case TIOCSPGRP:
		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));

	/* This is deprecated, FIOGETOWN should be used instead. */
	case TIOCGPGRP:
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}
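
/*
 * Usage sketch: FIONREAD reports the bytes immediately readable, whether
 * they sit in the circular buffer or in a pending direct write:
 *
 *	int nready;
 *
 *	ioctl(fds[0], FIONREAD, &nready);
 */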

static int
pipe_poll(fp, events, cred, p)
	struct file *fp;
	int events;
	struct ucred *cred;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	if (events & (POLLOUT | POLLWRNORM))
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
			revents |= events & (POLLOUT | POLLWRNORM);

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	return (revents);
}
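
/*
 * Behavior sketch: poll(2) on the read end reports POLLIN as soon as any
 * data (buffered or direct) is available; the write end reports POLLOUT
 * only while at least PIPE_BUF bytes of space remain, matching the
 * atomic-write guarantee in pipe_write():
 *
 *	struct pollfd pfd = { fds[0], POLLIN, 0 };
 *
 *	poll(&pfd, 1, INFTIM);
 */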

static int
pipe_stat(fp, ub, p)
	struct file *fp;
	struct stat *ub;
	struct proc *p;
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {
		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		zfree(pipe_zone, cpipe);
	}
}

static int
filt_pipeattach(struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;
	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
		kn->kn_data = rpipe->pipe_map.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
	/* A pending direct write means no buffer space is available. */
	if (wpipe->pipe_state & PIPE_DIRECTW)
		kn->kn_data = 0;

	return (kn->kn_data >= PIPE_BUF);
}
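
/*
 * Usage sketch (userland, standard kqueue(2) interface): an EVFILT_READ
 * knote on a pipe is serviced by filt_piperead() above and fires once
 * kn_data > 0 or the write side has gone away:
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 */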