/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.28 1997/04/09 16:53:39 bde Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If a write is smaller than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If a write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the buffer is fully mapped and wired into the
 * kernel, and the receiving process can copy it directly from the pages
 * in the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
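
/*
 * Illustrative sketch (userland, not part of this file): a blocking
 * write of at least PIPE_MINDIRECT bytes takes the direct path, while
 * small writes go through the kernel buffer:
 *
 *	int fds[2];
 *	static char big[64 * 1024];
 *	pipe(fds);
 *	write(fds[1], big, sizeof(big));	-- large: page-mapped direct mode
 *	write(fds[1], "x", 1);			-- small: buffered mode
 */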

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approximately 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is a soft limit, but
 * is there so that we don't exhaust kva on large systems.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit on kva used for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof (struct pipe), 0,
			ZONE_WAIT, 4);

	rpipe = zalloc(pipe_zone);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = zalloc(pipe_zone);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
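	/*
	 * Clean-up paths: "free3" undoes the read-side file allocation;
	 * "free2" releases both pipe structures.
	 */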
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	gettime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_pgid = NO_PID;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
			catch ? (PRIBIO | PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

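/*
 * Wake up anyone select()ing on the pipe and, when async I/O (O_ASYNC)
 * is enabled, post SIGIO to the owning process or process group
 * (a negative pipe_pgid names a process group).
 */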
static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	struct proc *p;

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if (cpipe->pipe_state & PIPE_ASYNC) {
		if (cpipe->pipe_pgid < 0)
			gsignal(-cpipe->pipe_pgid, SIGIO);
		else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
			psignal(p, SIGIO);
	}
}

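/*
 * Pipe read: drain the kernel buffer first, then any pending direct
 * (page-mapped) write, and otherwise block until data or EOF arrives.
 */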
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO | PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0)
		gettime(&rpipe->pipe_atime);

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

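	/*
	 * Fault in and wire each page of the user buffer; a zero return
	 * from pmap_kextract() means a page could not be made resident,
	 * so undo the wiring done so far and fail with EFAULT.
	 */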
	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

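		/*
		 * Keep the kva mapping cached for reuse unless pipe kva
		 * consumption is above the soft limit, in which case
		 * release it now.
		 */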
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

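	/*
	 * The pipe buffer is drained and no direct write is pending:
	 * claim the direct-write state and wire down the user pages.
	 */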
	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

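/*
 * Pipe write: resize the buffer for large writers when advantageous,
 * take the direct (page-mapped) path for large blocking writes, and
 * otherwise copy through the kernel buffer, honoring the PIPE_BUF
 * atomicity guarantee.
 */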
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    ((fp->f_flag & FNONBLOCK) == 0) &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO | PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This sets the maximum transfer as a segment of
			 * the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;

			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe,
			    (PRIBIO + 1) | PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		gettime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);
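
	/* Set/get the SIGIO recipient: negative values name a process group. */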
	case TIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case TIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);
	}
	return (ENOTTY);
}

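/*
 * Select/poll support: the read side selects true when buffered or
 * direct-write data or EOF is pending; the write side selects true when
 * the peer is gone or at EOF (so a writer sees EPIPE promptly) or at
 * least PIPE_BUF bytes of buffer space are free; the exceptional case
 * (which == 0) reports only EOF conditions.
 */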
int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {
		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		zfree(pipe_zone, cpipe);
	}
}