1/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2
3/*-
4 * Copyright (c) 2009, Sun Microsystems, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 *   this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 *   this list of conditions and the following disclaimer in the documentation
13 *   and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 *   contributors may be used to endorse or promote products derived
16 *   from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31#if defined(LIBC_SCCS) && !defined(lint)
32static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
34#endif
35#include <sys/cdefs.h>
36__FBSDID("$FreeBSD: stable/10/sys/rpc/svc.c 336928 2018-07-30 19:29:31Z rmacklem $");
37
38/*
39 * svc.c, Server-side remote procedure call interface.
40 *
41 * There are two sets of procedures here.  The xprt routines are
42 * for handling transport handles.  The svc routines handle the
43 * list of service routines.
44 *
45 * Copyright (C) 1984, Sun Microsystems, Inc.
46 */
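
/*
 * As a rough orientation, a typical in-kernel service built on these
 * routines creates a pool, attaches one or more transports, registers a
 * dispatch function for its program/version pair and then runs the pool.
 * The sketch below is illustrative only; MYPROG, MYVERS, mydispatch(),
 * "so" (a listen socket) and "nconf" are hypothetical, not part of this
 * file's API:
 *
 *	SVCPOOL *pool = svcpool_create("myservice", NULL);
 *	SVCXPRT *xprt = svc_vc_create(pool, so, 0, 0);
 *	(void) svc_reg(xprt, MYPROG, MYVERS, mydispatch, nconf);
 *	svc_run(pool);		(blocks until svc_exit(pool) is called)
 *	svcpool_destroy(pool);
 */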
47
48#include <sys/param.h>
49#include <sys/lock.h>
50#include <sys/kernel.h>
51#include <sys/kthread.h>
52#include <sys/malloc.h>
53#include <sys/mbuf.h>
54#include <sys/mutex.h>
55#include <sys/proc.h>
56#include <sys/queue.h>
57#include <sys/socketvar.h>
58#include <sys/systm.h>
59#include <sys/smp.h>
60#include <sys/sx.h>
61#include <sys/ucred.h>
62
63#include <rpc/rpc.h>
64#include <rpc/rpcb_clnt.h>
65#include <rpc/replay.h>
66
67#include <rpc/rpc_com.h>
68
69#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
70#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73    char *);
74static void svc_new_thread(SVCGROUP *grp);
75static void xprt_unregister_locked(SVCXPRT *xprt);
76static void svc_change_space_used(SVCPOOL *pool, long delta);
77static bool_t svc_request_space_available(SVCPOOL *pool);
78static void svcpool_cleanup(SVCPOOL *pool);
79
80/* ***************  SVCXPRT related stuff **************** */
81
82static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85
86SVCPOOL*
87svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
88{
89	SVCPOOL *pool;
90	SVCGROUP *grp;
91	int g;
92
93	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94
95	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
96	pool->sp_name = name;
97	pool->sp_state = SVCPOOL_INIT;
98	pool->sp_proc = NULL;
99	TAILQ_INIT(&pool->sp_callouts);
100	TAILQ_INIT(&pool->sp_lcallouts);
101	pool->sp_minthreads = 1;
102	pool->sp_maxthreads = 1;
103	pool->sp_groupcount = 1;
104	for (g = 0; g < SVC_MAXGROUPS; g++) {
105		grp = &pool->sp_groups[g];
106		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
107		grp->sg_pool = pool;
108		grp->sg_state = SVCPOOL_ACTIVE;
109		TAILQ_INIT(&grp->sg_xlist);
110		TAILQ_INIT(&grp->sg_active);
111		LIST_INIT(&grp->sg_idlethreads);
112		grp->sg_minthreads = 1;
113		grp->sg_maxthreads = 1;
114	}
115
116	/*
117	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
118	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119	 * on LP64 architectures, so cast to u_long to avoid undefined
120	 * behavior.  (ILP32 architectures cannot have nmbclusters
121	 * large enough to overflow for other reasons.)
122	 */
123	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
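	/*
	 * For example, with a hypothetical nmbclusters of 262144 and the
	 * usual MCLBYTES of 2048, sp_space_high works out to 128MB and
	 * sp_space_low to roughly 85MB (two thirds of the high-water mark).
	 */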
125
126	sysctl_ctx_init(&pool->sp_sysctl);
127	if (sysctl_base) {
128		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
130		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimum number of threads");
132		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
134		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximum number of threads");
136		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137		    "threads", CTLTYPE_INT | CTLFLAG_RD,
138		    pool, 0, svcpool_threads_sysctl, "I",
139		    "Current number of threads");
140		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142		    "Number of thread groups");
143
144		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145		    "request_space_used", CTLFLAG_RD,
146		    &pool->sp_space_used,
147		    "Space in parsed but not handled requests.");
148
149		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150		    "request_space_used_highest", CTLFLAG_RD,
151		    &pool->sp_space_used_highest,
152		    "Highest space used since reboot.");
153
154		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155		    "request_space_high", CTLFLAG_RW,
156		    &pool->sp_space_high,
157		    "Maximum space in parsed but not handled requests.");
158
159		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160		    "request_space_low", CTLFLAG_RW,
161		    &pool->sp_space_low,
162		    "Low water mark for request space.");
163
164		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165		    "request_space_throttled", CTLFLAG_RD,
166		    &pool->sp_space_throttled, 0,
		    "Whether NFS requests are currently throttled");
168
169		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170		    "request_space_throttle_count", CTLFLAG_RD,
171		    &pool->sp_space_throttle_count, 0,
172		    "Count of times throttling based on request space has occurred");
173	}
174
175	return pool;
176}
177
178/*
179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
180 * the pool data structures.
181 */
182static void
183svcpool_cleanup(SVCPOOL *pool)
184{
185	SVCGROUP *grp;
186	SVCXPRT *xprt, *nxprt;
187	struct svc_callout *s;
188	struct svc_loss_callout *sl;
189	struct svcxprt_list cleanup;
190	int g;
191
192	TAILQ_INIT(&cleanup);
193
194	for (g = 0; g < SVC_MAXGROUPS; g++) {
195		grp = &pool->sp_groups[g];
196		mtx_lock(&grp->sg_lock);
197		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198			xprt_unregister_locked(xprt);
199			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
200		}
201		mtx_unlock(&grp->sg_lock);
202	}
203	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
204		SVC_RELEASE(xprt);
205	}
206
207	mtx_lock(&pool->sp_lock);
208	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
209		mtx_unlock(&pool->sp_lock);
210		svc_unreg(pool, s->sc_prog, s->sc_vers);
211		mtx_lock(&pool->sp_lock);
212	}
213	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
214		mtx_unlock(&pool->sp_lock);
215		svc_loss_unreg(pool, sl->slc_dispatch);
216		mtx_lock(&pool->sp_lock);
217	}
218	mtx_unlock(&pool->sp_lock);
219}
220
221void
222svcpool_destroy(SVCPOOL *pool)
223{
224	SVCGROUP *grp;
225	int g;
226
227	svcpool_cleanup(pool);
228
229	for (g = 0; g < SVC_MAXGROUPS; g++) {
230		grp = &pool->sp_groups[g];
231		mtx_destroy(&grp->sg_lock);
232	}
233	mtx_destroy(&pool->sp_lock);
234
235	if (pool->sp_rcache)
236		replay_freecache(pool->sp_rcache);
237
238	sysctl_ctx_free(&pool->sp_sysctl);
239	free(pool, M_RPC);
240}
241
242/*
243 * Similar to svcpool_destroy(), except that it does not destroy the actual
244 * data structures.  As such, "pool" may be used again.
245 */
246void
247svcpool_close(SVCPOOL *pool)
248{
249	SVCGROUP *grp;
250	int g;
251
252	svcpool_cleanup(pool);
253
254	/* Now, initialize the pool's state for a fresh svc_run() call. */
255	mtx_lock(&pool->sp_lock);
256	pool->sp_state = SVCPOOL_INIT;
257	mtx_unlock(&pool->sp_lock);
258	for (g = 0; g < SVC_MAXGROUPS; g++) {
259		grp = &pool->sp_groups[g];
260		mtx_lock(&grp->sg_lock);
261		grp->sg_state = SVCPOOL_ACTIVE;
262		mtx_unlock(&grp->sg_lock);
263	}
264}
265
266/*
267 * Sysctl handler to get the present thread count on a pool
268 */
269static int
270svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
271{
272	SVCPOOL *pool;
273	int threads, error, g;
274
275	pool = oidp->oid_arg1;
276	threads = 0;
277	mtx_lock(&pool->sp_lock);
278	for (g = 0; g < pool->sp_groupcount; g++)
279		threads += pool->sp_groups[g].sg_threadcount;
280	mtx_unlock(&pool->sp_lock);
281	error = sysctl_handle_int(oidp, &threads, 0, req);
282	return (error);
283}
284
285/*
286 * Sysctl handler to set the minimum thread count on a pool
287 */
288static int
289svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
290{
291	SVCPOOL *pool;
292	int newminthreads, error, g;
293
294	pool = oidp->oid_arg1;
295	newminthreads = pool->sp_minthreads;
296	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
297	if (error == 0 && newminthreads != pool->sp_minthreads) {
298		if (newminthreads > pool->sp_maxthreads)
299			return (EINVAL);
300		mtx_lock(&pool->sp_lock);
301		pool->sp_minthreads = newminthreads;
302		for (g = 0; g < pool->sp_groupcount; g++) {
303			pool->sp_groups[g].sg_minthreads = max(1,
304			    pool->sp_minthreads / pool->sp_groupcount);
305		}
306		mtx_unlock(&pool->sp_lock);
307	}
308	return (error);
309}
310
311/*
312 * Sysctl handler to set the maximum thread count on a pool
313 */
314static int
315svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
316{
317	SVCPOOL *pool;
318	int newmaxthreads, error, g;
319
320	pool = oidp->oid_arg1;
321	newmaxthreads = pool->sp_maxthreads;
322	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
323	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
324		if (newmaxthreads < pool->sp_minthreads)
325			return (EINVAL);
326		mtx_lock(&pool->sp_lock);
327		pool->sp_maxthreads = newmaxthreads;
328		for (g = 0; g < pool->sp_groupcount; g++) {
329			pool->sp_groups[g].sg_maxthreads = max(1,
330			    pool->sp_maxthreads / pool->sp_groupcount);
331		}
332		mtx_unlock(&pool->sp_lock);
333	}
334	return (error);
335}
336
337/*
338 * Activate a transport handle.
339 */
340void
341xprt_register(SVCXPRT *xprt)
342{
343	SVCPOOL *pool = xprt->xp_pool;
344	SVCGROUP *grp;
345	int g;
346
347	SVC_ACQUIRE(xprt);
348	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
349	xprt->xp_group = grp = &pool->sp_groups[g];
350	mtx_lock(&grp->sg_lock);
351	xprt->xp_registered = TRUE;
352	xprt->xp_active = FALSE;
353	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
354	mtx_unlock(&grp->sg_lock);
355}
356
/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - the caller must do that after dropping the
 * group lock.
 */
362static void
363xprt_unregister_locked(SVCXPRT *xprt)
364{
365	SVCGROUP *grp = xprt->xp_group;
366
367	mtx_assert(&grp->sg_lock, MA_OWNED);
368	KASSERT(xprt->xp_registered == TRUE,
369	    ("xprt_unregister_locked: not registered"));
370	xprt_inactive_locked(xprt);
371	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
372	xprt->xp_registered = FALSE;
373}
374
375void
376xprt_unregister(SVCXPRT *xprt)
377{
378	SVCGROUP *grp = xprt->xp_group;
379
380	mtx_lock(&grp->sg_lock);
381	if (xprt->xp_registered == FALSE) {
382		/* Already unregistered by another thread */
383		mtx_unlock(&grp->sg_lock);
384		return;
385	}
386	xprt_unregister_locked(xprt);
387	mtx_unlock(&grp->sg_lock);
388
389	SVC_RELEASE(xprt);
390}
391
392/*
393 * Attempt to assign a service thread to this transport.
394 */
395static int
396xprt_assignthread(SVCXPRT *xprt)
397{
398	SVCGROUP *grp = xprt->xp_group;
399	SVCTHREAD *st;
400
401	mtx_assert(&grp->sg_lock, MA_OWNED);
402	st = LIST_FIRST(&grp->sg_idlethreads);
403	if (st) {
404		LIST_REMOVE(st, st_ilink);
405		SVC_ACQUIRE(xprt);
406		xprt->xp_thread = st;
407		st->st_xprt = xprt;
408		cv_signal(&st->st_cond);
409		return (TRUE);
410	} else {
411		/*
412		 * See if we can create a new thread. The
413		 * actual thread creation happens in
414		 * svc_run_internal because our locking state
415		 * is poorly defined (we are typically called
416		 * from a socket upcall). Don't create more
417		 * than one thread per second.
418		 */
419		if (grp->sg_state == SVCPOOL_ACTIVE
420		    && grp->sg_lastcreatetime < time_uptime
421		    && grp->sg_threadcount < grp->sg_maxthreads) {
422			grp->sg_state = SVCPOOL_THREADWANTED;
423		}
424	}
425	return (FALSE);
426}
427
428void
429xprt_active(SVCXPRT *xprt)
430{
431	SVCGROUP *grp = xprt->xp_group;
432
433	mtx_lock(&grp->sg_lock);
434
435	if (!xprt->xp_registered) {
436		/*
437		 * Race with xprt_unregister - we lose.
438		 */
439		mtx_unlock(&grp->sg_lock);
440		return;
441	}
442
443	if (!xprt->xp_active) {
444		xprt->xp_active = TRUE;
445		if (xprt->xp_thread == NULL) {
446			if (!svc_request_space_available(xprt->xp_pool) ||
447			    !xprt_assignthread(xprt))
448				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
449				    xp_alink);
450		}
451	}
452
453	mtx_unlock(&grp->sg_lock);
454}
455
456void
457xprt_inactive_locked(SVCXPRT *xprt)
458{
459	SVCGROUP *grp = xprt->xp_group;
460
461	mtx_assert(&grp->sg_lock, MA_OWNED);
462	if (xprt->xp_active) {
463		if (xprt->xp_thread == NULL)
464			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
465		xprt->xp_active = FALSE;
466	}
467}
468
469void
470xprt_inactive(SVCXPRT *xprt)
471{
472	SVCGROUP *grp = xprt->xp_group;
473
474	mtx_lock(&grp->sg_lock);
475	xprt_inactive_locked(xprt);
476	mtx_unlock(&grp->sg_lock);
477}
478
/*
 * Variant of xprt_inactive() for use only when we are sure that the port
 * is assigned to a thread, for example, within receive handlers.
 */
483void
484xprt_inactive_self(SVCXPRT *xprt)
485{
486
487	KASSERT(xprt->xp_thread != NULL,
488	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
489	xprt->xp_active = FALSE;
490}
491
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
497bool_t
498svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
499    void (*dispatch)(struct svc_req *, SVCXPRT *),
500    const struct netconfig *nconf)
501{
502	SVCPOOL *pool = xprt->xp_pool;
503	struct svc_callout *s;
504	char *netid = NULL;
505	int flag = 0;
506
/* VARIABLES PROTECTED BY pool->sp_lock: s, pool->sp_callouts */
508
509	if (xprt->xp_netid) {
510		netid = strdup(xprt->xp_netid, M_RPC);
511		flag = 1;
512	} else if (nconf && nconf->nc_netid) {
513		netid = strdup(nconf->nc_netid, M_RPC);
514		flag = 1;
515	} /* must have been created with svc_raw_create */
516	if ((netid == NULL) && (flag == 1)) {
517		return (FALSE);
518	}
519
520	mtx_lock(&pool->sp_lock);
521	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
522		if (netid)
523			free(netid, M_RPC);
524		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
526		mtx_unlock(&pool->sp_lock);
527		return (FALSE);
528	}
529	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
530	if (s == NULL) {
531		if (netid)
532			free(netid, M_RPC);
533		mtx_unlock(&pool->sp_lock);
534		return (FALSE);
535	}
536
537	s->sc_prog = prog;
538	s->sc_vers = vers;
539	s->sc_dispatch = dispatch;
540	s->sc_netid = netid;
541	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
542
543	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		xprt->xp_netid = strdup(netid, M_RPC);
545
546rpcb_it:
547	mtx_unlock(&pool->sp_lock);
548	/* now register the information with the local binder service */
549	if (nconf) {
550		bool_t dummy;
551		struct netconfig tnc;
552		struct netbuf nb;
553		tnc = *nconf;
554		nb.buf = &xprt->xp_ltaddr;
555		nb.len = xprt->xp_ltaddr.ss_len;
556		dummy = rpcb_set(prog, vers, &tnc, &nb);
557		return (dummy);
558	}
559	return (TRUE);
560}
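
/*
 * A dispatch routine registered above takes ownership of the request and
 * must eventually call svc_freereq().  A minimal, purely illustrative
 * handler - the my_args/my_res types, their XDR routines and my_handler()
 * are hypothetical - could look like:
 *
 *	static void
 *	mydispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		my_args args;
 *		my_res res;
 *
 *		bzero(&args, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t) xdr_my_args, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		my_handler(&args, &res);
 *		if (!svc_sendreply(rqstp, (xdrproc_t) xdr_my_res, &res))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t) xdr_my_args, &args);
 *		svc_freereq(rqstp);
 *	}
 */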
561
562/*
563 * Remove a service program from the callout list.
564 */
565void
566svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
567{
568	struct svc_callout *s;
569
570	/* unregister the information anyway */
571	(void) rpcb_unset(prog, vers, NULL);
572	mtx_lock(&pool->sp_lock);
573	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
574		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
575		if (s->sc_netid)
			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
577		mem_free(s, sizeof (struct svc_callout));
578	}
579	mtx_unlock(&pool->sp_lock);
580}
581
/*
 * Add a service connection loss program to the callout list.
 * The dispatch routine will be called when some port in this pool dies.
 */
586bool_t
587svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
588{
589	SVCPOOL *pool = xprt->xp_pool;
590	struct svc_loss_callout *s;
591
592	mtx_lock(&pool->sp_lock);
593	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
594		if (s->slc_dispatch == dispatch)
595			break;
596	}
597	if (s != NULL) {
598		mtx_unlock(&pool->sp_lock);
599		return (TRUE);
600	}
601	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
602	if (s == NULL) {
603		mtx_unlock(&pool->sp_lock);
604		return (FALSE);
605	}
606	s->slc_dispatch = dispatch;
607	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
608	mtx_unlock(&pool->sp_lock);
609	return (TRUE);
610}
611
612/*
613 * Remove a service connection loss program from the callout list.
614 */
615void
616svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
617{
618	struct svc_loss_callout *s;
619
620	mtx_lock(&pool->sp_lock);
621	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
622		if (s->slc_dispatch == dispatch) {
623			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
624			free(s, M_RPC);
625			break;
626		}
627	}
628	mtx_unlock(&pool->sp_lock);
629}
630
631/* ********************** CALLOUT list related stuff ************* */
632
633/*
634 * Search the callout list for a program number, return the callout
635 * struct.
636 */
637static struct svc_callout *
638svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
639{
640	struct svc_callout *s;
641
642	mtx_assert(&pool->sp_lock, MA_OWNED);
643	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
644		if (s->sc_prog == prog && s->sc_vers == vers
645		    && (netid == NULL || s->sc_netid == NULL ||
646			strcmp(netid, s->sc_netid) == 0))
647			break;
648	}
649
650	return (s);
651}
652
653/* ******************* REPLY GENERATION ROUTINES  ************ */
654
655static bool_t
656svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
657    struct mbuf *body)
658{
659	SVCXPRT *xprt = rqstp->rq_xprt;
660	bool_t ok;
661
662	if (rqstp->rq_args) {
663		m_freem(rqstp->rq_args);
664		rqstp->rq_args = NULL;
665	}
666
667	if (xprt->xp_pool->sp_rcache)
668		replay_setreply(xprt->xp_pool->sp_rcache,
669		    rply, svc_getrpccaller(rqstp), body);
670
671	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
672		return (FALSE);
673
674	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
675	if (rqstp->rq_addr) {
676		free(rqstp->rq_addr, M_SONAME);
677		rqstp->rq_addr = NULL;
678	}
679
680	return (ok);
681}
682
683/*
684 * Send a reply to an rpc request
685 */
686bool_t
687svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
688{
689	struct rpc_msg rply;
690	struct mbuf *m;
691	XDR xdrs;
692	bool_t ok;
693
694	rply.rm_xid = rqstp->rq_xid;
695	rply.rm_direction = REPLY;
696	rply.rm_reply.rp_stat = MSG_ACCEPTED;
697	rply.acpted_rply.ar_verf = rqstp->rq_verf;
698	rply.acpted_rply.ar_stat = SUCCESS;
699	rply.acpted_rply.ar_results.where = NULL;
700	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
701
702	m = m_getcl(M_WAITOK, MT_DATA, 0);
703	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
704	ok = xdr_results(&xdrs, xdr_location);
705	XDR_DESTROY(&xdrs);
706
707	if (ok) {
708		return (svc_sendreply_common(rqstp, &rply, m));
709	} else {
710		m_freem(m);
711		return (FALSE);
712	}
713}
714
715bool_t
716svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
717{
718	struct rpc_msg rply;
719
720	rply.rm_xid = rqstp->rq_xid;
721	rply.rm_direction = REPLY;
722	rply.rm_reply.rp_stat = MSG_ACCEPTED;
723	rply.acpted_rply.ar_verf = rqstp->rq_verf;
724	rply.acpted_rply.ar_stat = SUCCESS;
725	rply.acpted_rply.ar_results.where = NULL;
726	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
727
728	return (svc_sendreply_common(rqstp, &rply, m));
729}
730
731/*
732 * No procedure error reply
733 */
734void
735svcerr_noproc(struct svc_req *rqstp)
736{
737	SVCXPRT *xprt = rqstp->rq_xprt;
738	struct rpc_msg rply;
739
740	rply.rm_xid = rqstp->rq_xid;
741	rply.rm_direction = REPLY;
742	rply.rm_reply.rp_stat = MSG_ACCEPTED;
743	rply.acpted_rply.ar_verf = rqstp->rq_verf;
744	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
745
746	if (xprt->xp_pool->sp_rcache)
747		replay_setreply(xprt->xp_pool->sp_rcache,
748		    &rply, svc_getrpccaller(rqstp), NULL);
749
750	svc_sendreply_common(rqstp, &rply, NULL);
751}
752
753/*
754 * Can't decode args error reply
755 */
756void
757svcerr_decode(struct svc_req *rqstp)
758{
759	SVCXPRT *xprt = rqstp->rq_xprt;
760	struct rpc_msg rply;
761
762	rply.rm_xid = rqstp->rq_xid;
763	rply.rm_direction = REPLY;
764	rply.rm_reply.rp_stat = MSG_ACCEPTED;
765	rply.acpted_rply.ar_verf = rqstp->rq_verf;
766	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
767
768	if (xprt->xp_pool->sp_rcache)
769		replay_setreply(xprt->xp_pool->sp_rcache,
770		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
771
772	svc_sendreply_common(rqstp, &rply, NULL);
773}
774
775/*
776 * Some system error
777 */
778void
779svcerr_systemerr(struct svc_req *rqstp)
780{
781	SVCXPRT *xprt = rqstp->rq_xprt;
782	struct rpc_msg rply;
783
784	rply.rm_xid = rqstp->rq_xid;
785	rply.rm_direction = REPLY;
786	rply.rm_reply.rp_stat = MSG_ACCEPTED;
787	rply.acpted_rply.ar_verf = rqstp->rq_verf;
788	rply.acpted_rply.ar_stat = SYSTEM_ERR;
789
790	if (xprt->xp_pool->sp_rcache)
791		replay_setreply(xprt->xp_pool->sp_rcache,
792		    &rply, svc_getrpccaller(rqstp), NULL);
793
794	svc_sendreply_common(rqstp, &rply, NULL);
795}
796
797/*
798 * Authentication error reply
799 */
800void
801svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
802{
803	SVCXPRT *xprt = rqstp->rq_xprt;
804	struct rpc_msg rply;
805
806	rply.rm_xid = rqstp->rq_xid;
807	rply.rm_direction = REPLY;
808	rply.rm_reply.rp_stat = MSG_DENIED;
809	rply.rjcted_rply.rj_stat = AUTH_ERROR;
810	rply.rjcted_rply.rj_why = why;
811
812	if (xprt->xp_pool->sp_rcache)
813		replay_setreply(xprt->xp_pool->sp_rcache,
814		    &rply, svc_getrpccaller(rqstp), NULL);
815
816	svc_sendreply_common(rqstp, &rply, NULL);
817}
818
819/*
820 * Auth too weak error reply
821 */
822void
823svcerr_weakauth(struct svc_req *rqstp)
824{
825
826	svcerr_auth(rqstp, AUTH_TOOWEAK);
827}
828
829/*
830 * Program unavailable error reply
831 */
832void
833svcerr_noprog(struct svc_req *rqstp)
834{
835	SVCXPRT *xprt = rqstp->rq_xprt;
836	struct rpc_msg rply;
837
838	rply.rm_xid = rqstp->rq_xid;
839	rply.rm_direction = REPLY;
840	rply.rm_reply.rp_stat = MSG_ACCEPTED;
841	rply.acpted_rply.ar_verf = rqstp->rq_verf;
842	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
843
844	if (xprt->xp_pool->sp_rcache)
845		replay_setreply(xprt->xp_pool->sp_rcache,
846		    &rply, svc_getrpccaller(rqstp), NULL);
847
848	svc_sendreply_common(rqstp, &rply, NULL);
849}
850
851/*
852 * Program version mismatch error reply
853 */
854void
855svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
856{
857	SVCXPRT *xprt = rqstp->rq_xprt;
858	struct rpc_msg rply;
859
860	rply.rm_xid = rqstp->rq_xid;
861	rply.rm_direction = REPLY;
862	rply.rm_reply.rp_stat = MSG_ACCEPTED;
863	rply.acpted_rply.ar_verf = rqstp->rq_verf;
864	rply.acpted_rply.ar_stat = PROG_MISMATCH;
865	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
866	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
867
868	if (xprt->xp_pool->sp_rcache)
869		replay_setreply(xprt->xp_pool->sp_rcache,
870		    &rply, svc_getrpccaller(rqstp), NULL);
871
872	svc_sendreply_common(rqstp, &rply, NULL);
873}
874
875/*
876 * Allocate a new server transport structure. All fields are
877 * initialized to zero and xp_p3 is initialized to point at an
878 * extension structure to hold various flags and authentication
879 * parameters.
880 */
881SVCXPRT *
882svc_xprt_alloc(void)
883{
884	SVCXPRT *xprt;
885	SVCXPRT_EXT *ext;
886
887	xprt = mem_alloc(sizeof(SVCXPRT));
888	ext = mem_alloc(sizeof(SVCXPRT_EXT));
889	xprt->xp_p3 = ext;
890	refcount_init(&xprt->xp_refs, 1);
891
892	return (xprt);
893}
894
895/*
896 * Free a server transport structure.
897 */
898void
899svc_xprt_free(SVCXPRT *xprt)
900{
901
902	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
903	mem_free(xprt, sizeof(SVCXPRT));
904}
905
906/* ******************* SERVER INPUT STUFF ******************* */
907
908/*
909 * Read RPC requests from a transport and queue them to be
910 * executed. We handle authentication and replay cache replies here.
911 * Actually dispatching the RPC is deferred till svc_executereq.
912 */
913static enum xprt_stat
914svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
915{
916	SVCPOOL *pool = xprt->xp_pool;
917	struct svc_req *r;
918	struct rpc_msg msg;
919	struct mbuf *args;
920	struct svc_loss_callout *s;
921	enum xprt_stat stat;
922
	/* now receive msgs from xprt (support batch calls) */
924	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
925
926	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
927	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
928	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
929	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
930		enum auth_stat why;
931
932		/*
933		 * Handle replays and authenticate before queuing the
934		 * request to be executed.
935		 */
936		SVC_ACQUIRE(xprt);
937		r->rq_xprt = xprt;
938		if (pool->sp_rcache) {
939			struct rpc_msg repmsg;
940			struct mbuf *repbody;
941			enum replay_state rs;
942			rs = replay_find(pool->sp_rcache, &msg,
943			    svc_getrpccaller(r), &repmsg, &repbody);
944			switch (rs) {
945			case RS_NEW:
946				break;
947			case RS_DONE:
948				SVC_REPLY(xprt, &repmsg, r->rq_addr,
949				    repbody, &r->rq_reply_seq);
950				if (r->rq_addr) {
951					free(r->rq_addr, M_SONAME);
952					r->rq_addr = NULL;
953				}
954				m_freem(args);
955				goto call_done;
956
957			default:
958				m_freem(args);
959				goto call_done;
960			}
961		}
962
963		r->rq_xid = msg.rm_xid;
964		r->rq_prog = msg.rm_call.cb_prog;
965		r->rq_vers = msg.rm_call.cb_vers;
966		r->rq_proc = msg.rm_call.cb_proc;
967		r->rq_size = sizeof(*r) + m_length(args, NULL);
968		r->rq_args = args;
969		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
970			/*
971			 * RPCSEC_GSS uses this return code
972			 * for requests that form part of its
973			 * context establishment protocol and
974			 * should not be dispatched to the
975			 * application.
976			 */
977			if (why != RPCSEC_GSS_NODISPATCH)
978				svcerr_auth(r, why);
979			goto call_done;
980		}
981
982		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
983			svcerr_decode(r);
984			goto call_done;
985		}
986
987		/*
988		 * Everything checks out, return request to caller.
989		 */
990		*rqstp_ret = r;
991		r = NULL;
992	}
993call_done:
994	if (r) {
995		svc_freereq(r);
996		r = NULL;
997	}
998	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
999		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1000			(*s->slc_dispatch)(xprt);
1001		xprt_unregister(xprt);
1002	}
1003
1004	return (stat);
1005}
1006
1007static void
1008svc_executereq(struct svc_req *rqstp)
1009{
1010	SVCXPRT *xprt = rqstp->rq_xprt;
1011	SVCPOOL *pool = xprt->xp_pool;
1012	int prog_found;
1013	rpcvers_t low_vers;
1014	rpcvers_t high_vers;
1015	struct svc_callout *s;
1016
	/* now match message with a registered service */
1018	prog_found = FALSE;
1019	low_vers = (rpcvers_t) -1L;
1020	high_vers = (rpcvers_t) 0L;
1021	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1022		if (s->sc_prog == rqstp->rq_prog) {
1023			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of the request to the
				 * dispatch method - it must call
				 * svc_freereq().
				 */
1029				(*s->sc_dispatch)(rqstp, xprt);
1030				return;
1031			}  /* found correct version */
1032			prog_found = TRUE;
1033			if (s->sc_vers < low_vers)
1034				low_vers = s->sc_vers;
1035			if (s->sc_vers > high_vers)
1036				high_vers = s->sc_vers;
1037		}   /* found correct program */
1038	}
1039
1040	/*
1041	 * if we got here, the program or version
1042	 * is not served ...
1043	 */
1044	if (prog_found)
1045		svcerr_progvers(rqstp, low_vers, high_vers);
1046	else
1047		svcerr_noprog(rqstp);
1048
1049	svc_freereq(rqstp);
1050}
1051
1052static void
1053svc_checkidle(SVCGROUP *grp)
1054{
1055	SVCXPRT *xprt, *nxprt;
1056	time_t timo;
1057	struct svcxprt_list cleanup;
1058
1059	TAILQ_INIT(&cleanup);
1060	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1061		/*
1062		 * Only some transports have idle timers. Don't time
1063		 * something out which is just waking up.
1064		 */
1065		if (!xprt->xp_idletimeout || xprt->xp_thread)
1066			continue;
1067
1068		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1069		if (time_uptime > timo) {
1070			xprt_unregister_locked(xprt);
1071			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1072		}
1073	}
1074
1075	mtx_unlock(&grp->sg_lock);
1076	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1077		SVC_RELEASE(xprt);
1078	}
1079	mtx_lock(&grp->sg_lock);
1080}
1081
1082static void
1083svc_assign_waiting_sockets(SVCPOOL *pool)
1084{
1085	SVCGROUP *grp;
1086	SVCXPRT *xprt;
1087	int g;
1088
1089	for (g = 0; g < pool->sp_groupcount; g++) {
1090		grp = &pool->sp_groups[g];
1091		mtx_lock(&grp->sg_lock);
1092		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1093			if (xprt_assignthread(xprt))
1094				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1095			else
1096				break;
1097		}
1098		mtx_unlock(&grp->sg_lock);
1099	}
1100}
1101
1102static void
1103svc_change_space_used(SVCPOOL *pool, long delta)
1104{
1105	unsigned long value;
1106
1107	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1108	if (delta > 0) {
1109		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1110			pool->sp_space_throttled = TRUE;
1111			pool->sp_space_throttle_count++;
1112		}
1113		if (value > pool->sp_space_used_highest)
1114			pool->sp_space_used_highest = value;
1115	} else {
1116		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1117			pool->sp_space_throttled = FALSE;
1118			svc_assign_waiting_sockets(pool);
1119		}
1120	}
1121}
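
/*
 * To illustrate the hysteresis with the hypothetical limits mentioned in
 * svcpool_create(): once sp_space_used climbs past sp_space_high (128MB)
 * the pool is throttled and newly active transports stay queued on
 * sg_active instead of being assigned threads; only after usage falls
 * back below sp_space_low (~85MB) is the pool unthrottled and the
 * waiting transports handed out again via svc_assign_waiting_sockets().
 */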
1122
1123static bool_t
1124svc_request_space_available(SVCPOOL *pool)
1125{
1126
1127	if (pool->sp_space_throttled)
1128		return (FALSE);
1129	return (TRUE);
1130}
1131
1132static void
1133svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1134{
1135	SVCPOOL *pool = grp->sg_pool;
1136	SVCTHREAD *st, *stpref;
1137	SVCXPRT *xprt;
1138	enum xprt_stat stat;
1139	struct svc_req *rqstp;
1140	struct proc *p;
1141	long sz;
1142	int error;
1143
1144	st = mem_alloc(sizeof(*st));
1145	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1146	st->st_pool = pool;
1147	st->st_xprt = NULL;
1148	STAILQ_INIT(&st->st_reqs);
1149	cv_init(&st->st_cond, "rpcsvc");
1150
1151	mtx_lock(&grp->sg_lock);
1152
1153	/*
1154	 * If we are a new thread which was spawned to cope with
1155	 * increased load, set the state back to SVCPOOL_ACTIVE.
1156	 */
1157	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1158		grp->sg_state = SVCPOOL_ACTIVE;
1159
1160	while (grp->sg_state != SVCPOOL_CLOSING) {
1161		/*
1162		 * Create new thread if requested.
1163		 */
1164		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1165			grp->sg_state = SVCPOOL_THREADSTARTING;
1166			grp->sg_lastcreatetime = time_uptime;
1167			mtx_unlock(&grp->sg_lock);
1168			svc_new_thread(grp);
1169			mtx_lock(&grp->sg_lock);
1170			continue;
1171		}
1172
1173		/*
1174		 * Check for idle transports once per second.
1175		 */
1176		if (time_uptime > grp->sg_lastidlecheck) {
1177			grp->sg_lastidlecheck = time_uptime;
1178			svc_checkidle(grp);
1179		}
1180
1181		xprt = st->st_xprt;
1182		if (!xprt) {
1183			/*
1184			 * Enforce maxthreads count.
1185			 */
1186			if (!ismaster && grp->sg_threadcount >
1187			    grp->sg_maxthreads)
1188				break;
1189
1190			/*
1191			 * Before sleeping, see if we can find an
1192			 * active transport which isn't being serviced
1193			 * by a thread.
1194			 */
1195			if (svc_request_space_available(pool) &&
1196			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1197				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1198				SVC_ACQUIRE(xprt);
1199				xprt->xp_thread = st;
1200				st->st_xprt = xprt;
1201				continue;
1202			}
1203
1204			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1205			if (ismaster || (!ismaster &&
1206			    grp->sg_threadcount > grp->sg_minthreads))
1207				error = cv_timedwait_sig(&st->st_cond,
1208				    &grp->sg_lock, 5 * hz);
1209			else
1210				error = cv_wait_sig(&st->st_cond,
1211				    &grp->sg_lock);
1212			if (st->st_xprt == NULL)
1213				LIST_REMOVE(st, st_ilink);
1214
1215			/*
1216			 * Reduce worker thread count when idle.
1217			 */
1218			if (error == EWOULDBLOCK) {
1219				if (!ismaster
1220				    && (grp->sg_threadcount
1221					> grp->sg_minthreads)
1222					&& !st->st_xprt)
1223					break;
1224			} else if (error != 0) {
1225				KASSERT(error == EINTR || error == ERESTART,
1226				    ("non-signal error %d", error));
1227				mtx_unlock(&grp->sg_lock);
1228				p = curproc;
1229				PROC_LOCK(p);
1230				if (P_SHOULDSTOP(p) ||
1231				    (p->p_flag & P_TOTAL_STOP) != 0) {
1232					thread_suspend_check(0);
1233					PROC_UNLOCK(p);
1234					mtx_lock(&grp->sg_lock);
1235				} else {
1236					PROC_UNLOCK(p);
1237					svc_exit(pool);
1238					mtx_lock(&grp->sg_lock);
1239					break;
1240				}
1241			}
1242			continue;
1243		}
1244		mtx_unlock(&grp->sg_lock);
1245
1246		/*
1247		 * Drain the transport socket and queue up any RPCs.
1248		 */
1249		xprt->xp_lastactive = time_uptime;
1250		do {
1251			if (!svc_request_space_available(pool))
1252				break;
1253			rqstp = NULL;
1254			stat = svc_getreq(xprt, &rqstp);
1255			if (rqstp) {
1256				svc_change_space_used(pool, rqstp->rq_size);
1257				/*
1258				 * See if the application has a preference
1259				 * for some other thread.
1260				 */
1261				if (pool->sp_assign) {
1262					stpref = pool->sp_assign(st, rqstp);
1263					rqstp->rq_thread = stpref;
1264					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1265					    rqstp, rq_link);
1266					mtx_unlock(&stpref->st_lock);
1267					if (stpref != st)
1268						rqstp = NULL;
1269				} else {
1270					rqstp->rq_thread = st;
1271					STAILQ_INSERT_TAIL(&st->st_reqs,
1272					    rqstp, rq_link);
1273				}
1274			}
1275		} while (rqstp == NULL && stat == XPRT_MOREREQS
1276		    && grp->sg_state != SVCPOOL_CLOSING);
1277
1278		/*
1279		 * Move this transport to the end of the active list to
1280		 * ensure fairness when multiple transports are active.
1281		 * If this was the last queued request, svc_getreq will end
1282		 * up calling xprt_inactive to remove from the active list.
1283		 */
1284		mtx_lock(&grp->sg_lock);
1285		xprt->xp_thread = NULL;
1286		st->st_xprt = NULL;
1287		if (xprt->xp_active) {
1288			if (!svc_request_space_available(pool) ||
1289			    !xprt_assignthread(xprt))
1290				TAILQ_INSERT_TAIL(&grp->sg_active,
1291				    xprt, xp_alink);
1292		}
1293		mtx_unlock(&grp->sg_lock);
1294		SVC_RELEASE(xprt);
1295
1296		/*
1297		 * Execute what we have queued.
1298		 */
1299		mtx_lock(&st->st_lock);
1300		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1301			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1302			mtx_unlock(&st->st_lock);
1303			sz = (long)rqstp->rq_size;
1304			svc_executereq(rqstp);
1305			svc_change_space_used(pool, -sz);
1306			mtx_lock(&st->st_lock);
1307		}
1308		mtx_unlock(&st->st_lock);
1309		mtx_lock(&grp->sg_lock);
1310	}
1311
1312	if (st->st_xprt) {
1313		xprt = st->st_xprt;
1314		st->st_xprt = NULL;
1315		SVC_RELEASE(xprt);
1316	}
1317	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1318	mtx_destroy(&st->st_lock);
1319	cv_destroy(&st->st_cond);
1320	mem_free(st, sizeof(*st));
1321
1322	grp->sg_threadcount--;
1323	if (!ismaster)
1324		wakeup(grp);
1325	mtx_unlock(&grp->sg_lock);
1326}
1327
1328static void
1329svc_thread_start(void *arg)
1330{
1331
1332	svc_run_internal((SVCGROUP *) arg, FALSE);
1333	kthread_exit();
1334}
1335
1336static void
1337svc_new_thread(SVCGROUP *grp)
1338{
1339	SVCPOOL *pool = grp->sg_pool;
1340	struct thread *td;
1341
1342	mtx_lock(&grp->sg_lock);
1343	grp->sg_threadcount++;
1344	mtx_unlock(&grp->sg_lock);
1345	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1346	    "%s: service", pool->sp_name);
1347}
1348
1349void
1350svc_run(SVCPOOL *pool)
1351{
1352	int g, i;
1353	struct proc *p;
1354	struct thread *td;
1355	SVCGROUP *grp;
1356
1357	p = curproc;
1358	td = curthread;
1359	snprintf(td->td_name, sizeof(td->td_name),
1360	    "%s: master", pool->sp_name);
1361	pool->sp_state = SVCPOOL_ACTIVE;
1362	pool->sp_proc = p;
1363
1364	/* Choose group count based on number of threads and CPUs. */
1365	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1366	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
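	/*
	 * For example, a hypothetical pool with sp_maxthreads of 64 running
	 * on a 32-CPU machine gets min(64 / 2, 32) / 6 = 5 groups (provided
	 * SVC_MAXGROUPS is at least 5), and each group is then given about
	 * a fifth of the pool's minimum and maximum thread counts.
	 */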
1367	for (g = 0; g < pool->sp_groupcount; g++) {
1368		grp = &pool->sp_groups[g];
1369		grp->sg_minthreads = max(1,
1370		    pool->sp_minthreads / pool->sp_groupcount);
1371		grp->sg_maxthreads = max(1,
1372		    pool->sp_maxthreads / pool->sp_groupcount);
1373		grp->sg_lastcreatetime = time_uptime;
1374	}
1375
1376	/* Starting threads */
1377	pool->sp_groups[0].sg_threadcount++;
1378	for (g = 0; g < pool->sp_groupcount; g++) {
1379		grp = &pool->sp_groups[g];
1380		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1381			svc_new_thread(grp);
1382	}
1383	svc_run_internal(&pool->sp_groups[0], TRUE);
1384
1385	/* Waiting for threads to stop. */
1386	for (g = 0; g < pool->sp_groupcount; g++) {
1387		grp = &pool->sp_groups[g];
1388		mtx_lock(&grp->sg_lock);
1389		while (grp->sg_threadcount > 0)
1390			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1391		mtx_unlock(&grp->sg_lock);
1392	}
1393}
1394
1395void
1396svc_exit(SVCPOOL *pool)
1397{
1398	SVCGROUP *grp;
1399	SVCTHREAD *st;
1400	int g;
1401
1402	pool->sp_state = SVCPOOL_CLOSING;
1403	for (g = 0; g < pool->sp_groupcount; g++) {
1404		grp = &pool->sp_groups[g];
1405		mtx_lock(&grp->sg_lock);
1406		if (grp->sg_state != SVCPOOL_CLOSING) {
1407			grp->sg_state = SVCPOOL_CLOSING;
1408			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1409				cv_signal(&st->st_cond);
1410		}
1411		mtx_unlock(&grp->sg_lock);
1412	}
1413}
1414
1415bool_t
1416svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1417{
1418	struct mbuf *m;
1419	XDR xdrs;
1420	bool_t stat;
1421
1422	m = rqstp->rq_args;
1423	rqstp->rq_args = NULL;
1424
1425	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1426	stat = xargs(&xdrs, args);
1427	XDR_DESTROY(&xdrs);
1428
1429	return (stat);
1430}
1431
1432bool_t
1433svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1434{
1435	XDR xdrs;
1436
1437	if (rqstp->rq_addr) {
1438		free(rqstp->rq_addr, M_SONAME);
1439		rqstp->rq_addr = NULL;
1440	}
1441
1442	xdrs.x_op = XDR_FREE;
1443	return (xargs(&xdrs, args));
1444}
1445
1446void
1447svc_freereq(struct svc_req *rqstp)
1448{
1449	SVCTHREAD *st;
1450	SVCPOOL *pool;
1451
1452	st = rqstp->rq_thread;
1453	if (st) {
1454		pool = st->st_pool;
1455		if (pool->sp_done)
1456			pool->sp_done(st, rqstp);
1457	}
1458
1459	if (rqstp->rq_auth.svc_ah_ops)
1460		SVCAUTH_RELEASE(&rqstp->rq_auth);
1461
1462	if (rqstp->rq_xprt) {
1463		SVC_RELEASE(rqstp->rq_xprt);
1464	}
1465
1466	if (rqstp->rq_addr)
1467		free(rqstp->rq_addr, M_SONAME);
1468
1469	if (rqstp->rq_args)
1470		m_freem(rqstp->rq_args);
1471
1472	free(rqstp, M_RPC);
1473}
1474