svc.c revision 276272
1/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2
3/*-
4 * Copyright (c) 2009, Sun Microsystems, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 *   this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 *   this list of conditions and the following disclaimer in the documentation
13 *   and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 *   contributors may be used to endorse or promote products derived
16 *   from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#if defined(LIBC_SCCS) && !defined(lint)
32static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34#endif
35#include <sys/cdefs.h>
36__FBSDID("$FreeBSD: stable/10/sys/rpc/svc.c 276272 2014-12-27 00:55:14Z kib $");
37
38/*
39 * svc.c, Server-side remote procedure call interface.
40 *
41 * There are two sets of procedures here.  The xprt routines are
42 * for handling transport handles.  The svc routines handle the
43 * list of service routines.
44 *
45 * Copyright (C) 1984, Sun Microsystems, Inc.
46 */
47
48#include <sys/param.h>
49#include <sys/lock.h>
50#include <sys/kernel.h>
51#include <sys/kthread.h>
52#include <sys/malloc.h>
53#include <sys/mbuf.h>
54#include <sys/mutex.h>
55#include <sys/proc.h>
56#include <sys/queue.h>
57#include <sys/socketvar.h>
58#include <sys/systm.h>
59#include <sys/smp.h>
60#include <sys/sx.h>
61#include <sys/ucred.h>
62
63#include <rpc/rpc.h>
64#include <rpc/rpcb_clnt.h>
65#include <rpc/replay.h>
66
67#include <rpc/rpc_com.h>
68
69#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73    char *);
74static void svc_new_thread(SVCGROUP *grp);
75static void xprt_unregister_locked(SVCXPRT *xprt);
76static void svc_change_space_used(SVCPOOL *pool, int delta);
77static bool_t svc_request_space_available(SVCPOOL *pool);
78
79/* ***************  SVCXPRT related stuff **************** */
80
81static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
82static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
83static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
84
85SVCPOOL*
86svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
87{
88	SVCPOOL *pool;
89	SVCGROUP *grp;
90	int g;
91
92	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
93
94	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
95	pool->sp_name = name;
96	pool->sp_state = SVCPOOL_INIT;
97	pool->sp_proc = NULL;
98	TAILQ_INIT(&pool->sp_callouts);
99	TAILQ_INIT(&pool->sp_lcallouts);
100	pool->sp_minthreads = 1;
101	pool->sp_maxthreads = 1;
102	pool->sp_groupcount = 1;
103	for (g = 0; g < SVC_MAXGROUPS; g++) {
104		grp = &pool->sp_groups[g];
105		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
106		grp->sg_pool = pool;
107		grp->sg_state = SVCPOOL_ACTIVE;
108		TAILQ_INIT(&grp->sg_xlist);
109		TAILQ_INIT(&grp->sg_active);
110		LIST_INIT(&grp->sg_idlethreads);
111		grp->sg_minthreads = 1;
112		grp->sg_maxthreads = 1;
113	}
114
115	/*
116	 * Don't use more than a quarter of mbuf clusters or more than
117	 * 45Mb buffering requests.
118	 */
119	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
120	if (pool->sp_space_high > 45 << 20)
121		pool->sp_space_high = 45 << 20;
122	pool->sp_space_low = 2 * pool->sp_space_high / 3;
123
124	sysctl_ctx_init(&pool->sp_sysctl);
125	if (sysctl_base) {
126		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
128		    pool, 0, svcpool_minthread_sysctl, "I",
129		    "Minimal number of threads");
130		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
132		    pool, 0, svcpool_maxthread_sysctl, "I",
133		    "Maximal number of threads");
134		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135		    "threads", CTLTYPE_INT | CTLFLAG_RD,
136		    pool, 0, svcpool_threads_sysctl, "I",
137		    "Current number of threads");
138		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
140		    "Number of thread groups");
141
142		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143		    "request_space_used", CTLFLAG_RD,
144		    &pool->sp_space_used, 0,
145		    "Space in parsed but not handled requests.");
146
147		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
148		    "request_space_used_highest", CTLFLAG_RD,
149		    &pool->sp_space_used_highest, 0,
150		    "Highest space used since reboot.");
151
152		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
153		    "request_space_high", CTLFLAG_RW,
154		    &pool->sp_space_high, 0,
155		    "Maximum space in parsed but not handled requests.");
156
157		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
158		    "request_space_low", CTLFLAG_RW,
159		    &pool->sp_space_low, 0,
160		    "Low water mark for request space.");
161
162		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
163		    "request_space_throttled", CTLFLAG_RD,
164		    &pool->sp_space_throttled, 0,
165		    "Whether nfs requests are currently throttled");
166
167		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
168		    "request_space_throttle_count", CTLFLAG_RD,
169		    &pool->sp_space_throttle_count, 0,
170		    "Count of times throttling based on request space has occurred");
171	}
172
173	return pool;
174}
175
176void
177svcpool_destroy(SVCPOOL *pool)
178{
179	SVCGROUP *grp;
180	SVCXPRT *xprt, *nxprt;
181	struct svc_callout *s;
182	struct svc_loss_callout *sl;
183	struct svcxprt_list cleanup;
184	int g;
185
186	TAILQ_INIT(&cleanup);
187
188	for (g = 0; g < SVC_MAXGROUPS; g++) {
189		grp = &pool->sp_groups[g];
190		mtx_lock(&grp->sg_lock);
191		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
192			xprt_unregister_locked(xprt);
193			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
194		}
195		mtx_unlock(&grp->sg_lock);
196	}
197	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
198		SVC_RELEASE(xprt);
199	}
200
201	mtx_lock(&pool->sp_lock);
202	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
203		mtx_unlock(&pool->sp_lock);
204		svc_unreg(pool, s->sc_prog, s->sc_vers);
205		mtx_lock(&pool->sp_lock);
206	}
207	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
208		mtx_unlock(&pool->sp_lock);
209		svc_loss_unreg(pool, sl->slc_dispatch);
210		mtx_lock(&pool->sp_lock);
211	}
212	mtx_unlock(&pool->sp_lock);
213
214	for (g = 0; g < SVC_MAXGROUPS; g++) {
215		grp = &pool->sp_groups[g];
216		mtx_destroy(&grp->sg_lock);
217	}
218	mtx_destroy(&pool->sp_lock);
219
220	if (pool->sp_rcache)
221		replay_freecache(pool->sp_rcache);
222
223	sysctl_ctx_free(&pool->sp_sysctl);
224	free(pool, M_RPC);
225}
226
227/*
228 * Sysctl handler to get the present thread count on a pool
229 */
230static int
231svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
232{
233	SVCPOOL *pool;
234	int threads, error, g;
235
236	pool = oidp->oid_arg1;
237	threads = 0;
238	mtx_lock(&pool->sp_lock);
239	for (g = 0; g < pool->sp_groupcount; g++)
240		threads += pool->sp_groups[g].sg_threadcount;
241	mtx_unlock(&pool->sp_lock);
242	error = sysctl_handle_int(oidp, &threads, 0, req);
243	return (error);
244}
245
246/*
247 * Sysctl handler to set the minimum thread count on a pool
248 */
249static int
250svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
251{
252	SVCPOOL *pool;
253	int newminthreads, error, g;
254
255	pool = oidp->oid_arg1;
256	newminthreads = pool->sp_minthreads;
257	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
258	if (error == 0 && newminthreads != pool->sp_minthreads) {
259		if (newminthreads > pool->sp_maxthreads)
260			return (EINVAL);
261		mtx_lock(&pool->sp_lock);
262		pool->sp_minthreads = newminthreads;
263		for (g = 0; g < pool->sp_groupcount; g++) {
264			pool->sp_groups[g].sg_minthreads = max(1,
265			    pool->sp_minthreads / pool->sp_groupcount);
266		}
267		mtx_unlock(&pool->sp_lock);
268	}
269	return (error);
270}
271
272/*
273 * Sysctl handler to set the maximum thread count on a pool
274 */
275static int
276svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
277{
278	SVCPOOL *pool;
279	int newmaxthreads, error, g;
280
281	pool = oidp->oid_arg1;
282	newmaxthreads = pool->sp_maxthreads;
283	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
284	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
285		if (newmaxthreads < pool->sp_minthreads)
286			return (EINVAL);
287		mtx_lock(&pool->sp_lock);
288		pool->sp_maxthreads = newmaxthreads;
289		for (g = 0; g < pool->sp_groupcount; g++) {
290			pool->sp_groups[g].sg_maxthreads = max(1,
291			    pool->sp_maxthreads / pool->sp_groupcount);
292		}
293		mtx_unlock(&pool->sp_lock);
294	}
295	return (error);
296}
297
298/*
299 * Activate a transport handle.
300 */
301void
302xprt_register(SVCXPRT *xprt)
303{
304	SVCPOOL *pool = xprt->xp_pool;
305	SVCGROUP *grp;
306	int g;
307
308	SVC_ACQUIRE(xprt);
309	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
310	xprt->xp_group = grp = &pool->sp_groups[g];
311	mtx_lock(&grp->sg_lock);
312	xprt->xp_registered = TRUE;
313	xprt->xp_active = FALSE;
314	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
315	mtx_unlock(&grp->sg_lock);
316}
317
318/*
319 * De-activate a transport handle. Note: the locked version doesn't
320 * release the transport - caller must do that after dropping the pool
321 * lock.
322 */
323static void
324xprt_unregister_locked(SVCXPRT *xprt)
325{
326	SVCGROUP *grp = xprt->xp_group;
327
328	mtx_assert(&grp->sg_lock, MA_OWNED);
329	KASSERT(xprt->xp_registered == TRUE,
330	    ("xprt_unregister_locked: not registered"));
331	xprt_inactive_locked(xprt);
332	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
333	xprt->xp_registered = FALSE;
334}
335
336void
337xprt_unregister(SVCXPRT *xprt)
338{
339	SVCGROUP *grp = xprt->xp_group;
340
341	mtx_lock(&grp->sg_lock);
342	if (xprt->xp_registered == FALSE) {
343		/* Already unregistered by another thread */
344		mtx_unlock(&grp->sg_lock);
345		return;
346	}
347	xprt_unregister_locked(xprt);
348	mtx_unlock(&grp->sg_lock);
349
350	SVC_RELEASE(xprt);
351}
352
353/*
354 * Attempt to assign a service thread to this transport.
355 */
356static int
357xprt_assignthread(SVCXPRT *xprt)
358{
359	SVCGROUP *grp = xprt->xp_group;
360	SVCTHREAD *st;
361
362	mtx_assert(&grp->sg_lock, MA_OWNED);
363	st = LIST_FIRST(&grp->sg_idlethreads);
364	if (st) {
365		LIST_REMOVE(st, st_ilink);
366		SVC_ACQUIRE(xprt);
367		xprt->xp_thread = st;
368		st->st_xprt = xprt;
369		cv_signal(&st->st_cond);
370		return (TRUE);
371	} else {
372		/*
373		 * See if we can create a new thread. The
374		 * actual thread creation happens in
375		 * svc_run_internal because our locking state
376		 * is poorly defined (we are typically called
377		 * from a socket upcall). Don't create more
378		 * than one thread per second.
379		 */
380		if (grp->sg_state == SVCPOOL_ACTIVE
381		    && grp->sg_lastcreatetime < time_uptime
382		    && grp->sg_threadcount < grp->sg_maxthreads) {
383			grp->sg_state = SVCPOOL_THREADWANTED;
384		}
385	}
386	return (FALSE);
387}
388
389void
390xprt_active(SVCXPRT *xprt)
391{
392	SVCGROUP *grp = xprt->xp_group;
393
394	mtx_lock(&grp->sg_lock);
395
396	if (!xprt->xp_registered) {
397		/*
398		 * Race with xprt_unregister - we lose.
399		 */
400		mtx_unlock(&grp->sg_lock);
401		return;
402	}
403
404	if (!xprt->xp_active) {
405		xprt->xp_active = TRUE;
406		if (xprt->xp_thread == NULL) {
407			if (!svc_request_space_available(xprt->xp_pool) ||
408			    !xprt_assignthread(xprt))
409				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
410				    xp_alink);
411		}
412	}
413
414	mtx_unlock(&grp->sg_lock);
415}
416
417void
418xprt_inactive_locked(SVCXPRT *xprt)
419{
420	SVCGROUP *grp = xprt->xp_group;
421
422	mtx_assert(&grp->sg_lock, MA_OWNED);
423	if (xprt->xp_active) {
424		if (xprt->xp_thread == NULL)
425			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
426		xprt->xp_active = FALSE;
427	}
428}
429
430void
431xprt_inactive(SVCXPRT *xprt)
432{
433	SVCGROUP *grp = xprt->xp_group;
434
435	mtx_lock(&grp->sg_lock);
436	xprt_inactive_locked(xprt);
437	mtx_unlock(&grp->sg_lock);
438}
439
440/*
441 * Variant of xprt_inactive() for use only when sure that port is
442 * assigned to thread. For example, withing receive handlers.
443 */
444void
445xprt_inactive_self(SVCXPRT *xprt)
446{
447
448	KASSERT(xprt->xp_thread != NULL,
449	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
450	xprt->xp_active = FALSE;
451}
452
453/*
454 * Add a service program to the callout list.
455 * The dispatch routine will be called when a rpc request for this
456 * program number comes in.
457 */
458bool_t
459svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
460    void (*dispatch)(struct svc_req *, SVCXPRT *),
461    const struct netconfig *nconf)
462{
463	SVCPOOL *pool = xprt->xp_pool;
464	struct svc_callout *s;
465	char *netid = NULL;
466	int flag = 0;
467
468/* VARIABLES PROTECTED BY svc_lock: s, svc_head */
469
470	if (xprt->xp_netid) {
471		netid = strdup(xprt->xp_netid, M_RPC);
472		flag = 1;
473	} else if (nconf && nconf->nc_netid) {
474		netid = strdup(nconf->nc_netid, M_RPC);
475		flag = 1;
476	} /* must have been created with svc_raw_create */
477	if ((netid == NULL) && (flag == 1)) {
478		return (FALSE);
479	}
480
481	mtx_lock(&pool->sp_lock);
482	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
483		if (netid)
484			free(netid, M_RPC);
485		if (s->sc_dispatch == dispatch)
486			goto rpcb_it; /* he is registering another xptr */
487		mtx_unlock(&pool->sp_lock);
488		return (FALSE);
489	}
490	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
491	if (s == NULL) {
492		if (netid)
493			free(netid, M_RPC);
494		mtx_unlock(&pool->sp_lock);
495		return (FALSE);
496	}
497
498	s->sc_prog = prog;
499	s->sc_vers = vers;
500	s->sc_dispatch = dispatch;
501	s->sc_netid = netid;
502	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
503
504	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
505		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
506
507rpcb_it:
508	mtx_unlock(&pool->sp_lock);
509	/* now register the information with the local binder service */
510	if (nconf) {
511		bool_t dummy;
512		struct netconfig tnc;
513		struct netbuf nb;
514		tnc = *nconf;
515		nb.buf = &xprt->xp_ltaddr;
516		nb.len = xprt->xp_ltaddr.ss_len;
517		dummy = rpcb_set(prog, vers, &tnc, &nb);
518		return (dummy);
519	}
520	return (TRUE);
521}
522
523/*
524 * Remove a service program from the callout list.
525 */
526void
527svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
528{
529	struct svc_callout *s;
530
531	/* unregister the information anyway */
532	(void) rpcb_unset(prog, vers, NULL);
533	mtx_lock(&pool->sp_lock);
534	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
535		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
536		if (s->sc_netid)
537			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
538		mem_free(s, sizeof (struct svc_callout));
539	}
540	mtx_unlock(&pool->sp_lock);
541}
542
543/*
544 * Add a service connection loss program to the callout list.
545 * The dispatch routine will be called when some port in ths pool die.
546 */
547bool_t
548svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
549{
550	SVCPOOL *pool = xprt->xp_pool;
551	struct svc_loss_callout *s;
552
553	mtx_lock(&pool->sp_lock);
554	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
555		if (s->slc_dispatch == dispatch)
556			break;
557	}
558	if (s != NULL) {
559		mtx_unlock(&pool->sp_lock);
560		return (TRUE);
561	}
562	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
563	if (s == NULL) {
564		mtx_unlock(&pool->sp_lock);
565		return (FALSE);
566	}
567	s->slc_dispatch = dispatch;
568	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
569	mtx_unlock(&pool->sp_lock);
570	return (TRUE);
571}
572
573/*
574 * Remove a service connection loss program from the callout list.
575 */
576void
577svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
578{
579	struct svc_loss_callout *s;
580
581	mtx_lock(&pool->sp_lock);
582	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
583		if (s->slc_dispatch == dispatch) {
584			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
585			free(s, M_RPC);
586			break;
587		}
588	}
589	mtx_unlock(&pool->sp_lock);
590}
591
592/* ********************** CALLOUT list related stuff ************* */
593
594/*
595 * Search the callout list for a program number, return the callout
596 * struct.
597 */
598static struct svc_callout *
599svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
600{
601	struct svc_callout *s;
602
603	mtx_assert(&pool->sp_lock, MA_OWNED);
604	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
605		if (s->sc_prog == prog && s->sc_vers == vers
606		    && (netid == NULL || s->sc_netid == NULL ||
607			strcmp(netid, s->sc_netid) == 0))
608			break;
609	}
610
611	return (s);
612}
613
614/* ******************* REPLY GENERATION ROUTINES  ************ */
615
616static bool_t
617svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
618    struct mbuf *body)
619{
620	SVCXPRT *xprt = rqstp->rq_xprt;
621	bool_t ok;
622
623	if (rqstp->rq_args) {
624		m_freem(rqstp->rq_args);
625		rqstp->rq_args = NULL;
626	}
627
628	if (xprt->xp_pool->sp_rcache)
629		replay_setreply(xprt->xp_pool->sp_rcache,
630		    rply, svc_getrpccaller(rqstp), body);
631
632	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
633		return (FALSE);
634
635	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
636	if (rqstp->rq_addr) {
637		free(rqstp->rq_addr, M_SONAME);
638		rqstp->rq_addr = NULL;
639	}
640
641	return (ok);
642}
643
644/*
645 * Send a reply to an rpc request
646 */
647bool_t
648svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
649{
650	struct rpc_msg rply;
651	struct mbuf *m;
652	XDR xdrs;
653	bool_t ok;
654
655	rply.rm_xid = rqstp->rq_xid;
656	rply.rm_direction = REPLY;
657	rply.rm_reply.rp_stat = MSG_ACCEPTED;
658	rply.acpted_rply.ar_verf = rqstp->rq_verf;
659	rply.acpted_rply.ar_stat = SUCCESS;
660	rply.acpted_rply.ar_results.where = NULL;
661	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
662
663	m = m_getcl(M_WAITOK, MT_DATA, 0);
664	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
665	ok = xdr_results(&xdrs, xdr_location);
666	XDR_DESTROY(&xdrs);
667
668	if (ok) {
669		return (svc_sendreply_common(rqstp, &rply, m));
670	} else {
671		m_freem(m);
672		return (FALSE);
673	}
674}
675
676bool_t
677svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
678{
679	struct rpc_msg rply;
680
681	rply.rm_xid = rqstp->rq_xid;
682	rply.rm_direction = REPLY;
683	rply.rm_reply.rp_stat = MSG_ACCEPTED;
684	rply.acpted_rply.ar_verf = rqstp->rq_verf;
685	rply.acpted_rply.ar_stat = SUCCESS;
686	rply.acpted_rply.ar_results.where = NULL;
687	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
688
689	return (svc_sendreply_common(rqstp, &rply, m));
690}
691
692/*
693 * No procedure error reply
694 */
695void
696svcerr_noproc(struct svc_req *rqstp)
697{
698	SVCXPRT *xprt = rqstp->rq_xprt;
699	struct rpc_msg rply;
700
701	rply.rm_xid = rqstp->rq_xid;
702	rply.rm_direction = REPLY;
703	rply.rm_reply.rp_stat = MSG_ACCEPTED;
704	rply.acpted_rply.ar_verf = rqstp->rq_verf;
705	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
706
707	if (xprt->xp_pool->sp_rcache)
708		replay_setreply(xprt->xp_pool->sp_rcache,
709		    &rply, svc_getrpccaller(rqstp), NULL);
710
711	svc_sendreply_common(rqstp, &rply, NULL);
712}
713
714/*
715 * Can't decode args error reply
716 */
717void
718svcerr_decode(struct svc_req *rqstp)
719{
720	SVCXPRT *xprt = rqstp->rq_xprt;
721	struct rpc_msg rply;
722
723	rply.rm_xid = rqstp->rq_xid;
724	rply.rm_direction = REPLY;
725	rply.rm_reply.rp_stat = MSG_ACCEPTED;
726	rply.acpted_rply.ar_verf = rqstp->rq_verf;
727	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
728
729	if (xprt->xp_pool->sp_rcache)
730		replay_setreply(xprt->xp_pool->sp_rcache,
731		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
732
733	svc_sendreply_common(rqstp, &rply, NULL);
734}
735
736/*
737 * Some system error
738 */
739void
740svcerr_systemerr(struct svc_req *rqstp)
741{
742	SVCXPRT *xprt = rqstp->rq_xprt;
743	struct rpc_msg rply;
744
745	rply.rm_xid = rqstp->rq_xid;
746	rply.rm_direction = REPLY;
747	rply.rm_reply.rp_stat = MSG_ACCEPTED;
748	rply.acpted_rply.ar_verf = rqstp->rq_verf;
749	rply.acpted_rply.ar_stat = SYSTEM_ERR;
750
751	if (xprt->xp_pool->sp_rcache)
752		replay_setreply(xprt->xp_pool->sp_rcache,
753		    &rply, svc_getrpccaller(rqstp), NULL);
754
755	svc_sendreply_common(rqstp, &rply, NULL);
756}
757
758/*
759 * Authentication error reply
760 */
761void
762svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
763{
764	SVCXPRT *xprt = rqstp->rq_xprt;
765	struct rpc_msg rply;
766
767	rply.rm_xid = rqstp->rq_xid;
768	rply.rm_direction = REPLY;
769	rply.rm_reply.rp_stat = MSG_DENIED;
770	rply.rjcted_rply.rj_stat = AUTH_ERROR;
771	rply.rjcted_rply.rj_why = why;
772
773	if (xprt->xp_pool->sp_rcache)
774		replay_setreply(xprt->xp_pool->sp_rcache,
775		    &rply, svc_getrpccaller(rqstp), NULL);
776
777	svc_sendreply_common(rqstp, &rply, NULL);
778}
779
780/*
781 * Auth too weak error reply
782 */
783void
784svcerr_weakauth(struct svc_req *rqstp)
785{
786
787	svcerr_auth(rqstp, AUTH_TOOWEAK);
788}
789
790/*
791 * Program unavailable error reply
792 */
793void
794svcerr_noprog(struct svc_req *rqstp)
795{
796	SVCXPRT *xprt = rqstp->rq_xprt;
797	struct rpc_msg rply;
798
799	rply.rm_xid = rqstp->rq_xid;
800	rply.rm_direction = REPLY;
801	rply.rm_reply.rp_stat = MSG_ACCEPTED;
802	rply.acpted_rply.ar_verf = rqstp->rq_verf;
803	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
804
805	if (xprt->xp_pool->sp_rcache)
806		replay_setreply(xprt->xp_pool->sp_rcache,
807		    &rply, svc_getrpccaller(rqstp), NULL);
808
809	svc_sendreply_common(rqstp, &rply, NULL);
810}
811
812/*
813 * Program version mismatch error reply
814 */
815void
816svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
817{
818	SVCXPRT *xprt = rqstp->rq_xprt;
819	struct rpc_msg rply;
820
821	rply.rm_xid = rqstp->rq_xid;
822	rply.rm_direction = REPLY;
823	rply.rm_reply.rp_stat = MSG_ACCEPTED;
824	rply.acpted_rply.ar_verf = rqstp->rq_verf;
825	rply.acpted_rply.ar_stat = PROG_MISMATCH;
826	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
827	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
828
829	if (xprt->xp_pool->sp_rcache)
830		replay_setreply(xprt->xp_pool->sp_rcache,
831		    &rply, svc_getrpccaller(rqstp), NULL);
832
833	svc_sendreply_common(rqstp, &rply, NULL);
834}
835
836/*
837 * Allocate a new server transport structure. All fields are
838 * initialized to zero and xp_p3 is initialized to point at an
839 * extension structure to hold various flags and authentication
840 * parameters.
841 */
842SVCXPRT *
843svc_xprt_alloc()
844{
845	SVCXPRT *xprt;
846	SVCXPRT_EXT *ext;
847
848	xprt = mem_alloc(sizeof(SVCXPRT));
849	memset(xprt, 0, sizeof(SVCXPRT));
850	ext = mem_alloc(sizeof(SVCXPRT_EXT));
851	memset(ext, 0, sizeof(SVCXPRT_EXT));
852	xprt->xp_p3 = ext;
853	refcount_init(&xprt->xp_refs, 1);
854
855	return (xprt);
856}
857
858/*
859 * Free a server transport structure.
860 */
861void
862svc_xprt_free(xprt)
863	SVCXPRT *xprt;
864{
865
866	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
867	mem_free(xprt, sizeof(SVCXPRT));
868}
869
870/* ******************* SERVER INPUT STUFF ******************* */
871
872/*
873 * Read RPC requests from a transport and queue them to be
874 * executed. We handle authentication and replay cache replies here.
875 * Actually dispatching the RPC is deferred till svc_executereq.
876 */
877static enum xprt_stat
878svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
879{
880	SVCPOOL *pool = xprt->xp_pool;
881	struct svc_req *r;
882	struct rpc_msg msg;
883	struct mbuf *args;
884	struct svc_loss_callout *s;
885	enum xprt_stat stat;
886
887	/* now receive msgs from xprtprt (support batch calls) */
888	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
889
890	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
891	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
892	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
893	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
894		enum auth_stat why;
895
896		/*
897		 * Handle replays and authenticate before queuing the
898		 * request to be executed.
899		 */
900		SVC_ACQUIRE(xprt);
901		r->rq_xprt = xprt;
902		if (pool->sp_rcache) {
903			struct rpc_msg repmsg;
904			struct mbuf *repbody;
905			enum replay_state rs;
906			rs = replay_find(pool->sp_rcache, &msg,
907			    svc_getrpccaller(r), &repmsg, &repbody);
908			switch (rs) {
909			case RS_NEW:
910				break;
911			case RS_DONE:
912				SVC_REPLY(xprt, &repmsg, r->rq_addr,
913				    repbody, &r->rq_reply_seq);
914				if (r->rq_addr) {
915					free(r->rq_addr, M_SONAME);
916					r->rq_addr = NULL;
917				}
918				m_freem(args);
919				goto call_done;
920
921			default:
922				m_freem(args);
923				goto call_done;
924			}
925		}
926
927		r->rq_xid = msg.rm_xid;
928		r->rq_prog = msg.rm_call.cb_prog;
929		r->rq_vers = msg.rm_call.cb_vers;
930		r->rq_proc = msg.rm_call.cb_proc;
931		r->rq_size = sizeof(*r) + m_length(args, NULL);
932		r->rq_args = args;
933		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
934			/*
935			 * RPCSEC_GSS uses this return code
936			 * for requests that form part of its
937			 * context establishment protocol and
938			 * should not be dispatched to the
939			 * application.
940			 */
941			if (why != RPCSEC_GSS_NODISPATCH)
942				svcerr_auth(r, why);
943			goto call_done;
944		}
945
946		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
947			svcerr_decode(r);
948			goto call_done;
949		}
950
951		/*
952		 * Everything checks out, return request to caller.
953		 */
954		*rqstp_ret = r;
955		r = NULL;
956	}
957call_done:
958	if (r) {
959		svc_freereq(r);
960		r = NULL;
961	}
962	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
963		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
964			(*s->slc_dispatch)(xprt);
965		xprt_unregister(xprt);
966	}
967
968	return (stat);
969}
970
971static void
972svc_executereq(struct svc_req *rqstp)
973{
974	SVCXPRT *xprt = rqstp->rq_xprt;
975	SVCPOOL *pool = xprt->xp_pool;
976	int prog_found;
977	rpcvers_t low_vers;
978	rpcvers_t high_vers;
979	struct svc_callout *s;
980
981	/* now match message with a registered service*/
982	prog_found = FALSE;
983	low_vers = (rpcvers_t) -1L;
984	high_vers = (rpcvers_t) 0L;
985	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
986		if (s->sc_prog == rqstp->rq_prog) {
987			if (s->sc_vers == rqstp->rq_vers) {
988				/*
989				 * We hand ownership of r to the
990				 * dispatch method - they must call
991				 * svc_freereq.
992				 */
993				(*s->sc_dispatch)(rqstp, xprt);
994				return;
995			}  /* found correct version */
996			prog_found = TRUE;
997			if (s->sc_vers < low_vers)
998				low_vers = s->sc_vers;
999			if (s->sc_vers > high_vers)
1000				high_vers = s->sc_vers;
1001		}   /* found correct program */
1002	}
1003
1004	/*
1005	 * if we got here, the program or version
1006	 * is not served ...
1007	 */
1008	if (prog_found)
1009		svcerr_progvers(rqstp, low_vers, high_vers);
1010	else
1011		svcerr_noprog(rqstp);
1012
1013	svc_freereq(rqstp);
1014}
1015
1016static void
1017svc_checkidle(SVCGROUP *grp)
1018{
1019	SVCXPRT *xprt, *nxprt;
1020	time_t timo;
1021	struct svcxprt_list cleanup;
1022
1023	TAILQ_INIT(&cleanup);
1024	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1025		/*
1026		 * Only some transports have idle timers. Don't time
1027		 * something out which is just waking up.
1028		 */
1029		if (!xprt->xp_idletimeout || xprt->xp_thread)
1030			continue;
1031
1032		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1033		if (time_uptime > timo) {
1034			xprt_unregister_locked(xprt);
1035			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1036		}
1037	}
1038
1039	mtx_unlock(&grp->sg_lock);
1040	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1041		SVC_RELEASE(xprt);
1042	}
1043	mtx_lock(&grp->sg_lock);
1044}
1045
1046static void
1047svc_assign_waiting_sockets(SVCPOOL *pool)
1048{
1049	SVCGROUP *grp;
1050	SVCXPRT *xprt;
1051	int g;
1052
1053	for (g = 0; g < pool->sp_groupcount; g++) {
1054		grp = &pool->sp_groups[g];
1055		mtx_lock(&grp->sg_lock);
1056		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1057			if (xprt_assignthread(xprt))
1058				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1059			else
1060				break;
1061		}
1062		mtx_unlock(&grp->sg_lock);
1063	}
1064}
1065
1066static void
1067svc_change_space_used(SVCPOOL *pool, int delta)
1068{
1069	unsigned int value;
1070
1071	value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
1072	if (delta > 0) {
1073		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1074			pool->sp_space_throttled = TRUE;
1075			pool->sp_space_throttle_count++;
1076		}
1077		if (value > pool->sp_space_used_highest)
1078			pool->sp_space_used_highest = value;
1079	} else {
1080		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1081			pool->sp_space_throttled = FALSE;
1082			svc_assign_waiting_sockets(pool);
1083		}
1084	}
1085}
1086
1087static bool_t
1088svc_request_space_available(SVCPOOL *pool)
1089{
1090
1091	if (pool->sp_space_throttled)
1092		return (FALSE);
1093	return (TRUE);
1094}
1095
1096static void
1097svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1098{
1099	SVCPOOL *pool = grp->sg_pool;
1100	SVCTHREAD *st, *stpref;
1101	SVCXPRT *xprt;
1102	enum xprt_stat stat;
1103	struct svc_req *rqstp;
1104	struct proc *p;
1105	size_t sz;
1106	int error;
1107
1108	st = mem_alloc(sizeof(*st));
1109	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1110	st->st_pool = pool;
1111	st->st_xprt = NULL;
1112	STAILQ_INIT(&st->st_reqs);
1113	cv_init(&st->st_cond, "rpcsvc");
1114
1115	mtx_lock(&grp->sg_lock);
1116
1117	/*
1118	 * If we are a new thread which was spawned to cope with
1119	 * increased load, set the state back to SVCPOOL_ACTIVE.
1120	 */
1121	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1122		grp->sg_state = SVCPOOL_ACTIVE;
1123
1124	while (grp->sg_state != SVCPOOL_CLOSING) {
1125		/*
1126		 * Create new thread if requested.
1127		 */
1128		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1129			grp->sg_state = SVCPOOL_THREADSTARTING;
1130			grp->sg_lastcreatetime = time_uptime;
1131			mtx_unlock(&grp->sg_lock);
1132			svc_new_thread(grp);
1133			mtx_lock(&grp->sg_lock);
1134			continue;
1135		}
1136
1137		/*
1138		 * Check for idle transports once per second.
1139		 */
1140		if (time_uptime > grp->sg_lastidlecheck) {
1141			grp->sg_lastidlecheck = time_uptime;
1142			svc_checkidle(grp);
1143		}
1144
1145		xprt = st->st_xprt;
1146		if (!xprt) {
1147			/*
1148			 * Enforce maxthreads count.
1149			 */
1150			if (grp->sg_threadcount > grp->sg_maxthreads)
1151				break;
1152
1153			/*
1154			 * Before sleeping, see if we can find an
1155			 * active transport which isn't being serviced
1156			 * by a thread.
1157			 */
1158			if (svc_request_space_available(pool) &&
1159			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1160				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1161				SVC_ACQUIRE(xprt);
1162				xprt->xp_thread = st;
1163				st->st_xprt = xprt;
1164				continue;
1165			}
1166
1167			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1168			if (ismaster || (!ismaster &&
1169			    grp->sg_threadcount > grp->sg_minthreads))
1170				error = cv_timedwait_sig(&st->st_cond,
1171				    &grp->sg_lock, 5 * hz);
1172			else
1173				error = cv_wait_sig(&st->st_cond,
1174				    &grp->sg_lock);
1175			if (st->st_xprt == NULL)
1176				LIST_REMOVE(st, st_ilink);
1177
1178			/*
1179			 * Reduce worker thread count when idle.
1180			 */
1181			if (error == EWOULDBLOCK) {
1182				if (!ismaster
1183				    && (grp->sg_threadcount
1184					> grp->sg_minthreads)
1185					&& !st->st_xprt)
1186					break;
1187			} else if (error != 0) {
1188				KASSERT(error == EINTR || error == ERESTART,
1189				    ("non-signal error %d", error));
1190				mtx_unlock(&grp->sg_lock);
1191				p = curproc;
1192				PROC_LOCK(p);
1193				if (P_SHOULDSTOP(p) ||
1194				    (p->p_flag & P_TOTAL_STOP) != 0) {
1195					thread_suspend_check(0);
1196					PROC_UNLOCK(p);
1197					mtx_lock(&grp->sg_lock);
1198				} else {
1199					PROC_UNLOCK(p);
1200					svc_exit(pool);
1201					mtx_lock(&grp->sg_lock);
1202					break;
1203				}
1204			}
1205			continue;
1206		}
1207		mtx_unlock(&grp->sg_lock);
1208
1209		/*
1210		 * Drain the transport socket and queue up any RPCs.
1211		 */
1212		xprt->xp_lastactive = time_uptime;
1213		do {
1214			if (!svc_request_space_available(pool))
1215				break;
1216			rqstp = NULL;
1217			stat = svc_getreq(xprt, &rqstp);
1218			if (rqstp) {
1219				svc_change_space_used(pool, rqstp->rq_size);
1220				/*
1221				 * See if the application has a preference
1222				 * for some other thread.
1223				 */
1224				if (pool->sp_assign) {
1225					stpref = pool->sp_assign(st, rqstp);
1226					rqstp->rq_thread = stpref;
1227					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1228					    rqstp, rq_link);
1229					mtx_unlock(&stpref->st_lock);
1230					if (stpref != st)
1231						rqstp = NULL;
1232				} else {
1233					rqstp->rq_thread = st;
1234					STAILQ_INSERT_TAIL(&st->st_reqs,
1235					    rqstp, rq_link);
1236				}
1237			}
1238		} while (rqstp == NULL && stat == XPRT_MOREREQS
1239		    && grp->sg_state != SVCPOOL_CLOSING);
1240
1241		/*
1242		 * Move this transport to the end of the active list to
1243		 * ensure fairness when multiple transports are active.
1244		 * If this was the last queued request, svc_getreq will end
1245		 * up calling xprt_inactive to remove from the active list.
1246		 */
1247		mtx_lock(&grp->sg_lock);
1248		xprt->xp_thread = NULL;
1249		st->st_xprt = NULL;
1250		if (xprt->xp_active) {
1251			if (!svc_request_space_available(pool) ||
1252			    !xprt_assignthread(xprt))
1253				TAILQ_INSERT_TAIL(&grp->sg_active,
1254				    xprt, xp_alink);
1255		}
1256		mtx_unlock(&grp->sg_lock);
1257		SVC_RELEASE(xprt);
1258
1259		/*
1260		 * Execute what we have queued.
1261		 */
1262		sz = 0;
1263		mtx_lock(&st->st_lock);
1264		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1265			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1266			mtx_unlock(&st->st_lock);
1267			sz += rqstp->rq_size;
1268			svc_executereq(rqstp);
1269			mtx_lock(&st->st_lock);
1270		}
1271		mtx_unlock(&st->st_lock);
1272		svc_change_space_used(pool, -sz);
1273		mtx_lock(&grp->sg_lock);
1274	}
1275
1276	if (st->st_xprt) {
1277		xprt = st->st_xprt;
1278		st->st_xprt = NULL;
1279		SVC_RELEASE(xprt);
1280	}
1281	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1282	mtx_destroy(&st->st_lock);
1283	cv_destroy(&st->st_cond);
1284	mem_free(st, sizeof(*st));
1285
1286	grp->sg_threadcount--;
1287	if (!ismaster)
1288		wakeup(grp);
1289	mtx_unlock(&grp->sg_lock);
1290}
1291
1292static void
1293svc_thread_start(void *arg)
1294{
1295
1296	svc_run_internal((SVCGROUP *) arg, FALSE);
1297	kthread_exit();
1298}
1299
1300static void
1301svc_new_thread(SVCGROUP *grp)
1302{
1303	SVCPOOL *pool = grp->sg_pool;
1304	struct thread *td;
1305
1306	grp->sg_threadcount++;
1307	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1308	    "%s: service", pool->sp_name);
1309}
1310
1311void
1312svc_run(SVCPOOL *pool)
1313{
1314	int g, i;
1315	struct proc *p;
1316	struct thread *td;
1317	SVCGROUP *grp;
1318
1319	p = curproc;
1320	td = curthread;
1321	snprintf(td->td_name, sizeof(td->td_name),
1322	    "%s: master", pool->sp_name);
1323	pool->sp_state = SVCPOOL_ACTIVE;
1324	pool->sp_proc = p;
1325
1326	/* Choose group count based on number of threads and CPUs. */
1327	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1328	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1329	for (g = 0; g < pool->sp_groupcount; g++) {
1330		grp = &pool->sp_groups[g];
1331		grp->sg_minthreads = max(1,
1332		    pool->sp_minthreads / pool->sp_groupcount);
1333		grp->sg_maxthreads = max(1,
1334		    pool->sp_maxthreads / pool->sp_groupcount);
1335		grp->sg_lastcreatetime = time_uptime;
1336	}
1337
1338	/* Starting threads */
1339	for (g = 0; g < pool->sp_groupcount; g++) {
1340		grp = &pool->sp_groups[g];
1341		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1342			svc_new_thread(grp);
1343	}
1344	pool->sp_groups[0].sg_threadcount++;
1345	svc_run_internal(&pool->sp_groups[0], TRUE);
1346
1347	/* Waiting for threads to stop. */
1348	for (g = 0; g < pool->sp_groupcount; g++) {
1349		grp = &pool->sp_groups[g];
1350		mtx_lock(&grp->sg_lock);
1351		while (grp->sg_threadcount > 0)
1352			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1353		mtx_unlock(&grp->sg_lock);
1354	}
1355}
1356
1357void
1358svc_exit(SVCPOOL *pool)
1359{
1360	SVCGROUP *grp;
1361	SVCTHREAD *st;
1362	int g;
1363
1364	pool->sp_state = SVCPOOL_CLOSING;
1365	for (g = 0; g < pool->sp_groupcount; g++) {
1366		grp = &pool->sp_groups[g];
1367		mtx_lock(&grp->sg_lock);
1368		if (grp->sg_state != SVCPOOL_CLOSING) {
1369			grp->sg_state = SVCPOOL_CLOSING;
1370			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1371				cv_signal(&st->st_cond);
1372		}
1373		mtx_unlock(&grp->sg_lock);
1374	}
1375}
1376
1377bool_t
1378svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1379{
1380	struct mbuf *m;
1381	XDR xdrs;
1382	bool_t stat;
1383
1384	m = rqstp->rq_args;
1385	rqstp->rq_args = NULL;
1386
1387	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1388	stat = xargs(&xdrs, args);
1389	XDR_DESTROY(&xdrs);
1390
1391	return (stat);
1392}
1393
1394bool_t
1395svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1396{
1397	XDR xdrs;
1398
1399	if (rqstp->rq_addr) {
1400		free(rqstp->rq_addr, M_SONAME);
1401		rqstp->rq_addr = NULL;
1402	}
1403
1404	xdrs.x_op = XDR_FREE;
1405	return (xargs(&xdrs, args));
1406}
1407
1408void
1409svc_freereq(struct svc_req *rqstp)
1410{
1411	SVCTHREAD *st;
1412	SVCPOOL *pool;
1413
1414	st = rqstp->rq_thread;
1415	if (st) {
1416		pool = st->st_pool;
1417		if (pool->sp_done)
1418			pool->sp_done(st, rqstp);
1419	}
1420
1421	if (rqstp->rq_auth.svc_ah_ops)
1422		SVCAUTH_RELEASE(&rqstp->rq_auth);
1423
1424	if (rqstp->rq_xprt) {
1425		SVC_RELEASE(rqstp->rq_xprt);
1426	}
1427
1428	if (rqstp->rq_addr)
1429		free(rqstp->rq_addr, M_SONAME);
1430
1431	if (rqstp->rq_args)
1432		m_freem(rqstp->rq_args);
1433
1434	free(rqstp, M_RPC);
1435}
1436