/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/

/*
 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
 * unrestricted use provided that this legend is included on all tape
 * media and as a part of the software program in whole or part.  Users
 * may copy or modify Sun RPC without charge, but are not authorized
 * to license or distribute it to anyone else except as part of a product or
 * program developed by the user.
 *
 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 *
 * Sun RPC is provided with no support and without any obligation on the
 * part of Sun Microsystems, Inc. to assist in its use, correction,
 * modification or enhancement.
 *
 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 * OR ANY PART THEREOF.
 *
 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 * or profits or other special, indirect and consequential damages, even if
 * Sun has been advised of the possibility of such damages.
 *
 * Sun Microsystems, Inc.
 * 2550 Garcia Avenue
 * Mountain View, California  94043
 */

#if defined(LIBC_SCCS) && !defined(lint)
static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
static char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
#endif
#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/10/sys/rpc/clnt_vc.c 259984 2013-12-28 01:26:26Z dim $");

/*
 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
 *
 * Copyright (C) 1984, Sun Microsystems, Inc.
 *
 * TCP based RPC supports 'batched calls'.
 * A sequence of calls may be batched-up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server.
 *
 * Now go hang yourself.
 */
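
/*
 * Batching sketch (illustrative only): with the default ct_wait setting,
 * the timeout supplied by the caller is used as-is, so passing a zero
 * timeval makes clnt_vc_call() send the request and return RPC_TIMEDOUT
 * without sleeping for a reply.  CLNT_CALL_MBUF() is assumed to be the
 * usual cl_call dispatch macro and "args" a pre-built argument mbuf;
 * neither is defined in this file.
 *
 *	struct timeval zero = { 0, 0 };
 *	struct mbuf *reply = NULL;
 *
 *	(void) CLNT_CALL_MBUF(cl, NULL, proc, args, &reply, zero);
 */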

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mbuf.h>
#include <sys/mutex.h>
#include <sys/pcpu.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/sx.h>
#include <sys/syslog.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <net/vnet.h>

#include <netinet/tcp.h>

#include <rpc/rpc.h>
#include <rpc/rpc_com.h>
#include <rpc/krpc.h>

struct cmessage {
        struct cmsghdr cmsg;
        struct cmsgcred cmcred;
};

static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
static bool_t clnt_vc_control(CLIENT *, u_int, void *);
static void clnt_vc_close(CLIENT *);
static void clnt_vc_destroy(CLIENT *);
static bool_t time_not_ok(struct timeval *);
static int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);

static struct clnt_ops clnt_vc_ops = {
	.cl_call =	clnt_vc_call,
	.cl_abort =	clnt_vc_abort,
	.cl_geterr =	clnt_vc_geterr,
	.cl_freeres =	clnt_vc_freeres,
	.cl_close =	clnt_vc_close,
	.cl_destroy =	clnt_vc_destroy,
	.cl_control =	clnt_vc_control
};

static void clnt_vc_upcallsdone(struct ct_data *);

/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control().
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes; 0 => use the default.
 * NB: the socket pointer is stored in a private area and the server
 * address is copied.
 * NB: rpch->cl_auth is set to null authentication.  The caller may wish
 * to set this to something more useful.
 *
 * so should be an open socket.
 */
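/*
 * Creation sketch (illustrative only): the sockaddr_in "sin", the NFS
 * program/version numbers and CLNT_CONTROL() (the standard cl_control
 * dispatch macro) are assumptions, not defined in this file.
 *
 *	struct socket *so;
 *	CLIENT *cl;
 *	struct timeval tv = { 30, 0 };
 *	int error;
 *
 *	error = socreate(AF_INET, &so, SOCK_STREAM, IPPROTO_TCP,
 *	    curthread->td_ucred, curthread);
 *	if (error == 0) {
 *		cl = clnt_vc_create(so, (struct sockaddr *)&sin,
 *		    NFS_PROG, NFS_VER3, 0, 0, 1);
 *		if (cl != NULL)
 *			CLNT_CONTROL(cl, CLSET_TIMEOUT, &tv);
 *	}
 */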
CLIENT *
clnt_vc_create(
	struct socket *so,		/* open socket */
	struct sockaddr *raddr,		/* server's address */
	const rpcprog_t prog,		/* program number */
	const rpcvers_t vers,		/* version number */
	size_t sendsz,			/* buffer send size */
	size_t recvsz,			/* buffer recv size */
	int intrflag)			/* interruptible */
{
	CLIENT *cl;			/* client handle */
	struct ct_data *ct = NULL;	/* private data */
	struct timeval now;
	struct rpc_msg call_msg;
	static uint32_t disrupt;
	struct __rpc_sockinfo si;
	XDR xdrs;
	int error, interrupted, one = 1, sleep_flag;
	struct sockopt sopt;

	if (disrupt == 0)
		disrupt = (uint32_t)(long)raddr;

	cl = (CLIENT *)mem_alloc(sizeof (*cl));
	ct = (struct ct_data *)mem_alloc(sizeof (*ct));

	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
	ct->ct_threads = 0;
	ct->ct_closing = FALSE;
	ct->ct_closed = FALSE;
	ct->ct_upcallrefs = 0;

	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
		error = soconnect(so, raddr, curthread);
		SOCK_LOCK(so);
		interrupted = 0;
		sleep_flag = PSOCK;
		if (intrflag != 0)
			sleep_flag |= PCATCH;
		while ((so->so_state & SS_ISCONNECTING)
		    && so->so_error == 0) {
			error = msleep(&so->so_timeo, SOCK_MTX(so),
			    sleep_flag, "connec", 0);
			if (error) {
				if (error == EINTR || error == ERESTART)
					interrupted = 1;
				break;
			}
		}
		if (error == 0) {
			error = so->so_error;
			so->so_error = 0;
		}
		SOCK_UNLOCK(so);
		if (error) {
			if (!interrupted)
				so->so_state &= ~SS_ISCONNECTING;
			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
			rpc_createerr.cf_error.re_errno = error;
			goto err;
		}
	}

	if (!__rpc_socket2sockinfo(so, &si)) {
		goto err;
	}

	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
		bzero(&sopt, sizeof(sopt));
		sopt.sopt_dir = SOPT_SET;
		sopt.sopt_level = SOL_SOCKET;
		sopt.sopt_name = SO_KEEPALIVE;
		sopt.sopt_val = &one;
		sopt.sopt_valsize = sizeof(one);
		sosetopt(so, &sopt);
	}

	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
		bzero(&sopt, sizeof(sopt));
		sopt.sopt_dir = SOPT_SET;
		sopt.sopt_level = IPPROTO_TCP;
		sopt.sopt_name = TCP_NODELAY;
		sopt.sopt_val = &one;
		sopt.sopt_valsize = sizeof(one);
		sosetopt(so, &sopt);
	}

	ct->ct_closeit = FALSE;

	/*
	 * Set up private data struct
	 */
	ct->ct_socket = so;
	ct->ct_wait.tv_sec = -1;
	ct->ct_wait.tv_usec = -1;
	memcpy(&ct->ct_addr, raddr, raddr->sa_len);

	/*
	 * Initialize call message
	 */
	getmicrotime(&now);
	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
	call_msg.rm_xid = ct->ct_xid;
	call_msg.rm_direction = CALL;
	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
	call_msg.rm_call.cb_prog = (uint32_t)prog;
	call_msg.rm_call.cb_vers = (uint32_t)vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
	    XDR_ENCODE);
	if (! xdr_callhdr(&xdrs, &call_msg)) {
		if (ct->ct_closeit) {
			soclose(ct->ct_socket);
		}
		goto err;
	}
	ct->ct_mpos = XDR_GETPOS(&xdrs);
	XDR_DESTROY(&xdrs);
	ct->ct_waitchan = "rpcrecv";
	ct->ct_waitflag = 0;

	/*
	 * Create a client handle which uses xdrmbuf for serialization
	 * and authnone for authentication.
	 */
	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
	error = soreserve(ct->ct_socket, sendsz, recvsz);
	if (error != 0) {
		if (ct->ct_closeit) {
			soclose(ct->ct_socket);
		}
		goto err;
	}
	cl->cl_refs = 1;
	cl->cl_ops = &clnt_vc_ops;
	cl->cl_private = ct;
	cl->cl_auth = authnone_create();

	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);

	ct->ct_record = NULL;
	ct->ct_record_resid = 0;
	TAILQ_INIT(&ct->ct_pending);
	return (cl);

err:
	if (cl) {
		if (ct) {
			mtx_destroy(&ct->ct_lock);
			mem_free(ct, sizeof (struct ct_data));
		}
		mem_free(cl, sizeof (CLIENT));
	}
	return ((CLIENT *)NULL);
}

static enum clnt_stat
clnt_vc_call(
	CLIENT		*cl,		/* client handle */
	struct rpc_callextra *ext,	/* call metadata */
	rpcproc_t	proc,		/* procedure number */
	struct mbuf	*args,		/* pointer to args */
	struct mbuf	**resultsp,	/* pointer to results */
	struct timeval	utimeout)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	AUTH *auth;
	struct rpc_err *errp;
	enum clnt_stat stat;
	XDR xdrs;
	struct rpc_msg reply_msg;
	bool_t ok;
	int nrefreshes = 2;		/* number of times to refresh cred */
	struct timeval timeout;
	uint32_t xid;
	struct mbuf *mreq = NULL, *results;
	struct ct_request *cr;
	int error;

	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);

	mtx_lock(&ct->ct_lock);

	if (ct->ct_closing || ct->ct_closed) {
		mtx_unlock(&ct->ct_lock);
		free(cr, M_RPC);
		return (RPC_CANTSEND);
	}
	ct->ct_threads++;

	if (ext) {
		auth = ext->rc_auth;
		errp = &ext->rc_err;
	} else {
		auth = cl->cl_auth;
		errp = &ct->ct_error;
	}

	cr->cr_mrep = NULL;
	cr->cr_error = 0;

	if (ct->ct_wait.tv_usec == -1) {
		timeout = utimeout;	/* use supplied timeout */
	} else {
		timeout = ct->ct_wait;	/* use default timeout */
	}

call_again:
	mtx_assert(&ct->ct_lock, MA_OWNED);

	ct->ct_xid++;
	xid = ct->ct_xid;

	mtx_unlock(&ct->ct_lock);

	/*
	 * Leave space to prepend the record mark.
	 */
	mreq = m_gethdr(M_WAITOK, MT_DATA);
	mreq->m_data += sizeof(uint32_t);
	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
	    ("RPC header too big"));
	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
	mreq->m_len = ct->ct_mpos;

	/*
	 * The XID is the first thing in the request.
	 */
	*mtod(mreq, uint32_t *) = htonl(xid);

	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);

	errp->re_status = stat = RPC_SUCCESS;

	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
	    (! AUTH_MARSHALL(auth, xid, &xdrs,
		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
		errp->re_status = stat = RPC_CANTENCODEARGS;
		mtx_lock(&ct->ct_lock);
		goto out;
	}
	mreq->m_pkthdr.len = m_length(mreq, NULL);

	/*
	 * Prepend a record marker containing the packet length.
	 */
	M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK);
	*mtod(mreq, uint32_t *) =
		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
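	/*
	 * The mark just built follows the RPC record marking standard
	 * (RFC 5531): bit 31 flags the final fragment of the record and
	 * the low 31 bits carry the fragment length, so a 100 byte
	 * single-fragment request gets the mark htonl(0x80000000 | 100).
	 */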

	cr->cr_xid = xid;
	mtx_lock(&ct->ct_lock);
	/*
	 * Check to see if the other end has already started to close down
	 * the connection. The upcall will have set ct_error.re_status
	 * to RPC_CANTRECV if this is the case.
	 * If the other end starts to close down the connection after this
	 * point, it will be detected later when cr_error is checked,
	 * since the request is in the ct_pending queue.
	 */
	if (ct->ct_error.re_status == RPC_CANTRECV) {
		if (errp != &ct->ct_error) {
			errp->re_errno = ct->ct_error.re_errno;
			errp->re_status = RPC_CANTRECV;
		}
		stat = RPC_CANTRECV;
		goto out;
	}
	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
	mtx_unlock(&ct->ct_lock);

	/*
	 * sosend consumes mreq.
	 */
	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
	mreq = NULL;
	if (error == EMSGSIZE) {
		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
		sbwait(&ct->ct_socket->so_snd);
		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
		AUTH_VALIDATE(auth, xid, NULL, NULL);
		mtx_lock(&ct->ct_lock);
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		goto call_again;
	}

	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
	reply_msg.acpted_rply.ar_verf.oa_length = 0;
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;

	mtx_lock(&ct->ct_lock);
	if (error) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_errno = error;
		errp->re_status = stat = RPC_CANTSEND;
		goto out;
	}

	/*
	 * Check to see if we got an upcall while waiting for the
	 * lock. In both of these cases the upcall has already filled
	 * in cr_error or cr_mrep, and the request is dequeued here.
	 */
	if (cr->cr_error) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_errno = cr->cr_error;
		errp->re_status = stat = RPC_CANTRECV;
		goto out;
	}
	if (cr->cr_mrep) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		goto got_reply;
	}

	/*
	 * Hack to provide rpc-based message passing: a zero timeout
	 * means the caller is batching and does not expect a reply,
	 * so return without sleeping.
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
		errp->re_status = stat = RPC_TIMEDOUT;
		goto out;
	}

	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
	    tvtohz(&timeout));

	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);

	if (error) {
		/*
		 * The sleep returned an error (timeout or interrupt),
		 * so no reply was received; the request was dequeued
		 * above. Turn the error code into an appropriate
		 * client status.
		 */
		errp->re_errno = error;
		switch (error) {
		case EINTR:
			stat = RPC_INTR;
			break;
		case EWOULDBLOCK:
			stat = RPC_TIMEDOUT;
			break;
		default:
			stat = RPC_CANTRECV;
		}
		errp->re_status = stat;
		goto out;
	} else {
		/*
		 * We were woken up by the upcall.  If the
		 * upcall had a receive error, report that,
		 * otherwise we have a reply.
		 */
		if (cr->cr_error) {
			errp->re_errno = cr->cr_error;
			errp->re_status = stat = RPC_CANTRECV;
			goto out;
		}
	}

got_reply:
	/*
	 * Now decode and validate the response. We need to drop the
	 * lock since xdr_replymsg may end up sleeping in malloc.
	 */
	mtx_unlock(&ct->ct_lock);

	if (ext && ext->rc_feedback)
		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);

	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
	ok = xdr_replymsg(&xdrs, &reply_msg);
	cr->cr_mrep = NULL;

	if (ok) {
		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
			errp->re_status = stat = RPC_SUCCESS;
		else
			stat = _seterr_reply(&reply_msg, errp);

		if (stat == RPC_SUCCESS) {
			results = xdrmbuf_getall(&xdrs);
			if (!AUTH_VALIDATE(auth, xid,
				&reply_msg.acpted_rply.ar_verf,
				&results)) {
				errp->re_status = stat = RPC_AUTHERROR;
				errp->re_why = AUTH_INVALIDRESP;
			} else {
				KASSERT(results,
				    ("auth validated but no result"));
				*resultsp = results;
			}
		}		/* end successful completion */
		/*
		 * If unsuccessful AND the error is an authentication error,
		 * then refresh credentials and try again; else break.
		 */
		else if (stat == RPC_AUTHERROR)
			/* maybe our credentials need to be refreshed ... */
			if (nrefreshes > 0 &&
			    AUTH_REFRESH(auth, &reply_msg)) {
				nrefreshes--;
				XDR_DESTROY(&xdrs);
				mtx_lock(&ct->ct_lock);
				goto call_again;
			}
		/* end of unsuccessful completion */
	}	/* end of valid reply message */
	else {
		errp->re_status = stat = RPC_CANTDECODERES;
	}
	XDR_DESTROY(&xdrs);
	mtx_lock(&ct->ct_lock);
out:
	mtx_assert(&ct->ct_lock, MA_OWNED);

	KASSERT(stat != RPC_SUCCESS || *resultsp,
	    ("RPC_SUCCESS without reply"));

	if (mreq)
		m_freem(mreq);
	if (cr->cr_mrep)
		m_freem(cr->cr_mrep);

	ct->ct_threads--;
	if (ct->ct_closing)
		wakeup(ct);

	mtx_unlock(&ct->ct_lock);

	if (auth && stat != RPC_SUCCESS)
		AUTH_VALIDATE(auth, xid, NULL, NULL);

	free(cr, M_RPC);

	return (stat);
}

static void
clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;

	*errp = ct->ct_error;
}

static bool_t
clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
{
	XDR xdrs;
	bool_t dummy;

	xdrs.x_op = XDR_FREE;
	dummy = (*xdr_res)(&xdrs, res_ptr);

	return (dummy);
}

/*ARGSUSED*/
static void
clnt_vc_abort(CLIENT *cl)
{
}

static bool_t
clnt_vc_control(CLIENT *cl, u_int request, void *info)
{
	struct ct_data *ct = (struct ct_data *)cl->cl_private;
	void *infop = info;
	SVCXPRT *xprt;

	mtx_lock(&ct->ct_lock);

	switch (request) {
	case CLSET_FD_CLOSE:
		ct->ct_closeit = TRUE;
		mtx_unlock(&ct->ct_lock);
		return (TRUE);
	case CLSET_FD_NCLOSE:
		ct->ct_closeit = FALSE;
		mtx_unlock(&ct->ct_lock);
		return (TRUE);
	default:
		break;
	}

	/* for other requests which use info */
	if (info == NULL) {
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	}
	switch (request) {
	case CLSET_TIMEOUT:
		if (time_not_ok((struct timeval *)info)) {
			mtx_unlock(&ct->ct_lock);
			return (FALSE);
		}
		ct->ct_wait = *(struct timeval *)infop;
		break;
	case CLGET_TIMEOUT:
		*(struct timeval *)infop = ct->ct_wait;
		break;
	case CLGET_SERVER_ADDR:
		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
		break;
	case CLGET_SVC_ADDR:
		/*
		 * Slightly different semantics to userland - we use
		 * sockaddr instead of netbuf.
		 */
		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
		break;
	case CLSET_SVC_ADDR:		/* set to new address */
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	case CLGET_XID:
		*(uint32_t *)info = ct->ct_xid;
		break;
	case CLSET_XID:
		/* This will set the xid of the NEXT call */
		/* decrement by 1 as clnt_vc_call() increments once */
		ct->ct_xid = *(uint32_t *)info - 1;
		break;
	case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header. MUST be changed if the
		 * call_struct is changed.
		 */
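		/*
		 * For reference, xdr_callhdr() pre-serializes, in order:
		 * xid, direction, rpcvers, prog, vers.  Hence prog lives
		 * at offset 3 * BYTES_PER_XDR_UNIT and vers at offset
		 * 4 * BYTES_PER_XDR_UNIT within ct_mcallc.
		 */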
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
		    4 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_VERS:
		*(uint32_t *)(void *)(ct->ct_mcallc +
		    4 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header. MUST be changed if the
		 * call_struct is changed.
		 */
		*(uint32_t *)info =
		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
		    3 * BYTES_PER_XDR_UNIT));
		break;

	case CLSET_PROG:
		*(uint32_t *)(void *)(ct->ct_mcallc +
		    3 * BYTES_PER_XDR_UNIT) =
		    htonl(*(uint32_t *)info);
		break;

	case CLSET_WAITCHAN:
		ct->ct_waitchan = (const char *)info;
		break;

	case CLGET_WAITCHAN:
		*(const char **) info = ct->ct_waitchan;
		break;

	case CLSET_INTERRUPTIBLE:
		if (*(int *) info)
			ct->ct_waitflag = PCATCH;
		else
			ct->ct_waitflag = 0;
		break;

	case CLGET_INTERRUPTIBLE:
		if (ct->ct_waitflag)
			*(int *) info = TRUE;
		else
			*(int *) info = FALSE;
		break;

	case CLSET_BACKCHANNEL:
		xprt = (SVCXPRT *)info;
		if (ct->ct_backchannelxprt == NULL) {
			xprt->xp_p2 = ct;
			ct->ct_backchannelxprt = xprt;
		}
		break;

	default:
		mtx_unlock(&ct->ct_lock);
		return (FALSE);
	}

	mtx_unlock(&ct->ct_lock);
	return (TRUE);
}
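
/*
 * Control sketch (illustrative only): CLNT_CONTROL() is assumed to be the
 * standard cl_control dispatch macro and "nfsreq" is just an arbitrary
 * wait channel name; neither is defined in this file.
 *
 *	int one = 1;
 *	uint32_t xid;
 *
 *	CLNT_CONTROL(cl, CLSET_WAITCHAN, "nfsreq");
 *	CLNT_CONTROL(cl, CLSET_INTERRUPTIBLE, &one);
 *	CLNT_CONTROL(cl, CLGET_XID, &xid);
 */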

static void
clnt_vc_close(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	struct ct_request *cr;

	mtx_lock(&ct->ct_lock);

	if (ct->ct_closed) {
		mtx_unlock(&ct->ct_lock);
		return;
	}

	if (ct->ct_closing) {
		while (ct->ct_closing)
			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
		KASSERT(ct->ct_closed, ("client should be closed"));
		mtx_unlock(&ct->ct_lock);
		return;
	}

	if (ct->ct_socket) {
		ct->ct_closing = TRUE;
		mtx_unlock(&ct->ct_lock);

		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
		soupcall_clear(ct->ct_socket, SO_RCV);
		clnt_vc_upcallsdone(ct);
		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);

		/*
		 * Abort any pending requests and wait until everyone
		 * has finished with clnt_vc_call.
		 */
		mtx_lock(&ct->ct_lock);
		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
			cr->cr_xid = 0;
			cr->cr_error = ESHUTDOWN;
			wakeup(cr);
		}

		while (ct->ct_threads)
			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
	}

	ct->ct_closing = FALSE;
	ct->ct_closed = TRUE;
	mtx_unlock(&ct->ct_lock);
	wakeup(ct);
}

static void
clnt_vc_destroy(CLIENT *cl)
{
	struct ct_data *ct = (struct ct_data *) cl->cl_private;
	struct socket *so = NULL;
	SVCXPRT *xprt;

	clnt_vc_close(cl);

	mtx_lock(&ct->ct_lock);
	xprt = ct->ct_backchannelxprt;
	ct->ct_backchannelxprt = NULL;
	if (xprt != NULL) {
		mtx_unlock(&ct->ct_lock);	/* To avoid a LOR. */
		sx_xlock(&xprt->xp_lock);
		mtx_lock(&ct->ct_lock);
		xprt->xp_p2 = NULL;
		xprt_unregister(xprt);
	}

	if (ct->ct_socket) {
		if (ct->ct_closeit) {
			so = ct->ct_socket;
		}
	}

	mtx_unlock(&ct->ct_lock);
	if (xprt != NULL) {
		sx_xunlock(&xprt->xp_lock);
		SVC_RELEASE(xprt);
	}

	mtx_destroy(&ct->ct_lock);
	if (so) {
		soshutdown(so, SHUT_WR);
		soclose(so);
	}
	mem_free(ct, sizeof(struct ct_data));
	if (cl->cl_netid && cl->cl_netid[0])
		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
	if (cl->cl_tp && cl->cl_tp[0])
		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
	mem_free(cl, sizeof(CLIENT));
}

/*
 * Make sure that the time is not garbage.  A value of -1 is disallowed.
 * Note this is different from time_not_ok in clnt_dg.c.
 */
static bool_t
time_not_ok(struct timeval *t)
{
	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
		t->tv_usec <= -1 || t->tv_usec > 1000000);
}

int
clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
{
	struct ct_data *ct = (struct ct_data *) arg;
	struct uio uio;
	struct mbuf *m, *m2;
	struct ct_request *cr;
	int error, rcvflag, foundreq;
	uint32_t xid_plus_direction[2], header;
	bool_t do_read;
	SVCXPRT *xprt;
	struct cf_conn *cd;

	CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
	ct->ct_upcallrefs++;
	uio.uio_td = curthread;
	do {
		/*
		 * If ct_record_resid is zero, we are waiting for a
		 * record mark.
		 */
		if (ct->ct_record_resid == 0) {

			/*
			 * Make sure there is either a whole record
			 * mark in the buffer or there is some other
			 * error condition
			 */
			do_read = FALSE;
			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
			    || so->so_error)
				do_read = TRUE;

			if (!do_read)
				break;

			SOCKBUF_UNLOCK(&so->so_rcv);
			uio.uio_resid = sizeof(uint32_t);
			m = NULL;
			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
			SOCKBUF_LOCK(&so->so_rcv);

			if (error == EWOULDBLOCK)
				break;

			/*
			 * If there was an error, wake up all pending
			 * requests.
			 */
			if (error || uio.uio_resid > 0) {
			wakeup_all:
				mtx_lock(&ct->ct_lock);
				if (!error) {
					/*
					 * We must have got EOF trying
					 * to read from the stream.
					 */
					error = ECONNRESET;
				}
				ct->ct_error.re_status = RPC_CANTRECV;
				ct->ct_error.re_errno = error;
				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
					cr->cr_error = error;
					wakeup(cr);
				}
				mtx_unlock(&ct->ct_lock);
				break;
			}
			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
			header = ntohl(header);
			ct->ct_record = NULL;
			ct->ct_record_resid = header & 0x7fffffff;
			ct->ct_record_eor = ((header & 0x80000000) != 0);
			m_freem(m);
		} else {
			/*
			 * Wait until the socket has the whole record
			 * buffered.
			 */
			do_read = FALSE;
			if (so->so_rcv.sb_cc >= ct->ct_record_resid
			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
			    || so->so_error)
				do_read = TRUE;

			if (!do_read)
				break;

			/*
			 * We have the record mark. Read as much as
			 * the socket has buffered up to the end of
			 * this record.
			 */
			SOCKBUF_UNLOCK(&so->so_rcv);
			uio.uio_resid = ct->ct_record_resid;
			m = NULL;
			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
			SOCKBUF_LOCK(&so->so_rcv);

			if (error == EWOULDBLOCK)
				break;

			if (error || uio.uio_resid == ct->ct_record_resid)
				goto wakeup_all;

			/*
			 * If we have part of the record already,
			 * chain this bit onto the end.
			 */
			if (ct->ct_record)
				m_last(ct->ct_record)->m_next = m;
			else
				ct->ct_record = m;

			ct->ct_record_resid = uio.uio_resid;

			/*
			 * If we have the entire record, see if we can
			 * match it to a request.
			 */
			if (ct->ct_record_resid == 0
			    && ct->ct_record_eor) {
				/*
				 * The XID is in the first uint32_t of
				 * the reply and the message direction
				 * is the second one.
				 */
				if (ct->ct_record->m_len <
				    sizeof(xid_plus_direction) &&
				    m_length(ct->ct_record, NULL) <
				    sizeof(xid_plus_direction)) {
					m_freem(ct->ct_record);
					break;
				}
				m_copydata(ct->ct_record, 0,
				    sizeof(xid_plus_direction),
				    (char *)xid_plus_direction);
				xid_plus_direction[0] =
				    ntohl(xid_plus_direction[0]);
				xid_plus_direction[1] =
				    ntohl(xid_plus_direction[1]);
				/* Check message direction. */
				if (xid_plus_direction[1] == CALL) {
					/* This is a backchannel request. */
					mtx_lock(&ct->ct_lock);
					xprt = ct->ct_backchannelxprt;
					if (xprt == NULL) {
						mtx_unlock(&ct->ct_lock);
						/* Just throw it away. */
						m_freem(ct->ct_record);
						ct->ct_record = NULL;
					} else {
						cd = (struct cf_conn *)
						    xprt->xp_p1;
						m2 = cd->mreq;
						/*
						 * The requests are chained
						 * in the m_nextpkt list.
						 */
						while (m2 != NULL &&
						    m2->m_nextpkt != NULL)
							/* Find end of list. */
							m2 = m2->m_nextpkt;
						if (m2 != NULL)
							m2->m_nextpkt =
							    ct->ct_record;
						else
							cd->mreq =
							    ct->ct_record;
						ct->ct_record->m_nextpkt =
						    NULL;
						ct->ct_record = NULL;
						xprt_active(xprt);
						mtx_unlock(&ct->ct_lock);
					}
				} else {
					mtx_lock(&ct->ct_lock);
					foundreq = 0;
					TAILQ_FOREACH(cr, &ct->ct_pending,
					    cr_link) {
						if (cr->cr_xid ==
						    xid_plus_direction[0]) {
							/*
							 * This one
							 * matches. We leave
							 * the reply mbuf in
							 * cr->cr_mrep. Set
							 * the XID to zero so
							 * that we will ignore
							 * any duplicated
							 * replies.
							 */
							cr->cr_xid = 0;
							cr->cr_mrep =
							    ct->ct_record;
							cr->cr_error = 0;
							foundreq = 1;
							wakeup(cr);
							break;
						}
					}
					mtx_unlock(&ct->ct_lock);

					if (!foundreq)
						m_freem(ct->ct_record);
					ct->ct_record = NULL;
				}
			}
		}
	} while (m);
	ct->ct_upcallrefs--;
	if (ct->ct_upcallrefs < 0)
		panic("rpcvc upcall refcnt");
	if (ct->ct_upcallrefs == 0)
		wakeup(&ct->ct_upcallrefs);
	return (SU_OK);
}

/*
 * Wait for all upcalls in progress to complete.
 */
static void
clnt_vc_upcallsdone(struct ct_data *ct)
{

	SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);

	while (ct->ct_upcallrefs > 0)
		(void) msleep(&ct->ct_upcallrefs,
		    SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
}