/*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/

/*-
 * Copyright (c) 2009, Sun Microsystems, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Sun Microsystems, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived
 *   from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/10/sys/rpc/svc.c 261046 2014-01-22 23:45:27Z mav $");

/*
 * svc.c, Server-side remote procedure call interface.
 *
 * There are two sets of procedures here.  The xprt routines are
 * for handling transport handles.  The svc routines handle the
 * list of service routines.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 */

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/socketvar.h>
#include <sys/systm.h>
#include <sys/ucred.h>

#include <rpc/rpc.h>
#include <rpc/rpcb_clnt.h>
#include <rpc/replay.h>

#include <rpc/rpc_com.h>

#define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)

static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
    char *);
static void svc_new_thread(SVCPOOL *pool);
static void xprt_unregister_locked(SVCXPRT *xprt);

/* ***************  SVCXPRT related stuff **************** */

static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);

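/*
 * Create an RPC service pool. The caller supplies the pool's name
 * and, optionally, a sysctl tree under which the pool's tuning and
 * monitoring variables are published.
 */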
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_xlist);
	TAILQ_INIT(&pool->sp_active);
	TAILQ_INIT(&pool->sp_callouts);
	LIST_INIT(&pool->sp_threads);
	LIST_INIT(&pool->sp_idlethreads);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_threadcount = 0;

	/*
	 * Don't use more than a quarter of mbuf clusters, or more
	 * than 45MB, for buffering requests.
	 */
	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
	if (pool->sp_space_high > 45 << 20)
		pool->sp_space_high = 45 << 20;
	pool->sp_space_low = 2 * pool->sp_space_high / 3;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_minthread_sysctl, "I", "");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
		    pool, 0, svcpool_maxthread_sysctl, "I", "");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used, 0,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest, 0,
		    "Highest space used since reboot.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high, 0,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low, 0,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether NFS requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}

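/*
 * Tear down a service pool: unregister any remaining transports and
 * callouts, release the replay cache and sysctl state, then free the
 * pool itself.
 */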
void
svcpool_destroy(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	mtx_lock(&pool->sp_lock);

	while (TAILQ_FIRST(&pool->sp_xlist)) {
		xprt = TAILQ_FIRST(&pool->sp_xlist);
		xprt_unregister_locked(xprt);
		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
	}

	while (TAILQ_FIRST(&pool->sp_callouts)) {
		s = TAILQ_FIRST(&pool->sp_callouts);
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);

	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&pool->sp_lock);

	if (pool->sp_rcache)
		replay_freecache(pool->sp_rcache);

	sysctl_ctx_free(&pool->sp_sysctl);
	free(pool, M_RPC);
}

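/*
 * Report whether the pool is running, i.e. has been started by
 * svc_run and is not shutting down.
 */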
static bool_t
svcpool_active(SVCPOOL *pool)
{
	enum svcpool_state state = pool->sp_state;

	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
		return (FALSE);
	return (TRUE);
}

/*
 * Sysctl handler to set the minimum thread count on a pool
 */
static int
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	int newminthreads, error, n;

	pool = oidp->oid_arg1;
	newminthreads = pool->sp_minthreads;
	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
	if (error == 0 && newminthreads != pool->sp_minthreads) {
		if (newminthreads > pool->sp_maxthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newminthreads > pool->sp_minthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * increasing, create some more threads now.
			 */
			n = newminthreads - pool->sp_threadcount;
			if (n > 0) {
				mtx_unlock(&pool->sp_lock);
				while (n--)
					svc_new_thread(pool);
				mtx_lock(&pool->sp_lock);
			}
		}
		pool->sp_minthreads = newminthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Sysctl handler to set the maximum thread count on a pool
 */
static int
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
{
	SVCPOOL *pool;
	SVCTHREAD *st;
	int newmaxthreads, error;

	pool = oidp->oid_arg1;
	newmaxthreads = pool->sp_maxthreads;
	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
		if (newmaxthreads < pool->sp_minthreads)
			return (EINVAL);
		mtx_lock(&pool->sp_lock);
		if (newmaxthreads < pool->sp_maxthreads
		    && svcpool_active(pool)) {
			/*
			 * If the pool is running and we are
			 * decreasing, wake up some idle threads to
			 * encourage them to exit.
			 */
			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
				cv_signal(&st->st_cond);
		}
		pool->sp_maxthreads = newmaxthreads;
		mtx_unlock(&pool->sp_lock);
	}
	return (error);
}

/*
 * Activate a transport handle.
 */
void
xprt_register(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	SVC_ACQUIRE(xprt);
	mtx_lock(&pool->sp_lock);
	xprt->xp_registered = TRUE;
	xprt->xp_active = FALSE;
	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
	mtx_unlock(&pool->sp_lock);
}

/*
 * De-activate a transport handle. Note: the locked version doesn't
 * release the transport - caller must do that after dropping the pool
 * lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}

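/*
 * Locking wrapper for xprt_unregister_locked which also drops the
 * reference on the transport that xprt_register took.
 */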
void
xprt_unregister(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	if (xprt->xp_registered == FALSE) {
		/* Already unregistered by another thread */
		mtx_unlock(&pool->sp_lock);
		return;
	}
	xprt_unregister_locked(xprt);
	mtx_unlock(&pool->sp_lock);

	SVC_RELEASE(xprt);
}

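/*
 * Hand an active transport to an idle service thread, if one is
 * available; otherwise, if limits allow, flag the pool so that
 * svc_run_internal will create a new thread. Called with the pool
 * lock held.
 */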
static void
xprt_assignthread(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;
	SVCTHREAD *st;

	/*
	 * Attempt to assign a service thread to this
	 * transport.
	 */
	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
			break;
	}
	if (st) {
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (pool->sp_state == SVCPOOL_ACTIVE
		    && pool->sp_lastcreatetime < time_uptime
		    && pool->sp_threadcount < pool->sp_maxthreads) {
			pool->sp_state = SVCPOOL_THREADWANTED;
		}
	}
}

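/*
 * Mark a transport as having work pending (e.g. data has arrived on
 * its socket) and try to assign a service thread to it.
 */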
void
xprt_active(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);

	if (!xprt->xp_registered) {
		/*
		 * Race with xprt_unregister - we lose.
		 */
		mtx_unlock(&pool->sp_lock);
		return;
	}

	if (!xprt->xp_active) {
		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = TRUE;
		xprt_assignthread(xprt);
	}

	mtx_unlock(&pool->sp_lock);
}

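/*
 * Remove a transport from the active list. The caller must hold the
 * pool lock.
 */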
void
xprt_inactive_locked(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	if (xprt->xp_active) {
		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
		xprt->xp_active = FALSE;
	}
}

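/*
 * Locking wrapper for xprt_inactive_locked.
 */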
void
xprt_inactive(SVCXPRT *xprt)
{
	SVCPOOL *pool = xprt->xp_pool;

	mtx_lock(&pool->sp_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&pool->sp_lock);
}

/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when an RPC request for this
 * program number comes in.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* the caller is registering another xprt */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}

/*
 * Remove a service program from the callout list.
 */
void
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
{
	struct svc_callout *s;

	/* unregister the information anyway */
	(void) rpcb_unset(prog, vers, NULL);
	mtx_lock(&pool->sp_lock);
	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
		if (s->sc_netid)
			mem_free(s->sc_netid, strlen(s->sc_netid) + 1);
		mem_free(s, sizeof (struct svc_callout));
	}
	mtx_unlock(&pool->sp_lock);
}

/* ********************** CALLOUT list related stuff ************* */

/*
 * Search the callout list for a program number, return the callout
 * struct.
 */
static struct svc_callout *
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
{
	struct svc_callout *s;

	mtx_assert(&pool->sp_lock, MA_OWNED);
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == prog && s->sc_vers == vers
		    && (netid == NULL || s->sc_netid == NULL ||
			strcmp(netid, s->sc_netid) == 0))
			break;
	}

	return (s);
}

/* ******************* REPLY GENERATION ROUTINES  ************ */

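/*
 * Common reply path: free the request arguments, record the reply in
 * the replay cache if one is configured, wrap the body through the
 * authenticator and hand the message to the transport.
 */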
static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}

/*
 * Send a reply to an RPC request
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}

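/*
 * Send a reply whose results have already been encoded into an mbuf
 * chain.
 */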
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}

/*
 * No procedure error reply
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Can't decode args error reply
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Some system error
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Authentication error reply
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Auth too weak error reply
 */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}

/*
 * Program unavailable error reply
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Program version mismatch error reply
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}

/*
 * Allocate a new server transport structure. All fields are
 * initialized to zero and xp_p3 is initialized to point at an
 * extension structure to hold various flags and authentication
 * parameters.
 */
SVCXPRT *
svc_xprt_alloc(void)
{
	SVCXPRT *xprt;
	SVCXPRT_EXT *ext;

	xprt = mem_alloc(sizeof(SVCXPRT));
	memset(xprt, 0, sizeof(SVCXPRT));
	ext = mem_alloc(sizeof(SVCXPRT_EXT));
	memset(ext, 0, sizeof(SVCXPRT_EXT));
	xprt->xp_p3 = ext;
	refcount_init(&xprt->xp_refs, 1);

	return (xprt);
}

/*
 * Free a server transport structure.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	mem_free(xprt, sizeof(SVCXPRT));
}

/* ******************* SERVER INPUT STUFF ******************* */

/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	enum xprt_stat stat;

	/* now receive msgs from xprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		xprt_unregister(xprt);
	}

	return (stat);
}

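/*
 * Dispatch a request to the matching registered service, or generate
 * the appropriate error reply if the program or version is not
 * served.
 */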
static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service */
	prog_found = FALSE;
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of the request to
				 * the dispatch method - it must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			}  /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		}   /* found correct program */
	}

	/*
	 * If we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}

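/*
 * Unregister any transport which has been idle for longer than its
 * idle timeout. Called with the pool lock held; the lock is dropped
 * while the expired transports are released.
 */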
static void
svc_checkidle(SVCPOOL *pool)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	mtx_unlock(&pool->sp_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		SVC_RELEASE(xprt);
	}
	mtx_lock(&pool->sp_lock);
}

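/*
 * Try to assign a service thread to each active transport which is
 * not already being serviced. Called with the pool lock held.
 */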
static void
svc_assign_waiting_sockets(SVCPOOL *pool)
{
	SVCXPRT *xprt;

	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
		if (!xprt->xp_thread) {
			xprt_assignthread(xprt);
		}
	}
}

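/*
 * Check whether there is room to buffer another parsed request,
 * applying hysteresis between the pool's high and low water marks.
 * Called with the pool lock held.
 */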
static bool_t
svc_request_space_available(SVCPOOL *pool)
{

	mtx_assert(&pool->sp_lock, MA_OWNED);

	if (pool->sp_space_throttled) {
		/*
		 * Below the low-water mark yet? If so, assign any
		 * waiting sockets.
		 */
		if (pool->sp_space_used < pool->sp_space_low) {
			pool->sp_space_throttled = FALSE;
			svc_assign_waiting_sockets(pool);
			return TRUE;
		}

		return FALSE;
	} else {
		if (pool->sp_space_used
		    >= pool->sp_space_high) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
			return FALSE;
		}

		return TRUE;
	}
}

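/*
 * Main loop for a service thread. Wait for work, drain requests from
 * any transport assigned to us and execute them, creating or
 * retiring threads as the pool's load and limits dictate.
 */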
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	int error;

	st = mem_alloc(sizeof(*st));
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&pool->sp_lock);
	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (pool->sp_state == SVCPOOL_THREADSTARTING)
		pool->sp_state = SVCPOOL_ACTIVE;

	while (pool->sp_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (pool->sp_state == SVCPOOL_THREADWANTED) {
			pool->sp_state = SVCPOOL_THREADSTARTING;
			pool->sp_lastcreatetime = time_uptime;
			mtx_unlock(&pool->sp_lock);
			svc_new_thread(pool);
			mtx_lock(&pool->sp_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > pool->sp_lastidlecheck) {
			pool->sp_lastidlecheck = time_uptime;
			svc_checkidle(pool);
		}

		xprt = st->st_xprt;
		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
			/*
			 * Enforce maxthreads count.
			 */
			if (pool->sp_threadcount > pool->sp_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool)) {
				TAILQ_FOREACH(xprt, &pool->sp_active,
				    xp_alink) {
					if (!xprt->xp_thread) {
						SVC_ACQUIRE(xprt);
						xprt->xp_thread = st;
						st->st_xprt = xprt;
						break;
					}
				}
			}
			if (st->st_xprt)
				continue;

			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    pool->sp_threadcount > pool->sp_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &pool->sp_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &pool->sp_lock);
			LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (pool->sp_threadcount
					> pool->sp_minthreads)
					&& !st->st_xprt
					&& STAILQ_EMPTY(&st->st_reqs))
					break;
			} else if (error) {
				mtx_unlock(&pool->sp_lock);
				svc_exit(pool);
				mtx_lock(&pool->sp_lock);
				break;
			}
			continue;
		}

		if (xprt) {
			/*
			 * Drain the transport socket and queue up any
			 * RPCs.
			 */
			xprt->xp_lastactive = time_uptime;
			stat = XPRT_IDLE;
			do {
				if (!svc_request_space_available(pool))
					break;
				rqstp = NULL;
				mtx_unlock(&pool->sp_lock);
				stat = svc_getreq(xprt, &rqstp);
				mtx_lock(&pool->sp_lock);
				if (rqstp) {
					/*
					 * See if the application has
					 * a preference for some other
					 * thread.
					 */
					stpref = st;
					if (pool->sp_assign)
						stpref = pool->sp_assign(st,
						    rqstp);

					pool->sp_space_used +=
						rqstp->rq_size;
					if (pool->sp_space_used
					    > pool->sp_space_used_highest)
						pool->sp_space_used_highest =
							pool->sp_space_used;
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					stpref->st_reqcount++;

					/*
					 * If we assigned the request
					 * to another thread, make
					 * sure it's awake and continue
					 * reading from the
					 * socket. Otherwise, try to
					 * find some other thread to
					 * read from the socket and
					 * execute the request
					 * immediately.
					 */
					if (stpref != st) {
						cv_signal(&stpref->st_cond);
						continue;
					} else {
						break;
					}
				}
			} while (stat == XPRT_MOREREQS
			    && pool->sp_state != SVCPOOL_CLOSING);

			/*
			 * Move this transport to the end of the
			 * active list to ensure fairness when
			 * multiple transports are active. If this was
			 * the last queued request, svc_getreq will
			 * end up calling xprt_inactive to remove from
			 * the active list.
			 */
			xprt->xp_thread = NULL;
			st->st_xprt = NULL;
			if (xprt->xp_active) {
				xprt_assignthread(xprt);
				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
				    xp_alink);
			}
			mtx_unlock(&pool->sp_lock);
			SVC_RELEASE(xprt);
			mtx_lock(&pool->sp_lock);
		}

		/*
		 * Execute what we have queued.
		 */
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			size_t sz = rqstp->rq_size;
			mtx_unlock(&pool->sp_lock);
			svc_executereq(rqstp);
			mtx_lock(&pool->sp_lock);
			pool->sp_space_used -= sz;
		}
	}

	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}

	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	LIST_REMOVE(st, st_link);
	pool->sp_threadcount--;

	mtx_unlock(&pool->sp_lock);

	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	if (!ismaster)
		wakeup(pool);
}

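/*
 * Entry point for service threads created by svc_new_thread.
 */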
static void
svc_thread_start(void *arg)
{

	svc_run_internal((SVCPOOL *) arg, FALSE);
	kthread_exit();
}

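/*
 * Create an additional kernel thread to service the pool.
 */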
static void
svc_new_thread(SVCPOOL *pool)
{
	struct thread *td;

	pool->sp_threadcount++;
	kthread_add(svc_thread_start, pool,
	    pool->sp_proc, &td, 0, 0,
	    "%s: service", pool->sp_name);
}

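/*
 * Run the pool in the calling thread, which becomes the pool's
 * master thread; additional threads are created to meet the
 * configured minimum. Does not return until svc_exit shuts the pool
 * down and all of its threads have exited.
 */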
void
svc_run(SVCPOOL *pool)
{
	int i;
	struct proc *p;
	struct thread *td;

	p = curproc;
	td = curthread;
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;
	pool->sp_lastcreatetime = time_uptime;
	pool->sp_threadcount = 1;

	for (i = 1; i < pool->sp_minthreads; i++) {
		svc_new_thread(pool);
	}

	svc_run_internal(pool, TRUE);

	mtx_lock(&pool->sp_lock);
	while (pool->sp_threadcount > 0)
		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
	mtx_unlock(&pool->sp_lock);
}

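/*
 * Request an orderly shutdown of the pool: mark it closing and wake
 * any idle threads so that they notice and exit.
 */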
void
svc_exit(SVCPOOL *pool)
{
	SVCTHREAD *st;

	mtx_lock(&pool->sp_lock);

	if (pool->sp_state != SVCPOOL_CLOSING) {
		pool->sp_state = SVCPOOL_CLOSING;
		LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
			cv_signal(&st->st_cond);
	}

	mtx_unlock(&pool->sp_lock);
}

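/*
 * Decode a request's arguments from the mbuf chain queued with it.
 * The request gives up ownership of the chain, which is consumed by
 * the decode.
 */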
bool_t
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	struct mbuf *m;
	XDR xdrs;
	bool_t stat;

	m = rqstp->rq_args;
	rqstp->rq_args = NULL;

	xdrmbuf_create(&xdrs, m, XDR_DECODE);
	stat = xargs(&xdrs, args);
	XDR_DESTROY(&xdrs);

	return (stat);
}

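/*
 * Free any memory allocated while decoding the arguments, along with
 * the caller's address if it is still attached to the request.
 */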
bool_t
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
{
	XDR xdrs;

	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	xdrs.x_op = XDR_FREE;
	return (xargs(&xdrs, args));
}

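/*
 * Release a request: dequeue it from its thread, notify the pool's
 * done callback if any, and free the associated authenticator,
 * transport reference, caller address and argument mbufs.
 */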
void
svc_freereq(struct svc_req *rqstp)
{
	SVCTHREAD *st;
	SVCXPRT *xprt;
	SVCPOOL *pool;

	st = rqstp->rq_thread;
	xprt = rqstp->rq_xprt;
	if (xprt)
		pool = xprt->xp_pool;
	else
		pool = NULL;
	if (st) {
		mtx_lock(&pool->sp_lock);
		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
		    ("Freeing request out of order"));
		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
		st->st_reqcount--;
		if (pool->sp_done)
			pool->sp_done(st, rqstp);
		mtx_unlock(&pool->sp_lock);
	}

	if (rqstp->rq_auth.svc_ah_ops)
		SVCAUTH_RELEASE(&rqstp->rq_auth);

	if (rqstp->rq_xprt) {
		SVC_RELEASE(rqstp->rq_xprt);
	}

	if (rqstp->rq_addr)
		free(rqstp->rq_addr, M_SONAME);

	if (rqstp->rq_args)
		m_freem(rqstp->rq_args);

	free(rqstp, M_RPC);
}