/* svc.c revision 193603 */
1/* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */ 2 3/* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31 32#if defined(LIBC_SCCS) && !defined(lint) 33static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro"; 34static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC"; 35#endif 36#include <sys/cdefs.h> 37__FBSDID("$FreeBSD: head/sys/rpc/svc.c 193603 2009-06-07 01:06:56Z rmacklem $"); 38 39/* 40 * svc.c, Server-side remote procedure call interface. 41 * 42 * There are two sets of procedures here. The xprt routines are 43 * for handling transport handles. The svc routines handle the 44 * list of service routines. 45 * 46 * Copyright (C) 1984, Sun Microsystems, Inc. 
47 */ 48 49#include <sys/param.h> 50#include <sys/lock.h> 51#include <sys/kernel.h> 52#include <sys/kthread.h> 53#include <sys/malloc.h> 54#include <sys/mbuf.h> 55#include <sys/mutex.h> 56#include <sys/proc.h> 57#include <sys/queue.h> 58#include <sys/socketvar.h> 59#include <sys/systm.h> 60#include <sys/ucred.h> 61 62#include <rpc/rpc.h> 63#include <rpc/rpcb_clnt.h> 64#include <rpc/replay.h> 65 66#include <rpc/rpc_com.h> 67 68#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */ 69#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET) 70 71static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t, 72 char *); 73static void svc_new_thread(SVCPOOL *pool); 74static void xprt_unregister_locked(SVCXPRT *xprt); 75 76/* *************** SVCXPRT related stuff **************** */ 77 78static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS); 79static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS); 80 81SVCPOOL* 82svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base) 83{ 84 SVCPOOL *pool; 85 86 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO); 87 88 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF); 89 pool->sp_name = name; 90 pool->sp_state = SVCPOOL_INIT; 91 pool->sp_proc = NULL; 92 TAILQ_INIT(&pool->sp_xlist); 93 TAILQ_INIT(&pool->sp_active); 94 TAILQ_INIT(&pool->sp_callouts); 95 LIST_INIT(&pool->sp_threads); 96 LIST_INIT(&pool->sp_idlethreads); 97 pool->sp_minthreads = 1; 98 pool->sp_maxthreads = 1; 99 pool->sp_threadcount = 0; 100 101 /* 102 * Don't use more than a quarter of mbuf clusters or more than 103 * 45Mb buffering requests. 
104 */ 105 pool->sp_space_high = nmbclusters * MCLBYTES / 4; 106 if (pool->sp_space_high > 45 << 20) 107 pool->sp_space_high = 45 << 20; 108 pool->sp_space_low = 2 * pool->sp_space_high / 3; 109 110 sysctl_ctx_init(&pool->sp_sysctl); 111 if (sysctl_base) { 112 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 113 "minthreads", CTLTYPE_INT | CTLFLAG_RW, 114 pool, 0, svcpool_minthread_sysctl, "I", ""); 115 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO, 116 "maxthreads", CTLTYPE_INT | CTLFLAG_RW, 117 pool, 0, svcpool_maxthread_sysctl, "I", ""); 118 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 119 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, ""); 120 121 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 122 "request_space_used", CTLFLAG_RD, 123 &pool->sp_space_used, 0, 124 "Space in parsed but not handled requests."); 125 126 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 127 "request_space_used_highest", CTLFLAG_RD, 128 &pool->sp_space_used_highest, 0, 129 "Highest space used since reboot."); 130 131 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 132 "request_space_high", CTLFLAG_RW, 133 &pool->sp_space_high, 0, 134 "Maximum space in parsed but not handled requests."); 135 136 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 137 "request_space_low", CTLFLAG_RW, 138 &pool->sp_space_low, 0, 139 "Low water mark for request space."); 140 141 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 142 "request_space_throttled", CTLFLAG_RD, 143 &pool->sp_space_throttled, 0, 144 "Whether nfs requests are currently throttled"); 145 146 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO, 147 "request_space_throttle_count", CTLFLAG_RD, 148 &pool->sp_space_throttle_count, 0, 149 "Count of times throttling based on request space has occurred"); 150 } 151 152 return pool; 153} 154 155void 156svcpool_destroy(SVCPOOL *pool) 157{ 158 SVCXPRT *xprt, *nxprt; 159 struct svc_callout *s; 160 struct svcxprt_list 
cleanup; 161 162 TAILQ_INIT(&cleanup); 163 mtx_lock(&pool->sp_lock); 164 165 while (TAILQ_FIRST(&pool->sp_xlist)) { 166 xprt = TAILQ_FIRST(&pool->sp_xlist); 167 xprt_unregister_locked(xprt); 168 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 169 } 170 171 while (TAILQ_FIRST(&pool->sp_callouts)) { 172 s = TAILQ_FIRST(&pool->sp_callouts); 173 mtx_unlock(&pool->sp_lock); 174 svc_unreg(pool, s->sc_prog, s->sc_vers); 175 mtx_lock(&pool->sp_lock); 176 } 177 mtx_unlock(&pool->sp_lock); 178 179 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 180 SVC_RELEASE(xprt); 181 } 182 183 mtx_destroy(&pool->sp_lock); 184 185 if (pool->sp_rcache) 186 replay_freecache(pool->sp_rcache); 187 188 sysctl_ctx_free(&pool->sp_sysctl); 189 free(pool, M_RPC); 190} 191 192static bool_t 193svcpool_active(SVCPOOL *pool) 194{ 195 enum svcpool_state state = pool->sp_state; 196 197 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING) 198 return (FALSE); 199 return (TRUE); 200} 201 202/* 203 * Sysctl handler to set the minimum thread count on a pool 204 */ 205static int 206svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS) 207{ 208 SVCPOOL *pool; 209 int newminthreads, error, n; 210 211 pool = oidp->oid_arg1; 212 newminthreads = pool->sp_minthreads; 213 error = sysctl_handle_int(oidp, &newminthreads, 0, req); 214 if (error == 0 && newminthreads != pool->sp_minthreads) { 215 if (newminthreads > pool->sp_maxthreads) 216 return (EINVAL); 217 mtx_lock(&pool->sp_lock); 218 if (newminthreads > pool->sp_minthreads 219 && svcpool_active(pool)) { 220 /* 221 * If the pool is running and we are 222 * increasing, create some more threads now. 
223 */ 224 n = newminthreads - pool->sp_threadcount; 225 if (n > 0) { 226 mtx_unlock(&pool->sp_lock); 227 while (n--) 228 svc_new_thread(pool); 229 mtx_lock(&pool->sp_lock); 230 } 231 } 232 pool->sp_minthreads = newminthreads; 233 mtx_unlock(&pool->sp_lock); 234 } 235 return (error); 236} 237 238/* 239 * Sysctl handler to set the maximum thread count on a pool 240 */ 241static int 242svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS) 243{ 244 SVCPOOL *pool; 245 SVCTHREAD *st; 246 int newmaxthreads, error; 247 248 pool = oidp->oid_arg1; 249 newmaxthreads = pool->sp_maxthreads; 250 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req); 251 if (error == 0 && newmaxthreads != pool->sp_maxthreads) { 252 if (newmaxthreads < pool->sp_minthreads) 253 return (EINVAL); 254 mtx_lock(&pool->sp_lock); 255 if (newmaxthreads < pool->sp_maxthreads 256 && svcpool_active(pool)) { 257 /* 258 * If the pool is running and we are 259 * decreasing, wake up some idle threads to 260 * encourage them to exit. 261 */ 262 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 263 cv_signal(&st->st_cond); 264 } 265 pool->sp_maxthreads = newmaxthreads; 266 mtx_unlock(&pool->sp_lock); 267 } 268 return (error); 269} 270 271/* 272 * Activate a transport handle. 273 */ 274void 275xprt_register(SVCXPRT *xprt) 276{ 277 SVCPOOL *pool = xprt->xp_pool; 278 279 mtx_lock(&pool->sp_lock); 280 xprt->xp_registered = TRUE; 281 xprt->xp_active = FALSE; 282 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link); 283 mtx_unlock(&pool->sp_lock); 284} 285 286/* 287 * De-activate a transport handle. Note: the locked version doesn't 288 * release the transport - caller must do that after dropping the pool 289 * lock. 
290 */ 291static void 292xprt_unregister_locked(SVCXPRT *xprt) 293{ 294 SVCPOOL *pool = xprt->xp_pool; 295 296 if (xprt->xp_active) { 297 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 298 xprt->xp_active = FALSE; 299 } 300 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link); 301 xprt->xp_registered = FALSE; 302} 303 304void 305xprt_unregister(SVCXPRT *xprt) 306{ 307 SVCPOOL *pool = xprt->xp_pool; 308 309 mtx_lock(&pool->sp_lock); 310 xprt_unregister_locked(xprt); 311 mtx_unlock(&pool->sp_lock); 312 313 SVC_RELEASE(xprt); 314} 315 316static void 317xprt_assignthread(SVCXPRT *xprt) 318{ 319 SVCPOOL *pool = xprt->xp_pool; 320 SVCTHREAD *st; 321 322 /* 323 * Attempt to assign a service thread to this 324 * transport. 325 */ 326 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) { 327 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs)) 328 break; 329 } 330 if (st) { 331 SVC_ACQUIRE(xprt); 332 xprt->xp_thread = st; 333 st->st_xprt = xprt; 334 cv_signal(&st->st_cond); 335 } else { 336 /* 337 * See if we can create a new thread. The 338 * actual thread creation happens in 339 * svc_run_internal because our locking state 340 * is poorly defined (we are typically called 341 * from a socket upcall). Don't create more 342 * than one thread per second. 343 */ 344 if (pool->sp_state == SVCPOOL_ACTIVE 345 && pool->sp_lastcreatetime < time_uptime 346 && pool->sp_threadcount < pool->sp_maxthreads) { 347 pool->sp_state = SVCPOOL_THREADWANTED; 348 } 349 } 350} 351 352void 353xprt_active(SVCXPRT *xprt) 354{ 355 SVCPOOL *pool = xprt->xp_pool; 356 357 mtx_lock(&pool->sp_lock); 358 359 if (!xprt->xp_registered) { 360 /* 361 * Race with xprt_unregister - we lose. 
362 */ 363 mtx_unlock(&pool->sp_lock); 364 return; 365 } 366 367 if (!xprt->xp_active) { 368 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink); 369 xprt->xp_active = TRUE; 370 xprt_assignthread(xprt); 371 } 372 373 mtx_unlock(&pool->sp_lock); 374} 375 376void 377xprt_inactive_locked(SVCXPRT *xprt) 378{ 379 SVCPOOL *pool = xprt->xp_pool; 380 381 if (xprt->xp_active) { 382 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 383 xprt->xp_active = FALSE; 384 } 385} 386 387void 388xprt_inactive(SVCXPRT *xprt) 389{ 390 SVCPOOL *pool = xprt->xp_pool; 391 392 mtx_lock(&pool->sp_lock); 393 xprt_inactive_locked(xprt); 394 mtx_unlock(&pool->sp_lock); 395} 396 397/* 398 * Add a service program to the callout list. 399 * The dispatch routine will be called when a rpc request for this 400 * program number comes in. 401 */ 402bool_t 403svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers, 404 void (*dispatch)(struct svc_req *, SVCXPRT *), 405 const struct netconfig *nconf) 406{ 407 SVCPOOL *pool = xprt->xp_pool; 408 struct svc_callout *s; 409 char *netid = NULL; 410 int flag = 0; 411 412/* VARIABLES PROTECTED BY svc_lock: s, svc_head */ 413 414 if (xprt->xp_netid) { 415 netid = strdup(xprt->xp_netid, M_RPC); 416 flag = 1; 417 } else if (nconf && nconf->nc_netid) { 418 netid = strdup(nconf->nc_netid, M_RPC); 419 flag = 1; 420 } /* must have been created with svc_raw_create */ 421 if ((netid == NULL) && (flag == 1)) { 422 return (FALSE); 423 } 424 425 mtx_lock(&pool->sp_lock); 426 if ((s = svc_find(pool, prog, vers, netid)) != NULL) { 427 if (netid) 428 free(netid, M_RPC); 429 if (s->sc_dispatch == dispatch) 430 goto rpcb_it; /* he is registering another xptr */ 431 mtx_unlock(&pool->sp_lock); 432 return (FALSE); 433 } 434 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT); 435 if (s == NULL) { 436 if (netid) 437 free(netid, M_RPC); 438 mtx_unlock(&pool->sp_lock); 439 return (FALSE); 440 } 441 442 s->sc_prog = prog; 443 s->sc_vers = vers; 444 s->sc_dispatch = 
dispatch; 445 s->sc_netid = netid; 446 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link); 447 448 if ((xprt->xp_netid == NULL) && (flag == 1) && netid) 449 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC); 450 451rpcb_it: 452 mtx_unlock(&pool->sp_lock); 453 /* now register the information with the local binder service */ 454 if (nconf) { 455 bool_t dummy; 456 struct netconfig tnc; 457 struct netbuf nb; 458 tnc = *nconf; 459 nb.buf = &xprt->xp_ltaddr; 460 nb.len = xprt->xp_ltaddr.ss_len; 461 dummy = rpcb_set(prog, vers, &tnc, &nb); 462 return (dummy); 463 } 464 return (TRUE); 465} 466 467/* 468 * Remove a service program from the callout list. 469 */ 470void 471svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers) 472{ 473 struct svc_callout *s; 474 475 /* unregister the information anyway */ 476 (void) rpcb_unset(prog, vers, NULL); 477 mtx_lock(&pool->sp_lock); 478 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) { 479 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link); 480 if (s->sc_netid) 481 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1); 482 mem_free(s, sizeof (struct svc_callout)); 483 } 484 mtx_unlock(&pool->sp_lock); 485} 486 487/* ********************** CALLOUT list related stuff ************* */ 488 489/* 490 * Search the callout list for a program number, return the callout 491 * struct. 
492 */ 493static struct svc_callout * 494svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid) 495{ 496 struct svc_callout *s; 497 498 mtx_assert(&pool->sp_lock, MA_OWNED); 499 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 500 if (s->sc_prog == prog && s->sc_vers == vers 501 && (netid == NULL || s->sc_netid == NULL || 502 strcmp(netid, s->sc_netid) == 0)) 503 break; 504 } 505 506 return (s); 507} 508 509/* ******************* REPLY GENERATION ROUTINES ************ */ 510 511static bool_t 512svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply, 513 struct mbuf *body) 514{ 515 SVCXPRT *xprt = rqstp->rq_xprt; 516 bool_t ok; 517 518 if (rqstp->rq_args) { 519 m_freem(rqstp->rq_args); 520 rqstp->rq_args = NULL; 521 } 522 523 if (xprt->xp_pool->sp_rcache) 524 replay_setreply(xprt->xp_pool->sp_rcache, 525 rply, svc_getrpccaller(rqstp), body); 526 527 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body)) 528 return (FALSE); 529 530 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 531 if (rqstp->rq_addr) { 532 free(rqstp->rq_addr, M_SONAME); 533 rqstp->rq_addr = NULL; 534 } 535 536 return (ok); 537} 538 539/* 540 * Send a reply to an rpc request 541 */ 542bool_t 543svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location) 544{ 545 struct rpc_msg rply; 546 struct mbuf *m; 547 XDR xdrs; 548 bool_t ok; 549 550 rply.rm_xid = rqstp->rq_xid; 551 rply.rm_direction = REPLY; 552 rply.rm_reply.rp_stat = MSG_ACCEPTED; 553 rply.acpted_rply.ar_verf = rqstp->rq_verf; 554 rply.acpted_rply.ar_stat = SUCCESS; 555 rply.acpted_rply.ar_results.where = NULL; 556 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 557 558 MGET(m, M_WAIT, MT_DATA); 559 MCLGET(m, M_WAIT); 560 m->m_len = 0; 561 xdrmbuf_create(&xdrs, m, XDR_ENCODE); 562 ok = xdr_results(&xdrs, xdr_location); 563 XDR_DESTROY(&xdrs); 564 565 if (ok) { 566 return (svc_sendreply_common(rqstp, &rply, m)); 567 } else { 568 m_freem(m); 569 return (FALSE); 570 } 571} 572 573bool_t 
574svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m) 575{ 576 struct rpc_msg rply; 577 578 rply.rm_xid = rqstp->rq_xid; 579 rply.rm_direction = REPLY; 580 rply.rm_reply.rp_stat = MSG_ACCEPTED; 581 rply.acpted_rply.ar_verf = rqstp->rq_verf; 582 rply.acpted_rply.ar_stat = SUCCESS; 583 rply.acpted_rply.ar_results.where = NULL; 584 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void; 585 586 return (svc_sendreply_common(rqstp, &rply, m)); 587} 588 589/* 590 * No procedure error reply 591 */ 592void 593svcerr_noproc(struct svc_req *rqstp) 594{ 595 SVCXPRT *xprt = rqstp->rq_xprt; 596 struct rpc_msg rply; 597 598 rply.rm_xid = rqstp->rq_xid; 599 rply.rm_direction = REPLY; 600 rply.rm_reply.rp_stat = MSG_ACCEPTED; 601 rply.acpted_rply.ar_verf = rqstp->rq_verf; 602 rply.acpted_rply.ar_stat = PROC_UNAVAIL; 603 604 if (xprt->xp_pool->sp_rcache) 605 replay_setreply(xprt->xp_pool->sp_rcache, 606 &rply, svc_getrpccaller(rqstp), NULL); 607 608 svc_sendreply_common(rqstp, &rply, NULL); 609} 610 611/* 612 * Can't decode args error reply 613 */ 614void 615svcerr_decode(struct svc_req *rqstp) 616{ 617 SVCXPRT *xprt = rqstp->rq_xprt; 618 struct rpc_msg rply; 619 620 rply.rm_xid = rqstp->rq_xid; 621 rply.rm_direction = REPLY; 622 rply.rm_reply.rp_stat = MSG_ACCEPTED; 623 rply.acpted_rply.ar_verf = rqstp->rq_verf; 624 rply.acpted_rply.ar_stat = GARBAGE_ARGS; 625 626 if (xprt->xp_pool->sp_rcache) 627 replay_setreply(xprt->xp_pool->sp_rcache, 628 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL); 629 630 svc_sendreply_common(rqstp, &rply, NULL); 631} 632 633/* 634 * Some system error 635 */ 636void 637svcerr_systemerr(struct svc_req *rqstp) 638{ 639 SVCXPRT *xprt = rqstp->rq_xprt; 640 struct rpc_msg rply; 641 642 rply.rm_xid = rqstp->rq_xid; 643 rply.rm_direction = REPLY; 644 rply.rm_reply.rp_stat = MSG_ACCEPTED; 645 rply.acpted_rply.ar_verf = rqstp->rq_verf; 646 rply.acpted_rply.ar_stat = SYSTEM_ERR; 647 648 if (xprt->xp_pool->sp_rcache) 649 
replay_setreply(xprt->xp_pool->sp_rcache, 650 &rply, svc_getrpccaller(rqstp), NULL); 651 652 svc_sendreply_common(rqstp, &rply, NULL); 653} 654 655/* 656 * Authentication error reply 657 */ 658void 659svcerr_auth(struct svc_req *rqstp, enum auth_stat why) 660{ 661 SVCXPRT *xprt = rqstp->rq_xprt; 662 struct rpc_msg rply; 663 664 rply.rm_xid = rqstp->rq_xid; 665 rply.rm_direction = REPLY; 666 rply.rm_reply.rp_stat = MSG_DENIED; 667 rply.rjcted_rply.rj_stat = AUTH_ERROR; 668 rply.rjcted_rply.rj_why = why; 669 670 if (xprt->xp_pool->sp_rcache) 671 replay_setreply(xprt->xp_pool->sp_rcache, 672 &rply, svc_getrpccaller(rqstp), NULL); 673 674 svc_sendreply_common(rqstp, &rply, NULL); 675} 676 677/* 678 * Auth too weak error reply 679 */ 680void 681svcerr_weakauth(struct svc_req *rqstp) 682{ 683 684 svcerr_auth(rqstp, AUTH_TOOWEAK); 685} 686 687/* 688 * Program unavailable error reply 689 */ 690void 691svcerr_noprog(struct svc_req *rqstp) 692{ 693 SVCXPRT *xprt = rqstp->rq_xprt; 694 struct rpc_msg rply; 695 696 rply.rm_xid = rqstp->rq_xid; 697 rply.rm_direction = REPLY; 698 rply.rm_reply.rp_stat = MSG_ACCEPTED; 699 rply.acpted_rply.ar_verf = rqstp->rq_verf; 700 rply.acpted_rply.ar_stat = PROG_UNAVAIL; 701 702 if (xprt->xp_pool->sp_rcache) 703 replay_setreply(xprt->xp_pool->sp_rcache, 704 &rply, svc_getrpccaller(rqstp), NULL); 705 706 svc_sendreply_common(rqstp, &rply, NULL); 707} 708 709/* 710 * Program version mismatch error reply 711 */ 712void 713svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers) 714{ 715 SVCXPRT *xprt = rqstp->rq_xprt; 716 struct rpc_msg rply; 717 718 rply.rm_xid = rqstp->rq_xid; 719 rply.rm_direction = REPLY; 720 rply.rm_reply.rp_stat = MSG_ACCEPTED; 721 rply.acpted_rply.ar_verf = rqstp->rq_verf; 722 rply.acpted_rply.ar_stat = PROG_MISMATCH; 723 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers; 724 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers; 725 726 if (xprt->xp_pool->sp_rcache) 727 
replay_setreply(xprt->xp_pool->sp_rcache, 728 &rply, svc_getrpccaller(rqstp), NULL); 729 730 svc_sendreply_common(rqstp, &rply, NULL); 731} 732 733/* 734 * Allocate a new server transport structure. All fields are 735 * initialized to zero and xp_p3 is initialized to point at an 736 * extension structure to hold various flags and authentication 737 * parameters. 738 */ 739SVCXPRT * 740svc_xprt_alloc() 741{ 742 SVCXPRT *xprt; 743 SVCXPRT_EXT *ext; 744 745 xprt = mem_alloc(sizeof(SVCXPRT)); 746 memset(xprt, 0, sizeof(SVCXPRT)); 747 ext = mem_alloc(sizeof(SVCXPRT_EXT)); 748 memset(ext, 0, sizeof(SVCXPRT_EXT)); 749 xprt->xp_p3 = ext; 750 refcount_init(&xprt->xp_refs, 1); 751 752 return (xprt); 753} 754 755/* 756 * Free a server transport structure. 757 */ 758void 759svc_xprt_free(xprt) 760 SVCXPRT *xprt; 761{ 762 763 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT)); 764 mem_free(xprt, sizeof(SVCXPRT)); 765} 766 767/* ******************* SERVER INPUT STUFF ******************* */ 768 769/* 770 * Read RPC requests from a transport and queue them to be 771 * executed. We handle authentication and replay cache replies here. 772 * Actually dispatching the RPC is deferred till svc_executereq. 773 */ 774static enum xprt_stat 775svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret) 776{ 777 SVCPOOL *pool = xprt->xp_pool; 778 struct svc_req *r; 779 struct rpc_msg msg; 780 struct mbuf *args; 781 enum xprt_stat stat; 782 783 /* now receive msgs from xprtprt (support batch calls) */ 784 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO); 785 786 msg.rm_call.cb_cred.oa_base = r->rq_credarea; 787 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES]; 788 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES]; 789 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) { 790 enum auth_stat why; 791 792 /* 793 * Handle replays and authenticate before queuing the 794 * request to be executed. 
795 */ 796 SVC_ACQUIRE(xprt); 797 r->rq_xprt = xprt; 798 if (pool->sp_rcache) { 799 struct rpc_msg repmsg; 800 struct mbuf *repbody; 801 enum replay_state rs; 802 rs = replay_find(pool->sp_rcache, &msg, 803 svc_getrpccaller(r), &repmsg, &repbody); 804 switch (rs) { 805 case RS_NEW: 806 break; 807 case RS_DONE: 808 SVC_REPLY(xprt, &repmsg, r->rq_addr, 809 repbody); 810 if (r->rq_addr) { 811 free(r->rq_addr, M_SONAME); 812 r->rq_addr = NULL; 813 } 814 goto call_done; 815 816 default: 817 goto call_done; 818 } 819 } 820 821 r->rq_xid = msg.rm_xid; 822 r->rq_prog = msg.rm_call.cb_prog; 823 r->rq_vers = msg.rm_call.cb_vers; 824 r->rq_proc = msg.rm_call.cb_proc; 825 r->rq_size = sizeof(*r) + m_length(args, NULL); 826 r->rq_args = args; 827 if ((why = _authenticate(r, &msg)) != AUTH_OK) { 828 /* 829 * RPCSEC_GSS uses this return code 830 * for requests that form part of its 831 * context establishment protocol and 832 * should not be dispatched to the 833 * application. 834 */ 835 if (why != RPCSEC_GSS_NODISPATCH) 836 svcerr_auth(r, why); 837 goto call_done; 838 } 839 840 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) { 841 svcerr_decode(r); 842 goto call_done; 843 } 844 845 /* 846 * Everything checks out, return request to caller. 
847 */ 848 *rqstp_ret = r; 849 r = NULL; 850 } 851call_done: 852 if (r) { 853 svc_freereq(r); 854 r = NULL; 855 } 856 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) { 857 xprt_unregister(xprt); 858 } 859 860 return (stat); 861} 862 863static void 864svc_executereq(struct svc_req *rqstp) 865{ 866 SVCXPRT *xprt = rqstp->rq_xprt; 867 SVCPOOL *pool = xprt->xp_pool; 868 int prog_found; 869 rpcvers_t low_vers; 870 rpcvers_t high_vers; 871 struct svc_callout *s; 872 873 /* now match message with a registered service*/ 874 prog_found = FALSE; 875 low_vers = (rpcvers_t) -1L; 876 high_vers = (rpcvers_t) 0L; 877 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) { 878 if (s->sc_prog == rqstp->rq_prog) { 879 if (s->sc_vers == rqstp->rq_vers) { 880 /* 881 * We hand ownership of r to the 882 * dispatch method - they must call 883 * svc_freereq. 884 */ 885 (*s->sc_dispatch)(rqstp, xprt); 886 return; 887 } /* found correct version */ 888 prog_found = TRUE; 889 if (s->sc_vers < low_vers) 890 low_vers = s->sc_vers; 891 if (s->sc_vers > high_vers) 892 high_vers = s->sc_vers; 893 } /* found correct program */ 894 } 895 896 /* 897 * if we got here, the program or version 898 * is not served ... 899 */ 900 if (prog_found) 901 svcerr_progvers(rqstp, low_vers, high_vers); 902 else 903 svcerr_noprog(rqstp); 904 905 svc_freereq(rqstp); 906} 907 908static void 909svc_checkidle(SVCPOOL *pool) 910{ 911 SVCXPRT *xprt, *nxprt; 912 time_t timo; 913 struct svcxprt_list cleanup; 914 915 TAILQ_INIT(&cleanup); 916 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) { 917 /* 918 * Only some transports have idle timers. Don't time 919 * something out which is just waking up. 
920 */ 921 if (!xprt->xp_idletimeout || xprt->xp_thread) 922 continue; 923 924 timo = xprt->xp_lastactive + xprt->xp_idletimeout; 925 if (time_uptime > timo) { 926 xprt_unregister_locked(xprt); 927 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link); 928 } 929 } 930 931 mtx_unlock(&pool->sp_lock); 932 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) { 933 SVC_RELEASE(xprt); 934 } 935 mtx_lock(&pool->sp_lock); 936 937} 938 939static void 940svc_assign_waiting_sockets(SVCPOOL *pool) 941{ 942 SVCXPRT *xprt; 943 944 TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) { 945 if (!xprt->xp_thread) { 946 xprt_assignthread(xprt); 947 } 948 } 949} 950 951static bool_t 952svc_request_space_available(SVCPOOL *pool) 953{ 954 955 mtx_assert(&pool->sp_lock, MA_OWNED); 956 957 if (pool->sp_space_throttled) { 958 /* 959 * Below the low-water yet? If so, assign any waiting sockets. 960 */ 961 if (pool->sp_space_used < pool->sp_space_low) { 962 pool->sp_space_throttled = FALSE; 963 svc_assign_waiting_sockets(pool); 964 return TRUE; 965 } 966 967 return FALSE; 968 } else { 969 if (pool->sp_space_used 970 >= pool->sp_space_high) { 971 pool->sp_space_throttled = TRUE; 972 pool->sp_space_throttle_count++; 973 return FALSE; 974 } 975 976 return TRUE; 977 } 978} 979 980static void 981svc_run_internal(SVCPOOL *pool, bool_t ismaster) 982{ 983 SVCTHREAD *st, *stpref; 984 SVCXPRT *xprt; 985 enum xprt_stat stat; 986 struct svc_req *rqstp; 987 int error; 988 989 st = mem_alloc(sizeof(*st)); 990 st->st_xprt = NULL; 991 STAILQ_INIT(&st->st_reqs); 992 cv_init(&st->st_cond, "rpcsvc"); 993 994 mtx_lock(&pool->sp_lock); 995 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link); 996 997 /* 998 * If we are a new thread which was spawned to cope with 999 * increased load, set the state back to SVCPOOL_ACTIVE. 1000 */ 1001 if (pool->sp_state == SVCPOOL_THREADSTARTING) 1002 pool->sp_state = SVCPOOL_ACTIVE; 1003 1004 while (pool->sp_state != SVCPOOL_CLOSING) { 1005 /* 1006 * Check for idle transports once per second. 
1007 */ 1008 if (time_uptime > pool->sp_lastidlecheck) { 1009 pool->sp_lastidlecheck = time_uptime; 1010 svc_checkidle(pool); 1011 } 1012 1013 xprt = st->st_xprt; 1014 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) { 1015 /* 1016 * Enforce maxthreads count. 1017 */ 1018 if (pool->sp_threadcount > pool->sp_maxthreads) 1019 break; 1020 1021 /* 1022 * Before sleeping, see if we can find an 1023 * active transport which isn't being serviced 1024 * by a thread. 1025 */ 1026 if (svc_request_space_available(pool)) { 1027 TAILQ_FOREACH(xprt, &pool->sp_active, 1028 xp_alink) { 1029 if (!xprt->xp_thread) { 1030 SVC_ACQUIRE(xprt); 1031 xprt->xp_thread = st; 1032 st->st_xprt = xprt; 1033 break; 1034 } 1035 } 1036 } 1037 if (st->st_xprt) 1038 continue; 1039 1040 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink); 1041 error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock, 1042 5 * hz); 1043 LIST_REMOVE(st, st_ilink); 1044 1045 /* 1046 * Reduce worker thread count when idle. 1047 */ 1048 if (error == EWOULDBLOCK) { 1049 if (!ismaster 1050 && (pool->sp_threadcount 1051 > pool->sp_minthreads) 1052 && !st->st_xprt 1053 && STAILQ_EMPTY(&st->st_reqs)) 1054 break; 1055 } 1056 if (error == EWOULDBLOCK) 1057 continue; 1058 if (error) { 1059 if (pool->sp_state != SVCPOOL_CLOSING) { 1060 mtx_unlock(&pool->sp_lock); 1061 svc_exit(pool); 1062 mtx_lock(&pool->sp_lock); 1063 } 1064 break; 1065 } 1066 1067 if (pool->sp_state == SVCPOOL_THREADWANTED) { 1068 pool->sp_state = SVCPOOL_THREADSTARTING; 1069 pool->sp_lastcreatetime = time_uptime; 1070 mtx_unlock(&pool->sp_lock); 1071 svc_new_thread(pool); 1072 mtx_lock(&pool->sp_lock); 1073 } 1074 continue; 1075 } 1076 1077 if (xprt) { 1078 /* 1079 * Drain the transport socket and queue up any 1080 * RPCs. 
1081 */ 1082 xprt->xp_lastactive = time_uptime; 1083 stat = XPRT_IDLE; 1084 do { 1085 if (!svc_request_space_available(pool)) 1086 break; 1087 rqstp = NULL; 1088 mtx_unlock(&pool->sp_lock); 1089 stat = svc_getreq(xprt, &rqstp); 1090 mtx_lock(&pool->sp_lock); 1091 if (rqstp) { 1092 /* 1093 * See if the application has 1094 * a preference for some other 1095 * thread. 1096 */ 1097 stpref = st; 1098 if (pool->sp_assign) 1099 stpref = pool->sp_assign(st, 1100 rqstp); 1101 1102 pool->sp_space_used += 1103 rqstp->rq_size; 1104 if (pool->sp_space_used 1105 > pool->sp_space_used_highest) 1106 pool->sp_space_used_highest = 1107 pool->sp_space_used; 1108 rqstp->rq_thread = stpref; 1109 STAILQ_INSERT_TAIL(&stpref->st_reqs, 1110 rqstp, rq_link); 1111 stpref->st_reqcount++; 1112 1113 /* 1114 * If we assigned the request 1115 * to another thread, make 1116 * sure its awake and continue 1117 * reading from the 1118 * socket. Otherwise, try to 1119 * find some other thread to 1120 * read from the socket and 1121 * execute the request 1122 * immediately. 1123 */ 1124 if (stpref != st) { 1125 cv_signal(&stpref->st_cond); 1126 continue; 1127 } else { 1128 break; 1129 } 1130 } 1131 } while (stat == XPRT_MOREREQS 1132 && pool->sp_state != SVCPOOL_CLOSING); 1133 1134 /* 1135 * Move this transport to the end of the 1136 * active list to ensure fairness when 1137 * multiple transports are active. If this was 1138 * the last queued request, svc_getreq will 1139 * end up calling xprt_inactive to remove from 1140 * the active list. 1141 */ 1142 xprt->xp_thread = NULL; 1143 st->st_xprt = NULL; 1144 if (xprt->xp_active) { 1145 xprt_assignthread(xprt); 1146 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink); 1147 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, 1148 xp_alink); 1149 } 1150 mtx_unlock(&pool->sp_lock); 1151 SVC_RELEASE(xprt); 1152 mtx_lock(&pool->sp_lock); 1153 } 1154 1155 /* 1156 * Execute what we have queued. 
1157 */ 1158 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) { 1159 size_t sz = rqstp->rq_size; 1160 mtx_unlock(&pool->sp_lock); 1161 svc_executereq(rqstp); 1162 mtx_lock(&pool->sp_lock); 1163 pool->sp_space_used -= sz; 1164 } 1165 } 1166 1167 if (st->st_xprt) { 1168 xprt = st->st_xprt; 1169 st->st_xprt = NULL; 1170 SVC_RELEASE(xprt); 1171 } 1172 1173 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit")); 1174 LIST_REMOVE(st, st_link); 1175 pool->sp_threadcount--; 1176 1177 mtx_unlock(&pool->sp_lock); 1178 1179 cv_destroy(&st->st_cond); 1180 mem_free(st, sizeof(*st)); 1181 1182 if (!ismaster) 1183 wakeup(pool); 1184} 1185 1186static void 1187svc_thread_start(void *arg) 1188{ 1189 1190 svc_run_internal((SVCPOOL *) arg, FALSE); 1191 kthread_exit(); 1192} 1193 1194static void 1195svc_new_thread(SVCPOOL *pool) 1196{ 1197 struct thread *td; 1198 1199 pool->sp_threadcount++; 1200 kthread_add(svc_thread_start, pool, 1201 pool->sp_proc, &td, 0, 0, 1202 "%s: service", pool->sp_name); 1203} 1204 1205void 1206svc_run(SVCPOOL *pool) 1207{ 1208 int i; 1209 struct proc *p; 1210 struct thread *td; 1211 1212 p = curproc; 1213 td = curthread; 1214 snprintf(td->td_name, sizeof(td->td_name), 1215 "%s: master", pool->sp_name); 1216 pool->sp_state = SVCPOOL_ACTIVE; 1217 pool->sp_proc = p; 1218 pool->sp_lastcreatetime = time_uptime; 1219 pool->sp_threadcount = 1; 1220 1221 for (i = 1; i < pool->sp_minthreads; i++) { 1222 svc_new_thread(pool); 1223 } 1224 1225 svc_run_internal(pool, TRUE); 1226 1227 mtx_lock(&pool->sp_lock); 1228 while (pool->sp_threadcount > 0) 1229 msleep(pool, &pool->sp_lock, 0, "svcexit", 0); 1230 mtx_unlock(&pool->sp_lock); 1231} 1232 1233void 1234svc_exit(SVCPOOL *pool) 1235{ 1236 SVCTHREAD *st; 1237 1238 mtx_lock(&pool->sp_lock); 1239 1240 pool->sp_state = SVCPOOL_CLOSING; 1241 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) 1242 cv_signal(&st->st_cond); 1243 1244 mtx_unlock(&pool->sp_lock); 1245} 1246 1247bool_t 1248svc_getargs(struct svc_req *rqstp, 
xdrproc_t xargs, void *args) 1249{ 1250 struct mbuf *m; 1251 XDR xdrs; 1252 bool_t stat; 1253 1254 m = rqstp->rq_args; 1255 rqstp->rq_args = NULL; 1256 1257 xdrmbuf_create(&xdrs, m, XDR_DECODE); 1258 stat = xargs(&xdrs, args); 1259 XDR_DESTROY(&xdrs); 1260 1261 return (stat); 1262} 1263 1264bool_t 1265svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args) 1266{ 1267 XDR xdrs; 1268 1269 if (rqstp->rq_addr) { 1270 free(rqstp->rq_addr, M_SONAME); 1271 rqstp->rq_addr = NULL; 1272 } 1273 1274 xdrs.x_op = XDR_FREE; 1275 return (xargs(&xdrs, args)); 1276} 1277 1278void 1279svc_freereq(struct svc_req *rqstp) 1280{ 1281 SVCTHREAD *st; 1282 SVCXPRT *xprt; 1283 SVCPOOL *pool; 1284 1285 st = rqstp->rq_thread; 1286 xprt = rqstp->rq_xprt; 1287 if (xprt) 1288 pool = xprt->xp_pool; 1289 else 1290 pool = NULL; 1291 if (st) { 1292 mtx_lock(&pool->sp_lock); 1293 KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs), 1294 ("Freeing request out of order")); 1295 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link); 1296 st->st_reqcount--; 1297 if (pool->sp_done) 1298 pool->sp_done(st, rqstp); 1299 mtx_unlock(&pool->sp_lock); 1300 } 1301 1302 if (rqstp->rq_auth.svc_ah_ops) 1303 SVCAUTH_RELEASE(&rqstp->rq_auth); 1304 1305 if (rqstp->rq_xprt) { 1306 SVC_RELEASE(rqstp->rq_xprt); 1307 } 1308 1309 if (rqstp->rq_addr) 1310 free(rqstp->rq_addr, M_SONAME); 1311 1312 if (rqstp->rq_args) 1313 m_freem(rqstp->rq_args); 1314 1315 free(rqstp, M_RPC); 1316} 1317