/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/jail.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/smp.h>
#include <sys/sx.h>
#include <sys/ucred.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCGROUP *grp);
static void xprt_unregister_locked(SVCXPRT *xprt);
static void svc_change_space_used(SVCPOOL *pool, long delta);
static bool_t svc_request_space_available(SVCPOOL *pool);
static void svcpool_cleanup(SVCPOOL *pool);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);

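/*
 * Create and initialize a new transport pool, optionally attaching
 * sysctl nodes under "sysctl_base" to control and monitor it.  The
 * pool starts in SVCPOOL_INIT state with a single thread group; the
 * final group count is chosen later in svc_run().
 */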
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}

/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}

	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}

void
svcpool_destroy(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_destroy(&grp->sg_lock);
	}
	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

/*
 * Similar to svcpool_destroy(), except that it does not destroy the actual
 * data structures.  As such, "pool" may be used again.
 */
void
svcpool_close(SVCPOOL *pool)
{
	SVCGROUP *grp;
	int g;

	svcpool_cleanup(pool);

	/* Now, initialize the pool's state for a fresh svc_run() call. */
	mtx_lock(&pool->sp_lock);
	pool->sp_state = SVCPOOL_INIT;
	mtx_unlock(&pool->sp_lock);
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		grp->sg_state = SVCPOOL_ACTIVE;
		mtx_unlock(&grp->sg_lock);
	}
}

/*
 * Sysctl handler to get the present thread count on a pool
 */
static int
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int threads, error, g;

	pool = oidp->oid_arg1;
	threads = 0;
	mtx_lock(&pool->sp_lock);
	for (g = 0; g < pool->sp_groupcount; g++)
		threads += pool->sp_groups[g].sg_threadcount;
	mtx_unlock(&pool->sp_lock);
	error = sysctl_handle_int(oidp, &threads, 0, req);
	return (error);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, g;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_minthreads = newminthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_minthreads = max(1,
			    pool->sp_minthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newmaxthreads, error, g;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		pool->sp_maxthreads = newmaxthreads;
		for (g = 0; g < pool->sp_groupcount; g++) {
			pool->sp_groups[g].sg_maxthreads = max(1,
			    pool->sp_maxthreads / pool->sp_groupcount);
		}
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCGROUP *grp;
	int g;

	SVC_ACQUIRE(xprt);
	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
	xprt->xp_group = grp = &pool->sp_groups[g];
	mtx_lock(&grp->sg_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
	mtx_unlock(&grp->sg_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - the caller must do that after dropping the
 * group lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

void
xprt_unregister(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&grp->sg_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&grp->sg_lock);

	if (xprt->xp_socket != NULL)
		soshutdown(xprt->xp_socket, SHUT_WR);
	SVC_RELEASE(xprt);
}

/*
 * Attempt to assign a service thread to this transport.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (grp->sg_state == SVCPOOL_ACTIVE
		    && grp->sg_lastcreatetime < time_uptime
		    && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}

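/*
 * Mark a transport as having work to do.  If no thread currently owns
 * the transport, try to hand it to an idle thread; if the pool is
 * throttled or no thread is available, queue it on the group's active
 * list until one becomes available.
 */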
void
xprt_active(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&grp->sg_lock);
		return;
	}

	if (!xprt->xp_active) {
		xprt->xp_active = TRUE;
		if (xprt->xp_thread == NULL) {
			if (!svc_request_space_available(xprt->xp_pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
				    xp_alink);
		}
	}

	mtx_unlock(&grp->sg_lock);
}

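/*
 * Mark a transport as having no pending work, removing it from the
 * group's active list if no thread owns it.  The caller must hold the
 * group lock.
 */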
void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	if (xprt->xp_active) {
		if (xprt->xp_thread == NULL)
			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

void
xprt_inactive(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&grp->sg_lock);
}

/*
 * Variant of xprt_inactive() for use only when the transport is known
 * to be assigned to a thread, for example from within receive handlers.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an rpc request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY pool->sp_lock: s, pool->sp_callouts */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* it is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when a transport in this pool dies.
 */
bool_t
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch)
			break;
	}
	if (s != NULL) {
		mtx_unlock(&pool->sp_lock);
		return (TRUE);
	}
	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s->slc_dispatch = dispatch;
	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
	mtx_unlock(&pool->sp_lock);
	return (TRUE);
}

/*
 * Remove a service connection loss program from the callout list.
 */
void
svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
{
	struct svc_loss_callout *s;

	mtx_lock(&pool->sp_lock);
	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
		if (s->slc_dispatch == dispatch) {
			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
			free(s, M_RPC);
			break;
		}
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

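/*
 * Common reply path: free any remaining request arguments, record the
 * reply in the replay cache (if one is configured), wrap the body via
 * the authentication layer and hand it to the transport's reply method.
 */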
static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an rpc request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}

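/*
 * Send a reply whose results have already been encoded into an mbuf
 * chain, avoiding a separate XDR encode pass.
 */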
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	/* The size argument is ignored, so 0 is ok. */
	mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Defer enabling DDP until the first non-NULLPROC RPC
		 * is received to allow STARTTLS authentication to
		 * enable TLS offload first.
		 */
		if (xprt->xp_doneddp == 0 && r->rq_proc != NULLPROC &&
		    xprt->xp_socket != NULL &&
		    atomic_cmpset_int(&xprt->xp_doneddp, 0, 1)) {
			if (xprt->xp_socket->so_proto->pr_protocol ==
			    IPPROTO_TCP) {
				int optval = 1;

				(void)so_setsockopt(xprt->xp_socket,
				    IPPROTO_TCP, TCP_USE_DDP, &optval,
				    sizeof(optval));
			}
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}

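/*
 * Dispatch a queued request to the matching service callout.  If no
 * registered program or version matches, generate the appropriate
 * PROG_UNAVAIL or PROG_MISMATCH error reply and free the request.
 */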
static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of rqstp to the
				 * dispatch method - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

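/*
 * Scan the group's transport list for connections whose idle timeout
 * has expired, unregister them and shut down their sockets.  Called
 * with the group lock held; the lock is dropped while the sockets are
 * shut down.
 */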
static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&grp->sg_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}

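/*
 * Hand queued active transports to idle threads until no idle thread
 * remains.  Called when request-space throttling is lifted.
 */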
static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt;
	int g;

	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
			if (xprt_assignthread(xprt))
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
			else
				break;
		}
		mtx_unlock(&grp->sg_lock);
	}
}

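/*
 * Track the amount of memory tied up in parsed but not yet executed
 * requests.  Crossing the high water mark enables throttling; dropping
 * below the low water mark disables it and re-assigns any transports
 * that were left waiting.
 */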
static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;

	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
		}
	}
}

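/*
 * Return TRUE if the pool may accept more request data, i.e. it is not
 * currently throttled on request space.
 */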
static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	if (pool->sp_space_throttled)
		return (FALSE);
	return (TRUE);
}

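/*
 * Main loop for a service thread.  Each thread waits for an active
 * transport, drains RPC requests from it, executes any requests queued
 * to this thread and then returns the transport to the group.  Extra
 * threads are created on demand up to sg_maxthreads and exit again
 * after sitting idle, subject to sg_minthreads.
 */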
static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;

	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&grp->sg_lock);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (grp->sg_state == SVCPOOL_THREADSTARTING)
		grp->sg_state = SVCPOOL_ACTIVE;

	while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (grp->sg_state == SVCPOOL_THREADWANTED) {
			grp->sg_state = SVCPOOL_THREADSTARTING;
			grp->sg_lastcreatetime = time_uptime;
			mtx_unlock(&grp->sg_lock);
			svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}

		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster && grp->sg_threadcount >
			    grp->sg_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    grp->sg_threadcount > grp->sg_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
			if (st->st_xprt == NULL)
				LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
					&& !st->st_xprt)
					break;
			} else if (error != 0) {
				KASSERT(error == EINTR || error == ERESTART,
				    ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
		mtx_unlock(&grp->sg_lock);

		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
				if (pool->sp_assign) {
					stpref = pool->sp_assign(st, rqstp);
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
		} while (rqstp == NULL && stat == XPRT_MOREREQS
		    && grp->sg_state != SVCPOOL_CLOSING);

		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);

		/*
		 * Execute what we have queued.
		 */
		mtx_lock(&st->st_lock);
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			mtx_unlock(&st->st_lock);
			sz = (long)rqstp->rq_size;
			svc_executereq(rqstp);
			svc_change_space_used(pool, -sz);
			mtx_lock(&st->st_lock);
		}
		mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	mtx_destroy(&st->st_lock);
	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}

static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCGROUP *) arg, FALSE);
	kthread_exit();
}

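/*
 * Spawn an additional service thread for the group, accounting for it
 * in sg_threadcount before the kthread starts running.
 */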
static void
svc_new_thread(SVCGROUP *grp)
{
	SVCPOOL *pool = grp->sg_pool;
	struct thread *td;

	mtx_lock(&grp->sg_lock);
	grp->sg_threadcount++;
	mtx_unlock(&grp->sg_lock);
	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

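/*
 * Run the RPC service.  The calling thread becomes the master service
 * thread: it sizes the thread groups from the configured thread limits
 * and the CPU count, starts the minimum number of worker threads and
 * then services requests itself until svc_exit() is called.
 */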
void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;

	/* Choose group count based on number of threads and CPUs. */
	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		grp->sg_minthreads = max(1,
		    pool->sp_minthreads / pool->sp_groupcount);
		grp->sg_maxthreads = max(1,
		    pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}

	/* Starting threads */
	pool->sp_groups[0].sg_threadcount++;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
	svc_run_internal(&pool->sp_groups[0], TRUE);

	/* Waiting for threads to stop. */
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while (grp->sg_threadcount > 0)
			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}

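/*
 * Request that svc_run() return.  Each group is marked as closing and
 * any idle threads are woken so they can notice the state change.
 */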
void
svc_exit(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCTHREAD *st;
	int g;

	pool->sp_state = SVCPOOL_CLOSING;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		if (grp->sg_state != SVCPOOL_CLOSING) {
			grp->sg_state = SVCPOOL_CLOSING;
			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		mtx_unlock(&grp->sg_lock);
	}
}

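/*
 * Decode the request's argument mbuf chain using the supplied XDR
 * routine.  The request's reference to the arguments is consumed here.
 */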
bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

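/*
 * Release all resources associated with a request: notify the pool's
 * "done" callback, drop the authentication handle, the transport
 * reference, any cached caller address and any remaining argument
 * mbufs.
 */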
void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	if (st) {
		pool = st->st_pool;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}