clnt_vc.c revision 248195
1226031Sstas/*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
2226031Sstas
3226031Sstas/*
4226031Sstas * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5226031Sstas * unrestricted use provided that this legend is included on all tape
6226031Sstas * media and as a part of the software program in whole or part.  Users
7226031Sstas * may copy or modify Sun RPC without charge, but are not authorized
8226031Sstas * to license or distribute it to anyone else except as part of a product or
9226031Sstas * program developed by the user.
10226031Sstas *
11226031Sstas * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12226031Sstas * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13226031Sstas * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14226031Sstas *
15226031Sstas * Sun RPC is provided with no support and without any obligation on the
16226031Sstas * part of Sun Microsystems, Inc. to assist in its use, correction,
17226031Sstas * modification or enhancement.
18226031Sstas *
19226031Sstas * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20226031Sstas * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21226031Sstas * OR ANY PART THEREOF.
22226031Sstas *
23226031Sstas * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24226031Sstas * or profits or other special, indirect and consequential damages, even if
25226031Sstas * Sun has been advised of the possibility of such damages.
26226031Sstas *
27226031Sstas * Sun Microsystems, Inc.
28226031Sstas * 2550 Garcia Avenue
29226031Sstas * Mountain View, California  94043
30226031Sstas */
31226031Sstas
32226031Sstas#if defined(LIBC_SCCS) && !defined(lint)
33226031Sstasstatic char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34226031Sstasstatic char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
35226031Sstasstatic char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36226031Sstas#endif
37226031Sstas#include <sys/cdefs.h>
38226031Sstas__FBSDID("$FreeBSD: head/sys/rpc/clnt_vc.c 248195 2013-03-12 12:17:19Z glebius $");
39226031Sstas
40226031Sstas/*
41226031Sstas * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42226031Sstas *
43226031Sstas * Copyright (C) 1984, Sun Microsystems, Inc.
44226031Sstas *
45226031Sstas * TCP based RPC supports 'batched calls'.
46226031Sstas * A sequence of calls may be batched-up in a send buffer.  The rpc call
47226031Sstas * return immediately to the client even though the call was not necessarily
48226031Sstas * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
49226031Sstas * the rpc timeout value is zero (see clnt.h, rpc).
50226031Sstas *
51226031Sstas * Clients should NOT casually batch calls that in fact return results; that is,
52226031Sstas * the server side should be aware that a call is batched and not produce any
53226031Sstas * return message.  Batched calls that produce many result messages can
54226031Sstas * deadlock (netlock) the client and the server....
55226031Sstas *
56226031Sstas * Now go hang yourself.
57226031Sstas */
58226031Sstas
59226031Sstas#include <sys/param.h>
60226031Sstas#include <sys/systm.h>
61226031Sstas#include <sys/lock.h>
62226031Sstas#include <sys/malloc.h>
63226031Sstas#include <sys/mbuf.h>
64226031Sstas#include <sys/mutex.h>
65226031Sstas#include <sys/pcpu.h>
66226031Sstas#include <sys/proc.h>
67226031Sstas#include <sys/protosw.h>
68226031Sstas#include <sys/socket.h>
69226031Sstas#include <sys/socketvar.h>
70226031Sstas#include <sys/sx.h>
71226031Sstas#include <sys/syslog.h>
72226031Sstas#include <sys/time.h>
73226031Sstas#include <sys/uio.h>
74226031Sstas
75226031Sstas#include <net/vnet.h>
76226031Sstas
77226031Sstas#include <netinet/tcp.h>
78226031Sstas
79226031Sstas#include <rpc/rpc.h>
80226031Sstas#include <rpc/rpc_com.h>
81226031Sstas#include <rpc/krpc.h>
82226031Sstas
83226031Sstasstruct cmessage {
84226031Sstas        struct cmsghdr cmsg;
85226031Sstas        struct cmsgcred cmcred;
86226031Sstas};
87226031Sstas
88226031Sstasstatic enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
89226031Sstas    rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
90226031Sstasstatic void clnt_vc_geterr(CLIENT *, struct rpc_err *);
91226031Sstasstatic bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
92226031Sstasstatic void clnt_vc_abort(CLIENT *);
93226031Sstasstatic bool_t clnt_vc_control(CLIENT *, u_int, void *);
94226031Sstasstatic void clnt_vc_close(CLIENT *);
95226031Sstasstatic void clnt_vc_destroy(CLIENT *);
96226031Sstasstatic bool_t time_not_ok(struct timeval *);
97226031Sstasstatic int clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
98226031Sstas
99226031Sstasstatic struct clnt_ops clnt_vc_ops = {
100226031Sstas	.cl_call =	clnt_vc_call,
101226031Sstas	.cl_abort =	clnt_vc_abort,
102226031Sstas	.cl_geterr =	clnt_vc_geterr,
103226031Sstas	.cl_freeres =	clnt_vc_freeres,
104226031Sstas	.cl_close =	clnt_vc_close,
105226031Sstas	.cl_destroy =	clnt_vc_destroy,
106226031Sstas	.cl_control =	clnt_vc_control
107226031Sstas};
108226031Sstas
109226031Sstasstatic void clnt_vc_upcallsdone(struct ct_data *);
110226031Sstas
111226031Sstasstatic const char clnt_vc_errstr[] = "%s : %s";
112226031Sstasstatic const char clnt_vc_str[] = "clnt_vc_create";
113226031Sstasstatic const char clnt_read_vc_str[] = "read_vc";
114226031Sstasstatic const char __no_mem_str[] = "out of memory";
115226031Sstas
116226031Sstas/*
117226031Sstas * Create a client handle for a connection.
118226031Sstas * Default options are set, which the user can change using clnt_control()'s.
119226031Sstas * The rpc/vc package does buffering similar to stdio, so the client
120226031Sstas * must pick send and receive buffer sizes, 0 => use the default.
121226031Sstas * NB: fd is copied into a private area.
122226031Sstas * NB: The rpch->cl_auth is set null authentication. Caller may wish to
123226031Sstas * set this something more useful.
124226031Sstas *
125226031Sstas * fd should be an open socket
126226031Sstas */
127226031SstasCLIENT *
128226031Sstasclnt_vc_create(
129226031Sstas	struct socket *so,		/* open file descriptor */
130226031Sstas	struct sockaddr *raddr,		/* servers address */
131226031Sstas	const rpcprog_t prog,		/* program number */
132226031Sstas	const rpcvers_t vers,		/* version number */
133226031Sstas	size_t sendsz,			/* buffer recv size */
134226031Sstas	size_t recvsz,			/* buffer send size */
135226031Sstas	int intrflag)			/* interruptible */
136226031Sstas{
137	CLIENT *cl;			/* client handle */
138	struct ct_data *ct = NULL;	/* client handle */
139	struct timeval now;
140	struct rpc_msg call_msg;
141	static uint32_t disrupt;
142	struct __rpc_sockinfo si;
143	XDR xdrs;
144	int error, interrupted, one = 1, sleep_flag;
145	struct sockopt sopt;
146
147	if (disrupt == 0)
148		disrupt = (uint32_t)(long)raddr;
149
150	cl = (CLIENT *)mem_alloc(sizeof (*cl));
151	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
152
153	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
154	ct->ct_threads = 0;
155	ct->ct_closing = FALSE;
156	ct->ct_closed = FALSE;
157	ct->ct_upcallrefs = 0;
158
159	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
160		error = soconnect(so, raddr, curthread);
161		SOCK_LOCK(so);
162		interrupted = 0;
163		sleep_flag = PSOCK;
164		if (intrflag != 0)
165			sleep_flag |= (PCATCH | PBDRY);
166		while ((so->so_state & SS_ISCONNECTING)
167		    && so->so_error == 0) {
168			error = msleep(&so->so_timeo, SOCK_MTX(so),
169			    sleep_flag, "connec", 0);
170			if (error) {
171				if (error == EINTR || error == ERESTART)
172					interrupted = 1;
173				break;
174			}
175		}
176		if (error == 0) {
177			error = so->so_error;
178			so->so_error = 0;
179		}
180		SOCK_UNLOCK(so);
181		if (error) {
182			if (!interrupted)
183				so->so_state &= ~SS_ISCONNECTING;
184			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
185			rpc_createerr.cf_error.re_errno = error;
186			goto err;
187		}
188	}
189
190	if (!__rpc_socket2sockinfo(so, &si)) {
191		goto err;
192	}
193
194	if (so->so_proto->pr_flags & PR_CONNREQUIRED) {
195		bzero(&sopt, sizeof(sopt));
196		sopt.sopt_dir = SOPT_SET;
197		sopt.sopt_level = SOL_SOCKET;
198		sopt.sopt_name = SO_KEEPALIVE;
199		sopt.sopt_val = &one;
200		sopt.sopt_valsize = sizeof(one);
201		sosetopt(so, &sopt);
202	}
203
204	if (so->so_proto->pr_protocol == IPPROTO_TCP) {
205		bzero(&sopt, sizeof(sopt));
206		sopt.sopt_dir = SOPT_SET;
207		sopt.sopt_level = IPPROTO_TCP;
208		sopt.sopt_name = TCP_NODELAY;
209		sopt.sopt_val = &one;
210		sopt.sopt_valsize = sizeof(one);
211		sosetopt(so, &sopt);
212	}
213
214	ct->ct_closeit = FALSE;
215
216	/*
217	 * Set up private data struct
218	 */
219	ct->ct_socket = so;
220	ct->ct_wait.tv_sec = -1;
221	ct->ct_wait.tv_usec = -1;
222	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
223
224	/*
225	 * Initialize call message
226	 */
227	getmicrotime(&now);
228	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
229	call_msg.rm_xid = ct->ct_xid;
230	call_msg.rm_direction = CALL;
231	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
232	call_msg.rm_call.cb_prog = (uint32_t)prog;
233	call_msg.rm_call.cb_vers = (uint32_t)vers;
234
235	/*
236	 * pre-serialize the static part of the call msg and stash it away
237	 */
238	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
239	    XDR_ENCODE);
240	if (! xdr_callhdr(&xdrs, &call_msg)) {
241		if (ct->ct_closeit) {
242			soclose(ct->ct_socket);
243		}
244		goto err;
245	}
246	ct->ct_mpos = XDR_GETPOS(&xdrs);
247	XDR_DESTROY(&xdrs);
248	ct->ct_waitchan = "rpcrecv";
249	ct->ct_waitflag = 0;
250
251	/*
252	 * Create a client handle which uses xdrrec for serialization
253	 * and authnone for authentication.
254	 */
255	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
256	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
257	error = soreserve(ct->ct_socket, sendsz, recvsz);
258	if (error != 0) {
259		if (ct->ct_closeit) {
260			soclose(ct->ct_socket);
261		}
262		goto err;
263	}
264	cl->cl_refs = 1;
265	cl->cl_ops = &clnt_vc_ops;
266	cl->cl_private = ct;
267	cl->cl_auth = authnone_create();
268
269	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
270	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
271	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
272
273	ct->ct_record = NULL;
274	ct->ct_record_resid = 0;
275	TAILQ_INIT(&ct->ct_pending);
276	return (cl);
277
278err:
279	if (cl) {
280		if (ct) {
281			mtx_destroy(&ct->ct_lock);
282			mem_free(ct, sizeof (struct ct_data));
283		}
284		if (cl)
285			mem_free(cl, sizeof (CLIENT));
286	}
287	return ((CLIENT *)NULL);
288}
289
290static enum clnt_stat
291clnt_vc_call(
292	CLIENT		*cl,		/* client handle */
293	struct rpc_callextra *ext,	/* call metadata */
294	rpcproc_t	proc,		/* procedure number */
295	struct mbuf	*args,		/* pointer to args */
296	struct mbuf	**resultsp,	/* pointer to results */
297	struct timeval	utimeout)
298{
299	struct ct_data *ct = (struct ct_data *) cl->cl_private;
300	AUTH *auth;
301	struct rpc_err *errp;
302	enum clnt_stat stat;
303	XDR xdrs;
304	struct rpc_msg reply_msg;
305	bool_t ok;
306	int nrefreshes = 2;		/* number of times to refresh cred */
307	struct timeval timeout;
308	uint32_t xid;
309	struct mbuf *mreq = NULL, *results;
310	struct ct_request *cr;
311	int error;
312
313	cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
314
315	mtx_lock(&ct->ct_lock);
316
317	if (ct->ct_closing || ct->ct_closed) {
318		mtx_unlock(&ct->ct_lock);
319		free(cr, M_RPC);
320		return (RPC_CANTSEND);
321	}
322	ct->ct_threads++;
323
324	if (ext) {
325		auth = ext->rc_auth;
326		errp = &ext->rc_err;
327	} else {
328		auth = cl->cl_auth;
329		errp = &ct->ct_error;
330	}
331
332	cr->cr_mrep = NULL;
333	cr->cr_error = 0;
334
335	if (ct->ct_wait.tv_usec == -1) {
336		timeout = utimeout;	/* use supplied timeout */
337	} else {
338		timeout = ct->ct_wait;	/* use default timeout */
339	}
340
341call_again:
342	mtx_assert(&ct->ct_lock, MA_OWNED);
343
344	ct->ct_xid++;
345	xid = ct->ct_xid;
346
347	mtx_unlock(&ct->ct_lock);
348
349	/*
350	 * Leave space to pre-pend the record mark.
351	 */
352	mreq = m_gethdr(M_WAITOK, MT_DATA);
353	mreq->m_data += sizeof(uint32_t);
354	KASSERT(ct->ct_mpos + sizeof(uint32_t) <= MHLEN,
355	    ("RPC header too big"));
356	bcopy(ct->ct_mcallc, mreq->m_data, ct->ct_mpos);
357	mreq->m_len = ct->ct_mpos;
358
359	/*
360	 * The XID is the first thing in the request.
361	 */
362	*mtod(mreq, uint32_t *) = htonl(xid);
363
364	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
365
366	errp->re_status = stat = RPC_SUCCESS;
367
368	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
369	    (! AUTH_MARSHALL(auth, xid, &xdrs,
370		m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
371		errp->re_status = stat = RPC_CANTENCODEARGS;
372		mtx_lock(&ct->ct_lock);
373		goto out;
374	}
375	mreq->m_pkthdr.len = m_length(mreq, NULL);
376
377	/*
378	 * Prepend a record marker containing the packet length.
379	 */
380	M_PREPEND(mreq, sizeof(uint32_t), M_WAITOK);
381	*mtod(mreq, uint32_t *) =
382		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
383
384	cr->cr_xid = xid;
385	mtx_lock(&ct->ct_lock);
386	/*
387	 * Check to see if the other end has already started to close down
388	 * the connection. The upcall will have set ct_error.re_status
389	 * to RPC_CANTRECV if this is the case.
390	 * If the other end starts to close down the connection after this
391	 * point, it will be detected later when cr_error is checked,
392	 * since the request is in the ct_pending queue.
393	 */
394	if (ct->ct_error.re_status == RPC_CANTRECV) {
395		if (errp != &ct->ct_error) {
396			errp->re_errno = ct->ct_error.re_errno;
397			errp->re_status = RPC_CANTRECV;
398		}
399		stat = RPC_CANTRECV;
400		goto out;
401	}
402	TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
403	mtx_unlock(&ct->ct_lock);
404
405	/*
406	 * sosend consumes mreq.
407	 */
408	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
409	mreq = NULL;
410	if (error == EMSGSIZE) {
411		SOCKBUF_LOCK(&ct->ct_socket->so_snd);
412		sbwait(&ct->ct_socket->so_snd);
413		SOCKBUF_UNLOCK(&ct->ct_socket->so_snd);
414		AUTH_VALIDATE(auth, xid, NULL, NULL);
415		mtx_lock(&ct->ct_lock);
416		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
417		goto call_again;
418	}
419
420	reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
421	reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
422	reply_msg.acpted_rply.ar_verf.oa_length = 0;
423	reply_msg.acpted_rply.ar_results.where = NULL;
424	reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
425
426	mtx_lock(&ct->ct_lock);
427	if (error) {
428		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
429		errp->re_errno = error;
430		errp->re_status = stat = RPC_CANTSEND;
431		goto out;
432	}
433
434	/*
435	 * Check to see if we got an upcall while waiting for the
436	 * lock. In both these cases, the request has been removed
437	 * from ct->ct_pending.
438	 */
439	if (cr->cr_error) {
440		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
441		errp->re_errno = cr->cr_error;
442		errp->re_status = stat = RPC_CANTRECV;
443		goto out;
444	}
445	if (cr->cr_mrep) {
446		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
447		goto got_reply;
448	}
449
450	/*
451	 * Hack to provide rpc-based message passing
452	 */
453	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
454		TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
455		errp->re_status = stat = RPC_TIMEDOUT;
456		goto out;
457	}
458
459	error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
460	    tvtohz(&timeout));
461
462	TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
463
464	if (error) {
465		/*
466		 * The sleep returned an error so our request is still
467		 * on the list. Turn the error code into an
468		 * appropriate client status.
469		 */
470		errp->re_errno = error;
471		switch (error) {
472		case EINTR:
473		case ERESTART:
474			stat = RPC_INTR;
475			break;
476		case EWOULDBLOCK:
477			stat = RPC_TIMEDOUT;
478			break;
479		default:
480			stat = RPC_CANTRECV;
481		}
482		errp->re_status = stat;
483		goto out;
484	} else {
485		/*
486		 * We were woken up by the upcall.  If the
487		 * upcall had a receive error, report that,
488		 * otherwise we have a reply.
489		 */
490		if (cr->cr_error) {
491			errp->re_errno = cr->cr_error;
492			errp->re_status = stat = RPC_CANTRECV;
493			goto out;
494		}
495	}
496
497got_reply:
498	/*
499	 * Now decode and validate the response. We need to drop the
500	 * lock since xdr_replymsg may end up sleeping in malloc.
501	 */
502	mtx_unlock(&ct->ct_lock);
503
504	if (ext && ext->rc_feedback)
505		ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
506
507	xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
508	ok = xdr_replymsg(&xdrs, &reply_msg);
509	cr->cr_mrep = NULL;
510
511	if (ok) {
512		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
513		    (reply_msg.acpted_rply.ar_stat == SUCCESS))
514			errp->re_status = stat = RPC_SUCCESS;
515		else
516			stat = _seterr_reply(&reply_msg, errp);
517
518		if (stat == RPC_SUCCESS) {
519			results = xdrmbuf_getall(&xdrs);
520			if (!AUTH_VALIDATE(auth, xid,
521				&reply_msg.acpted_rply.ar_verf,
522				&results)) {
523				errp->re_status = stat = RPC_AUTHERROR;
524				errp->re_why = AUTH_INVALIDRESP;
525			} else {
526				KASSERT(results,
527				    ("auth validated but no result"));
528				*resultsp = results;
529			}
530		}		/* end successful completion */
531		/*
532		 * If unsuccesful AND error is an authentication error
533		 * then refresh credentials and try again, else break
534		 */
535		else if (stat == RPC_AUTHERROR)
536			/* maybe our credentials need to be refreshed ... */
537			if (nrefreshes > 0 &&
538			    AUTH_REFRESH(auth, &reply_msg)) {
539				nrefreshes--;
540				XDR_DESTROY(&xdrs);
541				mtx_lock(&ct->ct_lock);
542				goto call_again;
543			}
544		/* end of unsuccessful completion */
545	}	/* end of valid reply message */
546	else {
547		errp->re_status = stat = RPC_CANTDECODERES;
548	}
549	XDR_DESTROY(&xdrs);
550	mtx_lock(&ct->ct_lock);
551out:
552	mtx_assert(&ct->ct_lock, MA_OWNED);
553
554	KASSERT(stat != RPC_SUCCESS || *resultsp,
555	    ("RPC_SUCCESS without reply"));
556
557	if (mreq)
558		m_freem(mreq);
559	if (cr->cr_mrep)
560		m_freem(cr->cr_mrep);
561
562	ct->ct_threads--;
563	if (ct->ct_closing)
564		wakeup(ct);
565
566	mtx_unlock(&ct->ct_lock);
567
568	if (auth && stat != RPC_SUCCESS)
569		AUTH_VALIDATE(auth, xid, NULL, NULL);
570
571	free(cr, M_RPC);
572
573	return (stat);
574}
575
576static void
577clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
578{
579	struct ct_data *ct = (struct ct_data *) cl->cl_private;
580
581	*errp = ct->ct_error;
582}
583
584static bool_t
585clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
586{
587	XDR xdrs;
588	bool_t dummy;
589
590	xdrs.x_op = XDR_FREE;
591	dummy = (*xdr_res)(&xdrs, res_ptr);
592
593	return (dummy);
594}
595
596/*ARGSUSED*/
597static void
598clnt_vc_abort(CLIENT *cl)
599{
600}
601
602static bool_t
603clnt_vc_control(CLIENT *cl, u_int request, void *info)
604{
605	struct ct_data *ct = (struct ct_data *)cl->cl_private;
606	void *infop = info;
607	SVCXPRT *xprt;
608
609	mtx_lock(&ct->ct_lock);
610
611	switch (request) {
612	case CLSET_FD_CLOSE:
613		ct->ct_closeit = TRUE;
614		mtx_unlock(&ct->ct_lock);
615		return (TRUE);
616	case CLSET_FD_NCLOSE:
617		ct->ct_closeit = FALSE;
618		mtx_unlock(&ct->ct_lock);
619		return (TRUE);
620	default:
621		break;
622	}
623
624	/* for other requests which use info */
625	if (info == NULL) {
626		mtx_unlock(&ct->ct_lock);
627		return (FALSE);
628	}
629	switch (request) {
630	case CLSET_TIMEOUT:
631		if (time_not_ok((struct timeval *)info)) {
632			mtx_unlock(&ct->ct_lock);
633			return (FALSE);
634		}
635		ct->ct_wait = *(struct timeval *)infop;
636		break;
637	case CLGET_TIMEOUT:
638		*(struct timeval *)infop = ct->ct_wait;
639		break;
640	case CLGET_SERVER_ADDR:
641		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
642		break;
643	case CLGET_SVC_ADDR:
644		/*
645		 * Slightly different semantics to userland - we use
646		 * sockaddr instead of netbuf.
647		 */
648		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
649		break;
650	case CLSET_SVC_ADDR:		/* set to new address */
651		mtx_unlock(&ct->ct_lock);
652		return (FALSE);
653	case CLGET_XID:
654		*(uint32_t *)info = ct->ct_xid;
655		break;
656	case CLSET_XID:
657		/* This will set the xid of the NEXT call */
658		/* decrement by 1 as clnt_vc_call() increments once */
659		ct->ct_xid = *(uint32_t *)info - 1;
660		break;
661	case CLGET_VERS:
662		/*
663		 * This RELIES on the information that, in the call body,
664		 * the version number field is the fifth field from the
665		 * begining of the RPC header. MUST be changed if the
666		 * call_struct is changed
667		 */
668		*(uint32_t *)info =
669		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
670		    4 * BYTES_PER_XDR_UNIT));
671		break;
672
673	case CLSET_VERS:
674		*(uint32_t *)(void *)(ct->ct_mcallc +
675		    4 * BYTES_PER_XDR_UNIT) =
676		    htonl(*(uint32_t *)info);
677		break;
678
679	case CLGET_PROG:
680		/*
681		 * This RELIES on the information that, in the call body,
682		 * the program number field is the fourth field from the
683		 * begining of the RPC header. MUST be changed if the
684		 * call_struct is changed
685		 */
686		*(uint32_t *)info =
687		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
688		    3 * BYTES_PER_XDR_UNIT));
689		break;
690
691	case CLSET_PROG:
692		*(uint32_t *)(void *)(ct->ct_mcallc +
693		    3 * BYTES_PER_XDR_UNIT) =
694		    htonl(*(uint32_t *)info);
695		break;
696
697	case CLSET_WAITCHAN:
698		ct->ct_waitchan = (const char *)info;
699		break;
700
701	case CLGET_WAITCHAN:
702		*(const char **) info = ct->ct_waitchan;
703		break;
704
705	case CLSET_INTERRUPTIBLE:
706		if (*(int *) info)
707			ct->ct_waitflag = PCATCH | PBDRY;
708		else
709			ct->ct_waitflag = 0;
710		break;
711
712	case CLGET_INTERRUPTIBLE:
713		if (ct->ct_waitflag)
714			*(int *) info = TRUE;
715		else
716			*(int *) info = FALSE;
717		break;
718
719	case CLSET_BACKCHANNEL:
720		xprt = (SVCXPRT *)info;
721		if (ct->ct_backchannelxprt == NULL) {
722			xprt->xp_p2 = ct;
723			ct->ct_backchannelxprt = xprt;
724		}
725		break;
726
727	default:
728		mtx_unlock(&ct->ct_lock);
729		return (FALSE);
730	}
731
732	mtx_unlock(&ct->ct_lock);
733	return (TRUE);
734}
735
736static void
737clnt_vc_close(CLIENT *cl)
738{
739	struct ct_data *ct = (struct ct_data *) cl->cl_private;
740	struct ct_request *cr;
741
742	mtx_lock(&ct->ct_lock);
743
744	if (ct->ct_closed) {
745		mtx_unlock(&ct->ct_lock);
746		return;
747	}
748
749	if (ct->ct_closing) {
750		while (ct->ct_closing)
751			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
752		KASSERT(ct->ct_closed, ("client should be closed"));
753		mtx_unlock(&ct->ct_lock);
754		return;
755	}
756
757	if (ct->ct_socket) {
758		ct->ct_closing = TRUE;
759		mtx_unlock(&ct->ct_lock);
760
761		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
762		soupcall_clear(ct->ct_socket, SO_RCV);
763		clnt_vc_upcallsdone(ct);
764		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
765
766		/*
767		 * Abort any pending requests and wait until everyone
768		 * has finished with clnt_vc_call.
769		 */
770		mtx_lock(&ct->ct_lock);
771		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
772			cr->cr_xid = 0;
773			cr->cr_error = ESHUTDOWN;
774			wakeup(cr);
775		}
776
777		while (ct->ct_threads)
778			msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
779	}
780
781	ct->ct_closing = FALSE;
782	ct->ct_closed = TRUE;
783	mtx_unlock(&ct->ct_lock);
784	wakeup(ct);
785}
786
787static void
788clnt_vc_destroy(CLIENT *cl)
789{
790	struct ct_data *ct = (struct ct_data *) cl->cl_private;
791	struct socket *so = NULL;
792	SVCXPRT *xprt;
793
794	clnt_vc_close(cl);
795
796	mtx_lock(&ct->ct_lock);
797	xprt = ct->ct_backchannelxprt;
798	ct->ct_backchannelxprt = NULL;
799	if (xprt != NULL) {
800		mtx_unlock(&ct->ct_lock);	/* To avoid a LOR. */
801		sx_xlock(&xprt->xp_lock);
802		mtx_lock(&ct->ct_lock);
803		xprt->xp_p2 = NULL;
804		xprt_unregister(xprt);
805	}
806
807	if (ct->ct_socket) {
808		if (ct->ct_closeit) {
809			so = ct->ct_socket;
810		}
811	}
812
813	mtx_unlock(&ct->ct_lock);
814	if (xprt != NULL) {
815		sx_xunlock(&xprt->xp_lock);
816		SVC_RELEASE(xprt);
817	}
818
819	mtx_destroy(&ct->ct_lock);
820	if (so) {
821		soshutdown(so, SHUT_WR);
822		soclose(so);
823	}
824	mem_free(ct, sizeof(struct ct_data));
825	if (cl->cl_netid && cl->cl_netid[0])
826		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
827	if (cl->cl_tp && cl->cl_tp[0])
828		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
829	mem_free(cl, sizeof(CLIENT));
830}
831
832/*
833 * Make sure that the time is not garbage.   -1 value is disallowed.
834 * Note this is different from time_not_ok in clnt_dg.c
835 */
836static bool_t
837time_not_ok(struct timeval *t)
838{
839	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
840		t->tv_usec <= -1 || t->tv_usec > 1000000);
841}
842
843int
844clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
845{
846	struct ct_data *ct = (struct ct_data *) arg;
847	struct uio uio;
848	struct mbuf *m, *m2;
849	struct ct_request *cr;
850	int error, rcvflag, foundreq;
851	uint32_t xid_plus_direction[2], header;
852	bool_t do_read;
853	SVCXPRT *xprt;
854	struct cf_conn *cd;
855
856	CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
857	ct->ct_upcallrefs++;
858	uio.uio_td = curthread;
859	do {
860		/*
861		 * If ct_record_resid is zero, we are waiting for a
862		 * record mark.
863		 */
864		if (ct->ct_record_resid == 0) {
865
866			/*
867			 * Make sure there is either a whole record
868			 * mark in the buffer or there is some other
869			 * error condition
870			 */
871			do_read = FALSE;
872			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
873			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
874			    || so->so_error)
875				do_read = TRUE;
876
877			if (!do_read)
878				break;
879
880			SOCKBUF_UNLOCK(&so->so_rcv);
881			uio.uio_resid = sizeof(uint32_t);
882			m = NULL;
883			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
884			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
885			SOCKBUF_LOCK(&so->so_rcv);
886
887			if (error == EWOULDBLOCK)
888				break;
889
890			/*
891			 * If there was an error, wake up all pending
892			 * requests.
893			 */
894			if (error || uio.uio_resid > 0) {
895			wakeup_all:
896				mtx_lock(&ct->ct_lock);
897				if (!error) {
898					/*
899					 * We must have got EOF trying
900					 * to read from the stream.
901					 */
902					error = ECONNRESET;
903				}
904				ct->ct_error.re_status = RPC_CANTRECV;
905				ct->ct_error.re_errno = error;
906				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
907					cr->cr_error = error;
908					wakeup(cr);
909				}
910				mtx_unlock(&ct->ct_lock);
911				break;
912			}
913			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
914			header = ntohl(header);
915			ct->ct_record = NULL;
916			ct->ct_record_resid = header & 0x7fffffff;
917			ct->ct_record_eor = ((header & 0x80000000) != 0);
918			m_freem(m);
919		} else {
920			/*
921			 * Wait until the socket has the whole record
922			 * buffered.
923			 */
924			do_read = FALSE;
925			if (so->so_rcv.sb_cc >= ct->ct_record_resid
926			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
927			    || so->so_error)
928				do_read = TRUE;
929
930			if (!do_read)
931				break;
932
933			/*
934			 * We have the record mark. Read as much as
935			 * the socket has buffered up to the end of
936			 * this record.
937			 */
938			SOCKBUF_UNLOCK(&so->so_rcv);
939			uio.uio_resid = ct->ct_record_resid;
940			m = NULL;
941			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
942			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
943			SOCKBUF_LOCK(&so->so_rcv);
944
945			if (error == EWOULDBLOCK)
946				break;
947
948			if (error || uio.uio_resid == ct->ct_record_resid)
949				goto wakeup_all;
950
951			/*
952			 * If we have part of the record already,
953			 * chain this bit onto the end.
954			 */
955			if (ct->ct_record)
956				m_last(ct->ct_record)->m_next = m;
957			else
958				ct->ct_record = m;
959
960			ct->ct_record_resid = uio.uio_resid;
961
962			/*
963			 * If we have the entire record, see if we can
964			 * match it to a request.
965			 */
966			if (ct->ct_record_resid == 0
967			    && ct->ct_record_eor) {
968				/*
969				 * The XID is in the first uint32_t of
970				 * the reply and the message direction
971				 * is the second one.
972				 */
973				if (ct->ct_record->m_len <
974				    sizeof(xid_plus_direction) &&
975				    m_length(ct->ct_record, NULL) <
976				    sizeof(xid_plus_direction)) {
977					m_freem(ct->ct_record);
978					break;
979				}
980				m_copydata(ct->ct_record, 0,
981				    sizeof(xid_plus_direction),
982				    (char *)xid_plus_direction);
983				xid_plus_direction[0] =
984				    ntohl(xid_plus_direction[0]);
985				xid_plus_direction[1] =
986				    ntohl(xid_plus_direction[1]);
987				/* Check message direction. */
988				if (xid_plus_direction[1] == CALL) {
989					/* This is a backchannel request. */
990					mtx_lock(&ct->ct_lock);
991					xprt = ct->ct_backchannelxprt;
992					if (xprt == NULL) {
993						mtx_unlock(&ct->ct_lock);
994						/* Just throw it away. */
995						m_freem(ct->ct_record);
996						ct->ct_record = NULL;
997					} else {
998						cd = (struct cf_conn *)
999						    xprt->xp_p1;
1000						m2 = cd->mreq;
1001						/*
1002						 * The requests are chained
1003						 * in the m_nextpkt list.
1004						 */
1005						while (m2 != NULL &&
1006						    m2->m_nextpkt != NULL)
1007							/* Find end of list. */
1008							m2 = m2->m_nextpkt;
1009						if (m2 != NULL)
1010							m2->m_nextpkt =
1011							    ct->ct_record;
1012						else
1013							cd->mreq =
1014							    ct->ct_record;
1015						ct->ct_record->m_nextpkt =
1016						    NULL;
1017						ct->ct_record = NULL;
1018						xprt_active(xprt);
1019						mtx_unlock(&ct->ct_lock);
1020					}
1021				} else {
1022					mtx_lock(&ct->ct_lock);
1023					foundreq = 0;
1024					TAILQ_FOREACH(cr, &ct->ct_pending,
1025					    cr_link) {
1026						if (cr->cr_xid ==
1027						    xid_plus_direction[0]) {
1028							/*
1029							 * This one
1030							 * matches. We leave
1031							 * the reply mbuf in
1032							 * cr->cr_mrep. Set
1033							 * the XID to zero so
1034							 * that we will ignore
1035							 * any duplicated
1036							 * replies.
1037							 */
1038							cr->cr_xid = 0;
1039							cr->cr_mrep =
1040							    ct->ct_record;
1041							cr->cr_error = 0;
1042							foundreq = 1;
1043							wakeup(cr);
1044							break;
1045						}
1046					}
1047					mtx_unlock(&ct->ct_lock);
1048
1049					if (!foundreq)
1050						m_freem(ct->ct_record);
1051					ct->ct_record = NULL;
1052				}
1053			}
1054		}
1055	} while (m);
1056	ct->ct_upcallrefs--;
1057	if (ct->ct_upcallrefs < 0)
1058		panic("rpcvc upcall refcnt");
1059	if (ct->ct_upcallrefs == 0)
1060		wakeup(&ct->ct_upcallrefs);
1061	return (SU_OK);
1062}
1063
1064/*
1065 * Wait for all upcalls in progress to complete.
1066 */
1067static void
1068clnt_vc_upcallsdone(struct ct_data *ct)
1069{
1070
1071	SOCKBUF_LOCK_ASSERT(&ct->ct_socket->so_rcv);
1072
1073	while (ct->ct_upcallrefs > 0)
1074		(void) msleep(&ct->ct_upcallrefs,
1075		    SOCKBUF_MTX(&ct->ct_socket->so_rcv), 0, "rpcvcup", 0);
1076}
1077