clnt_dg.c revision 259984
1/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */ 2 3/* 4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for 5 * unrestricted use provided that this legend is included on all tape 6 * media and as a part of the software program in whole or part. Users 7 * may copy or modify Sun RPC without charge, but are not authorized 8 * to license or distribute it to anyone else except as part of a product or 9 * program developed by the user. 10 * 11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE 12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR 13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. 14 * 15 * Sun RPC is provided with no support and without any obligation on the 16 * part of Sun Microsystems, Inc. to assist in its use, correction, 17 * modification or enhancement. 18 * 19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE 20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC 21 * OR ANY PART THEREOF. 22 * 23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue 24 * or profits or other special, indirect and consequential damages, even if 25 * Sun has been advised of the possibility of such damages. 26 * 27 * Sun Microsystems, Inc. 28 * 2550 Garcia Avenue 29 * Mountain View, California 94043 30 */ 31/* 32 * Copyright (c) 1986-1991 by Sun Microsystems Inc. 33 */ 34 35#if defined(LIBC_SCCS) && !defined(lint) 36#ident "@(#)clnt_dg.c 1.23 94/04/22 SMI" 37static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro"; 38#endif 39#include <sys/cdefs.h> 40__FBSDID("$FreeBSD: stable/10/sys/rpc/clnt_dg.c 259984 2013-12-28 01:26:26Z dim $"); 41 42/* 43 * Implements a connectionless client side RPC. 44 */ 45 46#include <sys/param.h> 47#include <sys/systm.h> 48#include <sys/kernel.h> 49#include <sys/lock.h> 50#include <sys/malloc.h> 51#include <sys/mbuf.h> 52#include <sys/mutex.h> 53#include <sys/pcpu.h> 54#include <sys/proc.h> 55#include <sys/socket.h> 56#include <sys/socketvar.h> 57#include <sys/time.h> 58#include <sys/uio.h> 59 60#include <net/vnet.h> 61 62#include <rpc/rpc.h> 63#include <rpc/rpc_com.h> 64 65 66#ifdef _FREEFALL_CONFIG 67/* 68 * Disable RPC exponential back-off for FreeBSD.org systems. 69 */ 70#define RPC_MAX_BACKOFF 1 /* second */ 71#else 72#define RPC_MAX_BACKOFF 30 /* seconds */ 73#endif 74 75static bool_t time_not_ok(struct timeval *); 76static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *, 77 rpcproc_t, struct mbuf *, struct mbuf **, struct timeval); 78static void clnt_dg_geterr(CLIENT *, struct rpc_err *); 79static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *); 80static void clnt_dg_abort(CLIENT *); 81static bool_t clnt_dg_control(CLIENT *, u_int, void *); 82static void clnt_dg_close(CLIENT *); 83static void clnt_dg_destroy(CLIENT *); 84static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag); 85 86static struct clnt_ops clnt_dg_ops = { 87 .cl_call = clnt_dg_call, 88 .cl_abort = clnt_dg_abort, 89 .cl_geterr = clnt_dg_geterr, 90 .cl_freeres = clnt_dg_freeres, 91 .cl_close = clnt_dg_close, 92 .cl_destroy = clnt_dg_destroy, 93 .cl_control = clnt_dg_control 94}; 95 96/* 97 * A pending RPC request which awaits a reply. Requests which have 98 * received their reply will have cr_xid set to zero and cr_mrep to 99 * the mbuf chain of the reply. 100 */ 101struct cu_request { 102 TAILQ_ENTRY(cu_request) cr_link; 103 CLIENT *cr_client; /* owner */ 104 uint32_t cr_xid; /* XID of request */ 105 struct mbuf *cr_mrep; /* reply received by upcall */ 106 int cr_error; /* any error from upcall */ 107 char cr_verf[MAX_AUTH_BYTES]; /* reply verf */ 108}; 109 110TAILQ_HEAD(cu_request_list, cu_request); 111 112#define MCALL_MSG_SIZE 24 113 114/* 115 * This structure is pointed to by the socket buffer's sb_upcallarg 116 * member. It is separate from the client private data to facilitate 117 * multiple clients sharing the same socket. The cs_lock mutex is used 118 * to protect all fields of this structure, the socket's receive 119 * buffer SOCKBUF_LOCK is used to ensure that exactly one of these 120 * structures is installed on the socket. 121 */ 122struct cu_socket { 123 struct mtx cs_lock; 124 int cs_refs; /* Count of clients */ 125 struct cu_request_list cs_pending; /* Requests awaiting replies */ 126 int cs_upcallrefs; /* Refcnt of upcalls in prog.*/ 127}; 128 129static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *); 130 131/* 132 * Private data kept per client handle 133 */ 134struct cu_data { 135 int cu_threads; /* # threads in clnt_vc_call */ 136 bool_t cu_closing; /* TRUE if we are closing */ 137 bool_t cu_closed; /* TRUE if we are closed */ 138 struct socket *cu_socket; /* connection socket */ 139 bool_t cu_closeit; /* opened by library */ 140 struct sockaddr_storage cu_raddr; /* remote address */ 141 int cu_rlen; 142 struct timeval cu_wait; /* retransmit interval */ 143 struct timeval cu_total; /* total time for the call */ 144 struct rpc_err cu_error; 145 uint32_t cu_xid; 146 char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */ 147 size_t cu_mcalllen; 148 size_t cu_sendsz; /* send size */ 149 size_t cu_recvsz; /* recv size */ 150 int cu_async; 151 int cu_connect; /* Use connect(). */ 152 int cu_connected; /* Have done connect(). */ 153 const char *cu_waitchan; 154 int cu_waitflag; 155 int cu_cwnd; /* congestion window */ 156 int cu_sent; /* number of in-flight RPCs */ 157 bool_t cu_cwnd_wait; 158}; 159 160#define CWNDSCALE 256 161#define MAXCWND (32 * CWNDSCALE) 162 163/* 164 * Connection less client creation returns with client handle parameters. 165 * Default options are set, which the user can change using clnt_control(). 166 * fd should be open and bound. 167 * NB: The rpch->cl_auth is initialized to null authentication. 168 * Caller may wish to set this something more useful. 169 * 170 * sendsz and recvsz are the maximum allowable packet sizes that can be 171 * sent and received. Normally they are the same, but they can be 172 * changed to improve the program efficiency and buffer allocation. 173 * If they are 0, use the transport default. 174 * 175 * If svcaddr is NULL, returns NULL. 176 */ 177CLIENT * 178clnt_dg_create( 179 struct socket *so, 180 struct sockaddr *svcaddr, /* servers address */ 181 rpcprog_t program, /* program number */ 182 rpcvers_t version, /* version number */ 183 size_t sendsz, /* buffer recv size */ 184 size_t recvsz) /* buffer send size */ 185{ 186 CLIENT *cl = NULL; /* client handle */ 187 struct cu_data *cu = NULL; /* private data */ 188 struct cu_socket *cs = NULL; 189 struct sockbuf *sb; 190 struct timeval now; 191 struct rpc_msg call_msg; 192 struct __rpc_sockinfo si; 193 XDR xdrs; 194 int error; 195 196 if (svcaddr == NULL) { 197 rpc_createerr.cf_stat = RPC_UNKNOWNADDR; 198 return (NULL); 199 } 200 201 if (!__rpc_socket2sockinfo(so, &si)) { 202 rpc_createerr.cf_stat = RPC_TLIERROR; 203 rpc_createerr.cf_error.re_errno = 0; 204 return (NULL); 205 } 206 207 /* 208 * Find the receive and the send size 209 */ 210 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz); 211 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz); 212 if ((sendsz == 0) || (recvsz == 0)) { 213 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */ 214 rpc_createerr.cf_error.re_errno = 0; 215 return (NULL); 216 } 217 218 cl = mem_alloc(sizeof (CLIENT)); 219 220 /* 221 * Should be multiple of 4 for XDR. 222 */ 223 sendsz = ((sendsz + 3) / 4) * 4; 224 recvsz = ((recvsz + 3) / 4) * 4; 225 cu = mem_alloc(sizeof (*cu)); 226 cu->cu_threads = 0; 227 cu->cu_closing = FALSE; 228 cu->cu_closed = FALSE; 229 (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len); 230 cu->cu_rlen = svcaddr->sa_len; 231 /* Other values can also be set through clnt_control() */ 232 cu->cu_wait.tv_sec = 3; /* heuristically chosen */ 233 cu->cu_wait.tv_usec = 0; 234 cu->cu_total.tv_sec = -1; 235 cu->cu_total.tv_usec = -1; 236 cu->cu_sendsz = sendsz; 237 cu->cu_recvsz = recvsz; 238 cu->cu_async = FALSE; 239 cu->cu_connect = FALSE; 240 cu->cu_connected = FALSE; 241 cu->cu_waitchan = "rpcrecv"; 242 cu->cu_waitflag = 0; 243 cu->cu_cwnd = MAXCWND / 2; 244 cu->cu_sent = 0; 245 cu->cu_cwnd_wait = FALSE; 246 (void) getmicrotime(&now); 247 cu->cu_xid = __RPC_GETXID(&now); 248 call_msg.rm_xid = cu->cu_xid; 249 call_msg.rm_call.cb_prog = program; 250 call_msg.rm_call.cb_vers = version; 251 xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE); 252 if (! xdr_callhdr(&xdrs, &call_msg)) { 253 rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */ 254 rpc_createerr.cf_error.re_errno = 0; 255 goto err2; 256 } 257 cu->cu_mcalllen = XDR_GETPOS(&xdrs); 258 259 /* 260 * By default, closeit is always FALSE. It is users responsibility 261 * to do a close on it, else the user may use clnt_control 262 * to let clnt_destroy do it for him/her. 263 */ 264 cu->cu_closeit = FALSE; 265 cu->cu_socket = so; 266 error = soreserve(so, (u_long)sendsz, (u_long)recvsz); 267 if (error != 0) { 268 rpc_createerr.cf_stat = RPC_FAILED; 269 rpc_createerr.cf_error.re_errno = error; 270 goto err2; 271 } 272 273 sb = &so->so_rcv; 274 SOCKBUF_LOCK(&so->so_rcv); 275recheck_socket: 276 if (sb->sb_upcall) { 277 if (sb->sb_upcall != clnt_dg_soupcall) { 278 SOCKBUF_UNLOCK(&so->so_rcv); 279 printf("clnt_dg_create(): socket already has an incompatible upcall\n"); 280 goto err2; 281 } 282 cs = (struct cu_socket *) sb->sb_upcallarg; 283 mtx_lock(&cs->cs_lock); 284 cs->cs_refs++; 285 mtx_unlock(&cs->cs_lock); 286 } else { 287 /* 288 * We are the first on this socket - allocate the 289 * structure and install it in the socket. 290 */ 291 SOCKBUF_UNLOCK(&so->so_rcv); 292 cs = mem_alloc(sizeof(*cs)); 293 SOCKBUF_LOCK(&so->so_rcv); 294 if (sb->sb_upcall) { 295 /* 296 * We have lost a race with some other client. 297 */ 298 mem_free(cs, sizeof(*cs)); 299 goto recheck_socket; 300 } 301 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF); 302 cs->cs_refs = 1; 303 cs->cs_upcallrefs = 0; 304 TAILQ_INIT(&cs->cs_pending); 305 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs); 306 } 307 SOCKBUF_UNLOCK(&so->so_rcv); 308 309 cl->cl_refs = 1; 310 cl->cl_ops = &clnt_dg_ops; 311 cl->cl_private = (caddr_t)(void *)cu; 312 cl->cl_auth = authnone_create(); 313 cl->cl_tp = NULL; 314 cl->cl_netid = NULL; 315 return (cl); 316err2: 317 if (cl) { 318 mem_free(cl, sizeof (CLIENT)); 319 if (cu) 320 mem_free(cu, sizeof (*cu)); 321 } 322 return (NULL); 323} 324 325static enum clnt_stat 326clnt_dg_call( 327 CLIENT *cl, /* client handle */ 328 struct rpc_callextra *ext, /* call metadata */ 329 rpcproc_t proc, /* procedure number */ 330 struct mbuf *args, /* pointer to args */ 331 struct mbuf **resultsp, /* pointer to results */ 332 struct timeval utimeout) /* seconds to wait before giving up */ 333{ 334 struct cu_data *cu = (struct cu_data *)cl->cl_private; 335 struct cu_socket *cs; 336 struct rpc_timers *rt; 337 AUTH *auth; 338 struct rpc_err *errp; 339 enum clnt_stat stat; 340 XDR xdrs; 341 struct rpc_msg reply_msg; 342 bool_t ok; 343 int retrans; /* number of re-transmits so far */ 344 int nrefreshes = 2; /* number of times to refresh cred */ 345 struct timeval *tvp; 346 int timeout; 347 int retransmit_time; 348 int next_sendtime, starttime, rtt, time_waited, tv = 0; 349 struct sockaddr *sa; 350 socklen_t salen; 351 uint32_t xid = 0; 352 struct mbuf *mreq = NULL, *results; 353 struct cu_request *cr; 354 int error; 355 356 cs = cu->cu_socket->so_rcv.sb_upcallarg; 357 cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK); 358 359 mtx_lock(&cs->cs_lock); 360 361 if (cu->cu_closing || cu->cu_closed) { 362 mtx_unlock(&cs->cs_lock); 363 free(cr, M_RPC); 364 return (RPC_CANTSEND); 365 } 366 cu->cu_threads++; 367 368 if (ext) { 369 auth = ext->rc_auth; 370 errp = &ext->rc_err; 371 } else { 372 auth = cl->cl_auth; 373 errp = &cu->cu_error; 374 } 375 376 cr->cr_client = cl; 377 cr->cr_mrep = NULL; 378 cr->cr_error = 0; 379 380 if (cu->cu_total.tv_usec == -1) { 381 tvp = &utimeout; /* use supplied timeout */ 382 } else { 383 tvp = &cu->cu_total; /* use default timeout */ 384 } 385 if (tvp->tv_sec || tvp->tv_usec) 386 timeout = tvtohz(tvp); 387 else 388 timeout = 0; 389 390 if (cu->cu_connect && !cu->cu_connected) { 391 mtx_unlock(&cs->cs_lock); 392 error = soconnect(cu->cu_socket, 393 (struct sockaddr *)&cu->cu_raddr, curthread); 394 mtx_lock(&cs->cs_lock); 395 if (error) { 396 errp->re_errno = error; 397 errp->re_status = stat = RPC_CANTSEND; 398 goto out; 399 } 400 cu->cu_connected = 1; 401 } 402 if (cu->cu_connected) { 403 sa = NULL; 404 salen = 0; 405 } else { 406 sa = (struct sockaddr *)&cu->cu_raddr; 407 salen = cu->cu_rlen; 408 } 409 time_waited = 0; 410 retrans = 0; 411 if (ext && ext->rc_timers) { 412 rt = ext->rc_timers; 413 if (!rt->rt_rtxcur) 414 rt->rt_rtxcur = tvtohz(&cu->cu_wait); 415 retransmit_time = next_sendtime = rt->rt_rtxcur; 416 } else { 417 rt = NULL; 418 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait); 419 } 420 421 starttime = ticks; 422 423call_again: 424 mtx_assert(&cs->cs_lock, MA_OWNED); 425 426 cu->cu_xid++; 427 xid = cu->cu_xid; 428 429send_again: 430 mtx_unlock(&cs->cs_lock); 431 432 mreq = m_gethdr(M_WAITOK, MT_DATA); 433 KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big")); 434 bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen); 435 mreq->m_len = cu->cu_mcalllen; 436 437 /* 438 * The XID is the first thing in the request. 439 */ 440 *mtod(mreq, uint32_t *) = htonl(xid); 441 442 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE); 443 444 if (cu->cu_async == TRUE && args == NULL) 445 goto get_reply; 446 447 if ((! XDR_PUTINT32(&xdrs, &proc)) || 448 (! AUTH_MARSHALL(auth, xid, &xdrs, 449 m_copym(args, 0, M_COPYALL, M_WAITOK)))) { 450 errp->re_status = stat = RPC_CANTENCODEARGS; 451 mtx_lock(&cs->cs_lock); 452 goto out; 453 } 454 mreq->m_pkthdr.len = m_length(mreq, NULL); 455 456 cr->cr_xid = xid; 457 mtx_lock(&cs->cs_lock); 458 459 /* 460 * Try to get a place in the congestion window. 461 */ 462 while (cu->cu_sent >= cu->cu_cwnd) { 463 cu->cu_cwnd_wait = TRUE; 464 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock, 465 cu->cu_waitflag, "rpccwnd", 0); 466 if (error) { 467 errp->re_errno = error; 468 if (error == EINTR || error == ERESTART) 469 errp->re_status = stat = RPC_INTR; 470 else 471 errp->re_status = stat = RPC_CANTSEND; 472 goto out; 473 } 474 } 475 cu->cu_sent += CWNDSCALE; 476 477 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 478 mtx_unlock(&cs->cs_lock); 479 480 /* 481 * sosend consumes mreq. 482 */ 483 error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread); 484 mreq = NULL; 485 486 /* 487 * sub-optimal code appears here because we have 488 * some clock time to spare while the packets are in flight. 489 * (We assume that this is actually only executed once.) 490 */ 491 reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL; 492 reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf; 493 reply_msg.acpted_rply.ar_verf.oa_length = 0; 494 reply_msg.acpted_rply.ar_results.where = NULL; 495 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void; 496 497 mtx_lock(&cs->cs_lock); 498 if (error) { 499 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 500 errp->re_errno = error; 501 errp->re_status = stat = RPC_CANTSEND; 502 cu->cu_sent -= CWNDSCALE; 503 if (cu->cu_cwnd_wait) { 504 cu->cu_cwnd_wait = FALSE; 505 wakeup(&cu->cu_cwnd_wait); 506 } 507 goto out; 508 } 509 510 /* 511 * Check to see if we got an upcall while waiting for the 512 * lock. 513 */ 514 if (cr->cr_error) { 515 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 516 errp->re_errno = cr->cr_error; 517 errp->re_status = stat = RPC_CANTRECV; 518 cu->cu_sent -= CWNDSCALE; 519 if (cu->cu_cwnd_wait) { 520 cu->cu_cwnd_wait = FALSE; 521 wakeup(&cu->cu_cwnd_wait); 522 } 523 goto out; 524 } 525 if (cr->cr_mrep) { 526 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 527 cu->cu_sent -= CWNDSCALE; 528 if (cu->cu_cwnd_wait) { 529 cu->cu_cwnd_wait = FALSE; 530 wakeup(&cu->cu_cwnd_wait); 531 } 532 goto got_reply; 533 } 534 535 /* 536 * Hack to provide rpc-based message passing 537 */ 538 if (timeout == 0) { 539 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 540 errp->re_status = stat = RPC_TIMEDOUT; 541 cu->cu_sent -= CWNDSCALE; 542 if (cu->cu_cwnd_wait) { 543 cu->cu_cwnd_wait = FALSE; 544 wakeup(&cu->cu_cwnd_wait); 545 } 546 goto out; 547 } 548 549get_reply: 550 for (;;) { 551 /* Decide how long to wait. */ 552 if (next_sendtime < timeout) 553 tv = next_sendtime; 554 else 555 tv = timeout; 556 tv -= time_waited; 557 558 if (tv > 0) { 559 if (cu->cu_closing || cu->cu_closed) { 560 error = 0; 561 cr->cr_error = ESHUTDOWN; 562 } else { 563 error = msleep(cr, &cs->cs_lock, 564 cu->cu_waitflag, cu->cu_waitchan, tv); 565 } 566 } else { 567 error = EWOULDBLOCK; 568 } 569 570 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link); 571 cu->cu_sent -= CWNDSCALE; 572 if (cu->cu_cwnd_wait) { 573 cu->cu_cwnd_wait = FALSE; 574 wakeup(&cu->cu_cwnd_wait); 575 } 576 577 if (!error) { 578 /* 579 * We were woken up by the upcall. If the 580 * upcall had a receive error, report that, 581 * otherwise we have a reply. 582 */ 583 if (cr->cr_error) { 584 errp->re_errno = cr->cr_error; 585 errp->re_status = stat = RPC_CANTRECV; 586 goto out; 587 } 588 589 cu->cu_cwnd += (CWNDSCALE * CWNDSCALE 590 + cu->cu_cwnd / 2) / cu->cu_cwnd; 591 if (cu->cu_cwnd > MAXCWND) 592 cu->cu_cwnd = MAXCWND; 593 594 if (rt) { 595 /* 596 * Add one to the time since a tick 597 * count of N means that the actual 598 * time taken was somewhere between N 599 * and N+1. 600 */ 601 rtt = ticks - starttime + 1; 602 603 /* 604 * Update our estimate of the round 605 * trip time using roughly the 606 * algorithm described in RFC 607 * 2988. Given an RTT sample R: 608 * 609 * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R| 610 * SRTT = (1-alpha) * SRTT + alpha * R 611 * 612 * where alpha = 0.125 and beta = 0.25. 613 * 614 * The initial retransmit timeout is 615 * SRTT + 4*RTTVAR and doubles on each 616 * retransmision. 617 */ 618 if (rt->rt_srtt == 0) { 619 rt->rt_srtt = rtt; 620 rt->rt_deviate = rtt / 2; 621 } else { 622 int32_t error = rtt - rt->rt_srtt; 623 rt->rt_srtt += error / 8; 624 error = abs(error) - rt->rt_deviate; 625 rt->rt_deviate += error / 4; 626 } 627 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate; 628 } 629 630 break; 631 } 632 633 /* 634 * The sleep returned an error so our request is still 635 * on the list. If we got EWOULDBLOCK, we may want to 636 * re-send the request. 637 */ 638 if (error != EWOULDBLOCK) { 639 errp->re_errno = error; 640 if (error == EINTR || error == ERESTART) 641 errp->re_status = stat = RPC_INTR; 642 else 643 errp->re_status = stat = RPC_CANTRECV; 644 goto out; 645 } 646 647 time_waited = ticks - starttime; 648 649 /* Check for timeout. */ 650 if (time_waited > timeout) { 651 errp->re_errno = EWOULDBLOCK; 652 errp->re_status = stat = RPC_TIMEDOUT; 653 goto out; 654 } 655 656 /* Retransmit if necessary. */ 657 if (time_waited >= next_sendtime) { 658 cu->cu_cwnd /= 2; 659 if (cu->cu_cwnd < CWNDSCALE) 660 cu->cu_cwnd = CWNDSCALE; 661 if (ext && ext->rc_feedback) { 662 mtx_unlock(&cs->cs_lock); 663 if (retrans == 0) 664 ext->rc_feedback(FEEDBACK_REXMIT1, 665 proc, ext->rc_feedback_arg); 666 else 667 ext->rc_feedback(FEEDBACK_REXMIT2, 668 proc, ext->rc_feedback_arg); 669 mtx_lock(&cs->cs_lock); 670 } 671 if (cu->cu_closing || cu->cu_closed) { 672 errp->re_errno = ESHUTDOWN; 673 errp->re_status = stat = RPC_CANTRECV; 674 goto out; 675 } 676 retrans++; 677 /* update retransmit_time */ 678 if (retransmit_time < RPC_MAX_BACKOFF * hz) 679 retransmit_time = 2 * retransmit_time; 680 next_sendtime += retransmit_time; 681 goto send_again; 682 } 683 cu->cu_sent += CWNDSCALE; 684 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link); 685 } 686 687got_reply: 688 /* 689 * Now decode and validate the response. We need to drop the 690 * lock since xdr_replymsg may end up sleeping in malloc. 691 */ 692 mtx_unlock(&cs->cs_lock); 693 694 if (ext && ext->rc_feedback) 695 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg); 696 697 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE); 698 ok = xdr_replymsg(&xdrs, &reply_msg); 699 cr->cr_mrep = NULL; 700 701 if (ok) { 702 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) && 703 (reply_msg.acpted_rply.ar_stat == SUCCESS)) 704 errp->re_status = stat = RPC_SUCCESS; 705 else 706 stat = _seterr_reply(&reply_msg, &(cu->cu_error)); 707 708 if (errp->re_status == RPC_SUCCESS) { 709 results = xdrmbuf_getall(&xdrs); 710 if (! AUTH_VALIDATE(auth, xid, 711 &reply_msg.acpted_rply.ar_verf, 712 &results)) { 713 errp->re_status = stat = RPC_AUTHERROR; 714 errp->re_why = AUTH_INVALIDRESP; 715 if (retrans && 716 auth->ah_cred.oa_flavor == RPCSEC_GSS) { 717 /* 718 * If we retransmitted, its 719 * possible that we will 720 * receive a reply for one of 721 * the earlier transmissions 722 * (which will use an older 723 * RPCSEC_GSS sequence 724 * number). In this case, just 725 * go back and listen for a 726 * new reply. We could keep a 727 * record of all the seq 728 * numbers we have transmitted 729 * so far so that we could 730 * accept a reply for any of 731 * them here. 732 */ 733 XDR_DESTROY(&xdrs); 734 mtx_lock(&cs->cs_lock); 735 cu->cu_sent += CWNDSCALE; 736 TAILQ_INSERT_TAIL(&cs->cs_pending, 737 cr, cr_link); 738 cr->cr_mrep = NULL; 739 goto get_reply; 740 } 741 } else { 742 *resultsp = results; 743 } 744 } /* end successful completion */ 745 /* 746 * If unsuccesful AND error is an authentication error 747 * then refresh credentials and try again, else break 748 */ 749 else if (stat == RPC_AUTHERROR) 750 /* maybe our credentials need to be refreshed ... */ 751 if (nrefreshes > 0 && 752 AUTH_REFRESH(auth, &reply_msg)) { 753 nrefreshes--; 754 XDR_DESTROY(&xdrs); 755 mtx_lock(&cs->cs_lock); 756 goto call_again; 757 } 758 /* end of unsuccessful completion */ 759 } /* end of valid reply message */ 760 else { 761 errp->re_status = stat = RPC_CANTDECODERES; 762 763 } 764 XDR_DESTROY(&xdrs); 765 mtx_lock(&cs->cs_lock); 766out: 767 mtx_assert(&cs->cs_lock, MA_OWNED); 768 769 if (mreq) 770 m_freem(mreq); 771 if (cr->cr_mrep) 772 m_freem(cr->cr_mrep); 773 774 cu->cu_threads--; 775 if (cu->cu_closing) 776 wakeup(cu); 777 778 mtx_unlock(&cs->cs_lock); 779 780 if (auth && stat != RPC_SUCCESS) 781 AUTH_VALIDATE(auth, xid, NULL, NULL); 782 783 free(cr, M_RPC); 784 785 return (stat); 786} 787 788static void 789clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp) 790{ 791 struct cu_data *cu = (struct cu_data *)cl->cl_private; 792 793 *errp = cu->cu_error; 794} 795 796static bool_t 797clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr) 798{ 799 XDR xdrs; 800 bool_t dummy; 801 802 xdrs.x_op = XDR_FREE; 803 dummy = (*xdr_res)(&xdrs, res_ptr); 804 805 return (dummy); 806} 807 808/*ARGSUSED*/ 809static void 810clnt_dg_abort(CLIENT *h) 811{ 812} 813 814static bool_t 815clnt_dg_control(CLIENT *cl, u_int request, void *info) 816{ 817 struct cu_data *cu = (struct cu_data *)cl->cl_private; 818 struct cu_socket *cs; 819 struct sockaddr *addr; 820 821 cs = cu->cu_socket->so_rcv.sb_upcallarg; 822 mtx_lock(&cs->cs_lock); 823 824 switch (request) { 825 case CLSET_FD_CLOSE: 826 cu->cu_closeit = TRUE; 827 mtx_unlock(&cs->cs_lock); 828 return (TRUE); 829 case CLSET_FD_NCLOSE: 830 cu->cu_closeit = FALSE; 831 mtx_unlock(&cs->cs_lock); 832 return (TRUE); 833 } 834 835 /* for other requests which use info */ 836 if (info == NULL) { 837 mtx_unlock(&cs->cs_lock); 838 return (FALSE); 839 } 840 switch (request) { 841 case CLSET_TIMEOUT: 842 if (time_not_ok((struct timeval *)info)) { 843 mtx_unlock(&cs->cs_lock); 844 return (FALSE); 845 } 846 cu->cu_total = *(struct timeval *)info; 847 break; 848 case CLGET_TIMEOUT: 849 *(struct timeval *)info = cu->cu_total; 850 break; 851 case CLSET_RETRY_TIMEOUT: 852 if (time_not_ok((struct timeval *)info)) { 853 mtx_unlock(&cs->cs_lock); 854 return (FALSE); 855 } 856 cu->cu_wait = *(struct timeval *)info; 857 break; 858 case CLGET_RETRY_TIMEOUT: 859 *(struct timeval *)info = cu->cu_wait; 860 break; 861 case CLGET_SVC_ADDR: 862 /* 863 * Slightly different semantics to userland - we use 864 * sockaddr instead of netbuf. 865 */ 866 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len); 867 break; 868 case CLSET_SVC_ADDR: /* set to new address */ 869 addr = (struct sockaddr *)info; 870 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len); 871 break; 872 case CLGET_XID: 873 *(uint32_t *)info = cu->cu_xid; 874 break; 875 876 case CLSET_XID: 877 /* This will set the xid of the NEXT call */ 878 /* decrement by 1 as clnt_dg_call() increments once */ 879 cu->cu_xid = *(uint32_t *)info - 1; 880 break; 881 882 case CLGET_VERS: 883 /* 884 * This RELIES on the information that, in the call body, 885 * the version number field is the fifth field from the 886 * begining of the RPC header. MUST be changed if the 887 * call_struct is changed 888 */ 889 *(uint32_t *)info = 890 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 891 4 * BYTES_PER_XDR_UNIT)); 892 break; 893 894 case CLSET_VERS: 895 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT) 896 = htonl(*(uint32_t *)info); 897 break; 898 899 case CLGET_PROG: 900 /* 901 * This RELIES on the information that, in the call body, 902 * the program number field is the fourth field from the 903 * begining of the RPC header. MUST be changed if the 904 * call_struct is changed 905 */ 906 *(uint32_t *)info = 907 ntohl(*(uint32_t *)(void *)(cu->cu_mcallc + 908 3 * BYTES_PER_XDR_UNIT)); 909 break; 910 911 case CLSET_PROG: 912 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT) 913 = htonl(*(uint32_t *)info); 914 break; 915 case CLSET_ASYNC: 916 cu->cu_async = *(int *)info; 917 break; 918 case CLSET_CONNECT: 919 cu->cu_connect = *(int *)info; 920 break; 921 case CLSET_WAITCHAN: 922 cu->cu_waitchan = (const char *)info; 923 break; 924 case CLGET_WAITCHAN: 925 *(const char **) info = cu->cu_waitchan; 926 break; 927 case CLSET_INTERRUPTIBLE: 928 if (*(int *) info) 929 cu->cu_waitflag = PCATCH; 930 else 931 cu->cu_waitflag = 0; 932 break; 933 case CLGET_INTERRUPTIBLE: 934 if (cu->cu_waitflag) 935 *(int *) info = TRUE; 936 else 937 *(int *) info = FALSE; 938 break; 939 default: 940 mtx_unlock(&cs->cs_lock); 941 return (FALSE); 942 } 943 mtx_unlock(&cs->cs_lock); 944 return (TRUE); 945} 946 947static void 948clnt_dg_close(CLIENT *cl) 949{ 950 struct cu_data *cu = (struct cu_data *)cl->cl_private; 951 struct cu_socket *cs; 952 struct cu_request *cr; 953 954 cs = cu->cu_socket->so_rcv.sb_upcallarg; 955 mtx_lock(&cs->cs_lock); 956 957 if (cu->cu_closed) { 958 mtx_unlock(&cs->cs_lock); 959 return; 960 } 961 962 if (cu->cu_closing) { 963 while (cu->cu_closing) 964 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 965 KASSERT(cu->cu_closed, ("client should be closed")); 966 mtx_unlock(&cs->cs_lock); 967 return; 968 } 969 970 /* 971 * Abort any pending requests and wait until everyone 972 * has finished with clnt_vc_call. 973 */ 974 cu->cu_closing = TRUE; 975 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 976 if (cr->cr_client == cl) { 977 cr->cr_xid = 0; 978 cr->cr_error = ESHUTDOWN; 979 wakeup(cr); 980 } 981 } 982 983 while (cu->cu_threads) 984 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0); 985 986 cu->cu_closing = FALSE; 987 cu->cu_closed = TRUE; 988 989 mtx_unlock(&cs->cs_lock); 990 wakeup(cu); 991} 992 993static void 994clnt_dg_destroy(CLIENT *cl) 995{ 996 struct cu_data *cu = (struct cu_data *)cl->cl_private; 997 struct cu_socket *cs; 998 struct socket *so = NULL; 999 bool_t lastsocketref; 1000 1001 cs = cu->cu_socket->so_rcv.sb_upcallarg; 1002 clnt_dg_close(cl); 1003 1004 SOCKBUF_LOCK(&cu->cu_socket->so_rcv); 1005 mtx_lock(&cs->cs_lock); 1006 1007 cs->cs_refs--; 1008 if (cs->cs_refs == 0) { 1009 mtx_unlock(&cs->cs_lock); 1010 soupcall_clear(cu->cu_socket, SO_RCV); 1011 clnt_dg_upcallsdone(cu->cu_socket, cs); 1012 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1013 mtx_destroy(&cs->cs_lock); 1014 mem_free(cs, sizeof(*cs)); 1015 lastsocketref = TRUE; 1016 } else { 1017 mtx_unlock(&cs->cs_lock); 1018 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv); 1019 lastsocketref = FALSE; 1020 } 1021 1022 if (cu->cu_closeit && lastsocketref) { 1023 so = cu->cu_socket; 1024 cu->cu_socket = NULL; 1025 } 1026 1027 if (so) 1028 soclose(so); 1029 1030 if (cl->cl_netid && cl->cl_netid[0]) 1031 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1); 1032 if (cl->cl_tp && cl->cl_tp[0]) 1033 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1); 1034 mem_free(cu, sizeof (*cu)); 1035 mem_free(cl, sizeof (CLIENT)); 1036} 1037 1038/* 1039 * Make sure that the time is not garbage. -1 value is allowed. 1040 */ 1041static bool_t 1042time_not_ok(struct timeval *t) 1043{ 1044 return (t->tv_sec < -1 || t->tv_sec > 100000000 || 1045 t->tv_usec < -1 || t->tv_usec > 1000000); 1046} 1047 1048int 1049clnt_dg_soupcall(struct socket *so, void *arg, int waitflag) 1050{ 1051 struct cu_socket *cs = (struct cu_socket *) arg; 1052 struct uio uio; 1053 struct mbuf *m; 1054 struct mbuf *control; 1055 struct cu_request *cr; 1056 int error, rcvflag, foundreq; 1057 uint32_t xid; 1058 1059 cs->cs_upcallrefs++; 1060 uio.uio_resid = 1000000000; 1061 uio.uio_td = curthread; 1062 do { 1063 SOCKBUF_UNLOCK(&so->so_rcv); 1064 m = NULL; 1065 control = NULL; 1066 rcvflag = MSG_DONTWAIT; 1067 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag); 1068 if (control) 1069 m_freem(control); 1070 SOCKBUF_LOCK(&so->so_rcv); 1071 1072 if (error == EWOULDBLOCK) 1073 break; 1074 1075 /* 1076 * If there was an error, wake up all pending 1077 * requests. 1078 */ 1079 if (error) { 1080 mtx_lock(&cs->cs_lock); 1081 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1082 cr->cr_xid = 0; 1083 cr->cr_error = error; 1084 wakeup(cr); 1085 } 1086 mtx_unlock(&cs->cs_lock); 1087 break; 1088 } 1089 1090 /* 1091 * The XID is in the first uint32_t of the reply. 1092 */ 1093 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) { 1094 /* 1095 * Should never happen. 1096 */ 1097 m_freem(m); 1098 continue; 1099 } 1100 1101 m_copydata(m, 0, sizeof(xid), (char *)&xid); 1102 xid = ntohl(xid); 1103 1104 /* 1105 * Attempt to match this reply with a pending request. 1106 */ 1107 mtx_lock(&cs->cs_lock); 1108 foundreq = 0; 1109 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) { 1110 if (cr->cr_xid == xid) { 1111 /* 1112 * This one matches. We leave the 1113 * reply mbuf in cr->cr_mrep. Set the 1114 * XID to zero so that we will ignore 1115 * any duplicated replies that arrive 1116 * before clnt_dg_call removes it from 1117 * the queue. 1118 */ 1119 cr->cr_xid = 0; 1120 cr->cr_mrep = m; 1121 cr->cr_error = 0; 1122 foundreq = 1; 1123 wakeup(cr); 1124 break; 1125 } 1126 } 1127 mtx_unlock(&cs->cs_lock); 1128 1129 /* 1130 * If we didn't find the matching request, just drop 1131 * it - its probably a repeated reply. 1132 */ 1133 if (!foundreq) 1134 m_freem(m); 1135 } while (m); 1136 cs->cs_upcallrefs--; 1137 if (cs->cs_upcallrefs < 0) 1138 panic("rpcdg upcall refcnt"); 1139 if (cs->cs_upcallrefs == 0) 1140 wakeup(&cs->cs_upcallrefs); 1141 return (SU_OK); 1142} 1143 1144/* 1145 * Wait for all upcalls in progress to complete. 1146 */ 1147static void 1148clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs) 1149{ 1150 1151 SOCKBUF_LOCK_ASSERT(&so->so_rcv); 1152 1153 while (cs->cs_upcallrefs > 0) 1154 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0, 1155 "rpcdgup", 0); 1156} 1157