/*
 * linux/net/sunrpc/svcsock.c
 *
 * These are the RPC server socket internals.
 *
 * The server scheduling algorithm does not always distribute the load
 * evenly when servicing a single client. May need to modify the
 * svc_sock_enqueue procedure...
 *
 * TCP support is largely untested and may be a little slow. The problem
 * is that we currently do two separate recvfrom's, one for the 4-byte
 * record length, and the second for the actual record. This could possibly
 * be improved by always reading a minimum size of around 100 bytes and
 * tucking any superfluous bytes away in a temporary store. Still, that
 * leaves write requests out in the rain. An alternative may be to peek at
 * the first skb in the queue, and if it matches the next TCP sequence
 * number, to extract the record marker. Yuck.
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 */
21
22#include <linux/sched.h>
23#include <linux/errno.h>
24#include <linux/fcntl.h>
25#include <linux/net.h>
26#include <linux/in.h>
27#include <linux/inet.h>
28#include <linux/udp.h>
29#include <linux/tcp.h>
30#include <linux/unistd.h>
31#include <linux/slab.h>
32#include <linux/netdevice.h>
33#include <linux/skbuff.h>
34#include <linux/file.h>
35#include <linux/freezer.h>
36#include <net/sock.h>
37#include <net/checksum.h>
38#include <net/ip.h>
39#include <net/ipv6.h>
40#include <net/tcp_states.h>
41#include <asm/uaccess.h>
42#include <asm/ioctls.h>
43
44#include <linux/sunrpc/types.h>
45#include <linux/sunrpc/clnt.h>
46#include <linux/sunrpc/xdr.h>
47#include <linux/sunrpc/svcsock.h>
48#include <linux/sunrpc/stats.h>
49
/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	When both need to be taken (rare), svc_serv->sv_lock is taken first.
 *	The BKL protects svc_serv->sv_nrthreads.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	       and the ->sk_info_authunix cache.
 *	svc_sock->sk_flags.SK_BUSY prevents a svc_sock from being enqueued
 *	       more than once.
 *
 *	Some flags can be set to certain values at any time
 *	provided that certain rules are followed:
 *
 *	SK_CONN, SK_DATA can be set or cleared at any time.
 *		After a set, svc_sock_enqueue must be called
 *		(see the sketch following this comment).
 *		After a clear, the socket must be read/accepted;
 *		if this succeeds, the flag must be set again.
 *	SK_CLOSE can be set at any time. It is never cleared.
 *	sk_inuse contains a bias of '1' until SK_DEAD is set,
 *	       so when sk_inuse hits zero, we know the socket is dead
 *	       and no-one is using it.
 *	SK_DEAD can only be set while SK_BUSY is held, which ensures
 *	       no other thread will be using the socket or will try to
 *	       set SK_DEAD.
 *
 */
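/*
 * Illustrative sketch (not part of the original code): the clear/read/set
 * pattern the SK_DATA rules above describe, as followed by the recvfrom
 * handlers later in this file.  do_receive() is a hypothetical stand-in
 * for svc_udp_recvfrom()/svc_tcp_recvfrom():
 *
 *	clear_bit(SK_DATA, &svsk->sk_flags);
 *	len = do_receive(svsk);
 *	if (len > 0)
 *		set_bit(SK_DATA, &svsk->sk_flags);
 *	svc_sock_received(svsk);	clears SK_BUSY and re-enqueues
 */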
76
77#define RPCDBG_FACILITY	RPCDBG_SVCSOCK
78
79
80static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,
81					 int *errp, int flags);
82static void		svc_delete_socket(struct svc_sock *svsk);
83static void		svc_udp_data_ready(struct sock *, int);
84static int		svc_udp_recvfrom(struct svc_rqst *);
85static int		svc_udp_sendto(struct svc_rqst *);
86static void		svc_close_socket(struct svc_sock *svsk);
87
88static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk);
89static int svc_deferred_recv(struct svc_rqst *rqstp);
90static struct cache_deferred_req *svc_defer(struct cache_req *req);
91
/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
97static int svc_conn_age_period = 6*60;
98
99#ifdef CONFIG_DEBUG_LOCK_ALLOC
100static struct lock_class_key svc_key[2];
101static struct lock_class_key svc_slock_key[2];
102
103static inline void svc_reclassify_socket(struct socket *sock)
104{
105	struct sock *sk = sock->sk;
106	BUG_ON(sk->sk_lock.owner != NULL);
107	switch (sk->sk_family) {
108	case AF_INET:
109		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFSD",
110		    &svc_slock_key[0], "sk_lock-AF_INET-NFSD", &svc_key[0]);
111		break;
112
113	case AF_INET6:
114		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFSD",
115		    &svc_slock_key[1], "sk_lock-AF_INET6-NFSD", &svc_key[1]);
116		break;
117
118	default:
119		BUG();
120	}
121}
122#else
123static inline void svc_reclassify_socket(struct socket *sock)
124{
125}
126#endif
127
static char *__svc_print_addr(struct sockaddr *addr, char *buf, size_t len)
{
	switch (addr->sa_family) {
	case AF_INET:
		snprintf(buf, len, "%u.%u.%u.%u, port=%u",
			NIPQUAD(((struct sockaddr_in *) addr)->sin_addr),
			ntohs(((struct sockaddr_in *) addr)->sin_port));
		break;

	case AF_INET6:
		snprintf(buf, len, "%x:%x:%x:%x:%x:%x:%x:%x, port=%u",
			NIP6(((struct sockaddr_in6 *) addr)->sin6_addr),
			ntohs(((struct sockaddr_in6 *) addr)->sin6_port));
		break;

	default:
		snprintf(buf, len, "unknown address type: %d", addr->sa_family);
		break;
	}
	return buf;
}

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 * Returns @buf, filled in with the presentation form of the address.
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);
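/*
 * Example (added, hedged): the typical way this helper is used for debug
 * output, as in svc_sendto() further down:
 *
 *	char buf[RPC_MAX_ADDRBUFLEN];
 *
 *	dprintk("svc: request from %s\n",
 *		svc_print_addr(rqstp, buf, sizeof(buf)));
 */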
162
163/*
164 * Queue up an idle server thread.  Must have pool->sp_lock held.
165 * Note: this is really a stack rather than a queue, so that we only
166 * use as many different threads as we need, and the rest don't pollute
167 * the cache.
168 */
169static inline void
170svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
171{
172	list_add(&rqstp->rq_list, &pool->sp_threads);
173}
174
175/*
176 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
177 */
178static inline void
179svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
180{
181	list_del(&rqstp->rq_list);
182}
183
184/*
185 * Release an skbuff after use
186 */
187static inline void
188svc_release_skb(struct svc_rqst *rqstp)
189{
190	struct sk_buff *skb = rqstp->rq_skbuff;
191	struct svc_deferred_req *dr = rqstp->rq_deferred;
192
193	if (skb) {
194		rqstp->rq_skbuff = NULL;
195
196		dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
197		skb_free_datagram(rqstp->rq_sock->sk_sk, skb);
198	}
199	if (dr) {
200		rqstp->rq_deferred = NULL;
201		kfree(dr);
202	}
203}
204
205/*
206 * Any space to write?
207 */
208static inline unsigned long
209svc_sock_wspace(struct svc_sock *svsk)
210{
211	int wspace;
212
213	if (svsk->sk_sock->type == SOCK_STREAM)
214		wspace = sk_stream_wspace(svsk->sk_sk);
215	else
216		wspace = sock_wspace(svsk->sk_sk);
217
218	return wspace;
219}
220
221/*
222 * Queue up a socket with data pending. If there are idle nfsd
223 * processes, wake 'em up.
224 *
225 */
226static void
227svc_sock_enqueue(struct svc_sock *svsk)
228{
229	struct svc_serv	*serv = svsk->sk_server;
230	struct svc_pool *pool;
231	struct svc_rqst	*rqstp;
232	int cpu;
233
234	if (!(svsk->sk_flags &
235	      ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) ))
236		return;
237	if (test_bit(SK_DEAD, &svsk->sk_flags))
238		return;
239
240	cpu = get_cpu();
241	pool = svc_pool_for_cpu(svsk->sk_server, cpu);
242	put_cpu();
243
244	spin_lock_bh(&pool->sp_lock);
245
246	if (!list_empty(&pool->sp_threads) &&
247	    !list_empty(&pool->sp_sockets))
248		printk(KERN_ERR
249			"svc_sock_enqueue: threads and sockets both waiting??\n");
250
251	if (test_bit(SK_DEAD, &svsk->sk_flags)) {
252		/* Don't enqueue dead sockets */
253		dprintk("svc: socket %p is dead, not enqueued\n", svsk->sk_sk);
254		goto out_unlock;
255	}
256
257	/* Mark socket as busy. It will remain in this state until the
258	 * server has processed all pending data and put the socket back
259	 * on the idle list.  We update SK_BUSY atomically because
260	 * it also guards against trying to enqueue the svc_sock twice.
261	 */
262	if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) {
263		/* Don't enqueue socket while already enqueued */
264		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
265		goto out_unlock;
266	}
267	BUG_ON(svsk->sk_pool != NULL);
268	svsk->sk_pool = pool;
269
270	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
271	if (((atomic_read(&svsk->sk_reserved) + serv->sv_max_mesg)*2
272	     > svc_sock_wspace(svsk))
273	    && !test_bit(SK_CLOSE, &svsk->sk_flags)
274	    && !test_bit(SK_CONN, &svsk->sk_flags)) {
275		/* Don't enqueue while not enough space for reply */
276		dprintk("svc: socket %p  no space, %d*2 > %ld, not enqueued\n",
277			svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_max_mesg,
278			svc_sock_wspace(svsk));
279		svsk->sk_pool = NULL;
280		clear_bit(SK_BUSY, &svsk->sk_flags);
281		goto out_unlock;
282	}
283	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
284
285
286	if (!list_empty(&pool->sp_threads)) {
287		rqstp = list_entry(pool->sp_threads.next,
288				   struct svc_rqst,
289				   rq_list);
290		dprintk("svc: socket %p served by daemon %p\n",
291			svsk->sk_sk, rqstp);
292		svc_thread_dequeue(pool, rqstp);
293		if (rqstp->rq_sock)
294			printk(KERN_ERR
295				"svc_sock_enqueue: server %p, rq_sock=%p!\n",
296				rqstp, rqstp->rq_sock);
297		rqstp->rq_sock = svsk;
298		atomic_inc(&svsk->sk_inuse);
299		rqstp->rq_reserved = serv->sv_max_mesg;
300		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
301		BUG_ON(svsk->sk_pool != pool);
302		wake_up(&rqstp->rq_wait);
303	} else {
304		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
305		list_add_tail(&svsk->sk_ready, &pool->sp_sockets);
306		BUG_ON(svsk->sk_pool != pool);
307	}
308
309out_unlock:
310	spin_unlock_bh(&pool->sp_lock);
311}
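/*
 * Note on the wspace test above (added explanation): a socket is enqueued
 * only if twice the sum of the space already reserved and one maximum-sized
 * message fits in the send buffer.  For example, with sv_max_mesg = 32768
 * and 32768 bytes already reserved, the socket stays off the queue until
 * svc_sock_wspace() reports at least (32768 + 32768) * 2 = 131072 bytes.
 */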
312
313/*
314 * Dequeue the first socket.  Must be called with the pool->sp_lock held.
315 */
316static inline struct svc_sock *
317svc_sock_dequeue(struct svc_pool *pool)
318{
319	struct svc_sock	*svsk;
320
321	if (list_empty(&pool->sp_sockets))
322		return NULL;
323
324	svsk = list_entry(pool->sp_sockets.next,
325			  struct svc_sock, sk_ready);
326	list_del_init(&svsk->sk_ready);
327
328	dprintk("svc: socket %p dequeued, inuse=%d\n",
329		svsk->sk_sk, atomic_read(&svsk->sk_inuse));
330
331	return svsk;
332}
333
334/*
335 * Having read something from a socket, check whether it
336 * needs to be re-enqueued.
337 * Note: SK_DATA only gets cleared when a read-attempt finds
338 * no (or insufficient) data.
339 */
340static inline void
341svc_sock_received(struct svc_sock *svsk)
342{
343	svsk->sk_pool = NULL;
344	clear_bit(SK_BUSY, &svsk->sk_flags);
345	svc_sock_enqueue(svsk);
346}
347
348
349/**
350 * svc_reserve - change the space reserved for the reply to a request.
351 * @rqstp:  The request in question
352 * @space: new max space to reserve
353 *
354 * Each request reserves some space on the output queue of the socket
355 * to make sure the reply fits.  This function reduces that reserved
356 * space to be the amount of space used already, plus @space.
357 *
358 */
359void svc_reserve(struct svc_rqst *rqstp, int space)
360{
361	space += rqstp->rq_res.head[0].iov_len;
362
363	if (space < rqstp->rq_reserved) {
364		struct svc_sock *svsk = rqstp->rq_sock;
365		atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved);
366		rqstp->rq_reserved = space;
367
368		svc_sock_enqueue(svsk);
369	}
370}
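/*
 * Example (added, hedged sketch): a service that knows its reply will be
 * small can give back most of its reservation early, e.g.
 *
 *	svc_reserve(rqstp, 256);
 *
 * which shrinks svsk->sk_reserved accordingly and lets svc_sock_enqueue()
 * consider the socket ready again sooner.
 */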
371
372/*
373 * Release a socket after use.
374 */
375static inline void
376svc_sock_put(struct svc_sock *svsk)
377{
378	if (atomic_dec_and_test(&svsk->sk_inuse)) {
379		BUG_ON(! test_bit(SK_DEAD, &svsk->sk_flags));
380
381		dprintk("svc: releasing dead socket\n");
382		if (svsk->sk_sock->file)
383			sockfd_put(svsk->sk_sock);
384		else
385			sock_release(svsk->sk_sock);
386		if (svsk->sk_info_authunix != NULL)
387			svcauth_unix_info_release(svsk->sk_info_authunix);
388		kfree(svsk);
389	}
390}
391
392static void
393svc_sock_release(struct svc_rqst *rqstp)
394{
395	struct svc_sock	*svsk = rqstp->rq_sock;
396
397	svc_release_skb(rqstp);
398
399	svc_free_res_pages(rqstp);
400	rqstp->rq_res.page_len = 0;
401	rqstp->rq_res.page_base = 0;
402
403
404	/* Reset response buffer and release
405	 * the reservation.
406	 * But first, check that enough space was reserved
407	 * for the reply, otherwise we have a bug!
408	 */
409	if ((rqstp->rq_res.len) >  rqstp->rq_reserved)
410		printk(KERN_ERR "RPC request reserved %d but used %d\n",
411		       rqstp->rq_reserved,
412		       rqstp->rq_res.len);
413
414	rqstp->rq_res.head[0].iov_len = 0;
415	svc_reserve(rqstp, 0);
416	rqstp->rq_sock = NULL;
417
418	svc_sock_put(svsk);
419}
420
/*
 * External function to wake up a server waiting for data.
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
426void
427svc_wake_up(struct svc_serv *serv)
428{
429	struct svc_rqst	*rqstp;
430	unsigned int i;
431	struct svc_pool *pool;
432
433	for (i = 0; i < serv->sv_nrpools; i++) {
434		pool = &serv->sv_pools[i];
435
436		spin_lock_bh(&pool->sp_lock);
437		if (!list_empty(&pool->sp_threads)) {
438			rqstp = list_entry(pool->sp_threads.next,
439					   struct svc_rqst,
440					   rq_list);
441			dprintk("svc: daemon %p woken up.\n", rqstp);
442			/*
443			svc_thread_dequeue(pool, rqstp);
444			rqstp->rq_sock = NULL;
445			 */
446			wake_up(&rqstp->rq_wait);
447		}
448		spin_unlock_bh(&pool->sp_lock);
449	}
450}
451
452union svc_pktinfo_u {
453	struct in_pktinfo pkti;
454	struct in6_pktinfo pkti6;
455};
456#define SVC_PKTINFO_SPACE \
457	CMSG_SPACE(sizeof(union svc_pktinfo_u))
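/*
 * Added note: SVC_PKTINFO_SPACE is the size of a control-message buffer
 * large enough for either an IPv4 or an IPv6 pktinfo structure;
 * svc_sendto() and svc_udp_recvfrom() declare a suitably aligned union of
 * this size on the stack and point msg_control at it.
 */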
458
459static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
460{
461	switch (rqstp->rq_sock->sk_sk->sk_family) {
462	case AF_INET: {
463			struct in_pktinfo *pki = CMSG_DATA(cmh);
464
465			cmh->cmsg_level = SOL_IP;
466			cmh->cmsg_type = IP_PKTINFO;
467			pki->ipi_ifindex = 0;
468			pki->ipi_spec_dst.s_addr = rqstp->rq_daddr.addr.s_addr;
469			cmh->cmsg_len = CMSG_LEN(sizeof(*pki));
470		}
471		break;
472
473	case AF_INET6: {
474			struct in6_pktinfo *pki = CMSG_DATA(cmh);
475
476			cmh->cmsg_level = SOL_IPV6;
477			cmh->cmsg_type = IPV6_PKTINFO;
478			pki->ipi6_ifindex = 0;
479			ipv6_addr_copy(&pki->ipi6_addr,
480					&rqstp->rq_daddr.addr6);
481			cmh->cmsg_len = CMSG_LEN(sizeof(*pki));
482		}
483		break;
484	}
485	return;
486}
487
488/*
489 * Generic sendto routine
490 */
491static int
492svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
493{
494	struct svc_sock	*svsk = rqstp->rq_sock;
495	struct socket	*sock = svsk->sk_sock;
496	int		slen;
497	union {
498		struct cmsghdr	hdr;
499		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
500	} buffer;
501	struct cmsghdr *cmh = &buffer.hdr;
502	int		len = 0;
503	int		result;
504	int		size;
505	struct page	**ppage = xdr->pages;
506	size_t		base = xdr->page_base;
507	unsigned int	pglen = xdr->page_len;
508	unsigned int	flags = MSG_MORE;
509	char		buf[RPC_MAX_ADDRBUFLEN];
510
511	slen = xdr->len;
512
513	if (rqstp->rq_prot == IPPROTO_UDP) {
514		struct msghdr msg = {
515			.msg_name	= &rqstp->rq_addr,
516			.msg_namelen	= rqstp->rq_addrlen,
517			.msg_control	= cmh,
518			.msg_controllen	= sizeof(buffer),
519			.msg_flags	= MSG_MORE,
520		};
521
522		svc_set_cmsg_data(rqstp, cmh);
523
524		if (sock_sendmsg(sock, &msg, 0) < 0)
525			goto out;
526	}
527
528	/* send head */
529	if (slen == xdr->head[0].iov_len)
530		flags = 0;
531	len = kernel_sendpage(sock, rqstp->rq_respages[0], 0,
532				  xdr->head[0].iov_len, flags);
533	if (len != xdr->head[0].iov_len)
534		goto out;
535	slen -= xdr->head[0].iov_len;
536	if (slen == 0)
537		goto out;
538
539	/* send page data */
540	size = PAGE_SIZE - base < pglen ? PAGE_SIZE - base : pglen;
541	while (pglen > 0) {
542		if (slen == size)
543			flags = 0;
544		result = kernel_sendpage(sock, *ppage, base, size, flags);
545		if (result > 0)
546			len += result;
547		if (result != size)
548			goto out;
549		slen -= size;
550		pglen -= size;
551		size = PAGE_SIZE < pglen ? PAGE_SIZE : pglen;
552		base = 0;
553		ppage++;
554	}
555	/* send tail */
556	if (xdr->tail[0].iov_len) {
557		result = kernel_sendpage(sock, rqstp->rq_respages[0],
558					     ((unsigned long)xdr->tail[0].iov_base)
559						& (PAGE_SIZE-1),
560					     xdr->tail[0].iov_len, 0);
561
562		if (result > 0)
563			len += result;
564	}
565out:
566	dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n",
567		rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len,
568		xdr->len, len, svc_print_addr(rqstp, buf, sizeof(buf)));
569
570	return len;
571}
572
573/*
574 * Report socket names for nfsdfs
575 */
576static int one_sock_name(char *buf, struct svc_sock *svsk)
577{
578	int len;
579
580	switch(svsk->sk_sk->sk_family) {
581	case AF_INET:
582		len = sprintf(buf, "ipv4 %s %u.%u.%u.%u %d\n",
583			      svsk->sk_sk->sk_protocol==IPPROTO_UDP?
584			      "udp" : "tcp",
585			      NIPQUAD(inet_sk(svsk->sk_sk)->rcv_saddr),
586			      inet_sk(svsk->sk_sk)->num);
587		break;
588	default:
589		len = sprintf(buf, "*unknown-%d*\n",
590			       svsk->sk_sk->sk_family);
591	}
592	return len;
593}
594
595int
596svc_sock_names(char *buf, struct svc_serv *serv, char *toclose)
597{
598	struct svc_sock *svsk, *closesk = NULL;
599	int len = 0;
600
601	if (!serv)
602		return 0;
603	spin_lock_bh(&serv->sv_lock);
604	list_for_each_entry(svsk, &serv->sv_permsocks, sk_list) {
605		int onelen = one_sock_name(buf+len, svsk);
606		if (toclose && strcmp(toclose, buf+len) == 0)
607			closesk = svsk;
608		else
609			len += onelen;
610	}
611	spin_unlock_bh(&serv->sv_lock);
612	if (closesk)
613		/* Should unregister with portmap, but you cannot
614		 * unregister just one protocol...
615		 */
616		svc_close_socket(closesk);
617	else if (toclose)
618		return -ENOENT;
619	return len;
620}
621EXPORT_SYMBOL(svc_sock_names);
622
623/*
624 * Check input queue length
625 */
626static int
627svc_recv_available(struct svc_sock *svsk)
628{
629	struct socket	*sock = svsk->sk_sock;
630	int		avail, err;
631
632	err = kernel_sock_ioctl(sock, TIOCINQ, (unsigned long) &avail);
633
634	return (err >= 0)? avail : err;
635}
636
637/*
638 * Generic recvfrom routine.
639 */
640static int
641svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen)
642{
643	struct svc_sock *svsk = rqstp->rq_sock;
644	struct msghdr msg = {
645		.msg_flags	= MSG_DONTWAIT,
646	};
647	int len;
648
649	len = kernel_recvmsg(svsk->sk_sock, &msg, iov, nr, buflen,
650				msg.msg_flags);
651
652	/* sock_recvmsg doesn't fill in the name/namelen, so we must..
653	 */
654	memcpy(&rqstp->rq_addr, &svsk->sk_remote, svsk->sk_remotelen);
655	rqstp->rq_addrlen = svsk->sk_remotelen;
656
657	dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
658		svsk, iov[0].iov_base, iov[0].iov_len, len);
659
660	return len;
661}
662
663/*
664 * Set socket snd and rcv buffer lengths
665 */
666static inline void
667svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
668{
669	/* sock_setsockopt limits use to sysctl_?mem_max,
670	 * which isn't acceptable.  Until that is made conditional
671	 * on not having CAP_SYS_RESOURCE or similar, we go direct...
672	 * DaveM said I could!
673	 */
674	lock_sock(sock->sk);
675	sock->sk->sk_sndbuf = snd * 2;
676	sock->sk->sk_rcvbuf = rcv * 2;
677	sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
678	release_sock(sock->sk);
679}
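/*
 * Worked example (added): for UDP, svc_udp_recvfrom() requests
 * (sv_nrthreads + 3) * sv_max_mesg in each direction, so a server with
 * 8 threads and sv_max_mesg = 32768 ends up with sk_sndbuf = sk_rcvbuf =
 * 11 * 32768 * 2 = 720896 bytes (the final * 2 is the doubling above).
 */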
680/*
681 * INET callback when data has been received on the socket.
682 */
683static void
684svc_udp_data_ready(struct sock *sk, int count)
685{
686	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
687
688	if (svsk) {
689		dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
690			svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
691		set_bit(SK_DATA, &svsk->sk_flags);
692		svc_sock_enqueue(svsk);
693	}
694	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
695		wake_up_interruptible(sk->sk_sleep);
696}
697
698/*
699 * INET callback when space is newly available on the socket.
700 */
701static void
702svc_write_space(struct sock *sk)
703{
704	struct svc_sock	*svsk = (struct svc_sock *)(sk->sk_user_data);
705
706	if (svsk) {
707		dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
708			svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
709		svc_sock_enqueue(svsk);
710	}
711
712	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
713		dprintk("RPC svc_write_space: someone sleeping on %p\n",
714		       svsk);
715		wake_up_interruptible(sk->sk_sleep);
716	}
717}
718
719static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp,
720					    struct cmsghdr *cmh)
721{
722	switch (rqstp->rq_sock->sk_sk->sk_family) {
723	case AF_INET: {
724		struct in_pktinfo *pki = CMSG_DATA(cmh);
725		rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
726		break;
727		}
728	case AF_INET6: {
729		struct in6_pktinfo *pki = CMSG_DATA(cmh);
730		ipv6_addr_copy(&rqstp->rq_daddr.addr6, &pki->ipi6_addr);
731		break;
732		}
733	}
734}
735
736/*
737 * Receive a datagram from a UDP socket.
738 */
739static int
740svc_udp_recvfrom(struct svc_rqst *rqstp)
741{
742	struct svc_sock	*svsk = rqstp->rq_sock;
743	struct svc_serv	*serv = svsk->sk_server;
744	struct sk_buff	*skb;
745	union {
746		struct cmsghdr	hdr;
747		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
748	} buffer;
749	struct cmsghdr *cmh = &buffer.hdr;
750	int		err, len;
751	struct msghdr msg = {
752		.msg_name = svc_addr(rqstp),
753		.msg_control = cmh,
754		.msg_controllen = sizeof(buffer),
755		.msg_flags = MSG_DONTWAIT,
756	};
757
758	if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
759	    /* udp sockets need large rcvbuf as all pending
760	     * requests are still in that buffer.  sndbuf must
761	     * also be large enough that there is enough space
762	     * for one reply per thread.  We count all threads
763	     * rather than threads in a particular pool, which
764	     * provides an upper bound on the number of threads
765	     * which will access the socket.
766	     */
767	    svc_sock_setbufsize(svsk->sk_sock,
768				(serv->sv_nrthreads+3) * serv->sv_max_mesg,
769				(serv->sv_nrthreads+3) * serv->sv_max_mesg);
770
771	if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
772		svc_sock_received(svsk);
773		return svc_deferred_recv(rqstp);
774	}
775
776	if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
777		svc_delete_socket(svsk);
778		return 0;
779	}
780
781	clear_bit(SK_DATA, &svsk->sk_flags);
782	skb = NULL;
783	err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
784			     0, 0, MSG_PEEK | MSG_DONTWAIT);
785	if (err >= 0)
786		skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err);
787
788	if (skb == NULL) {
789		if (err != -EAGAIN) {
790			/* possibly an icmp error */
791			dprintk("svc: recvfrom returned error %d\n", -err);
792			set_bit(SK_DATA, &svsk->sk_flags);
793		}
794		svc_sock_received(svsk);
795		return -EAGAIN;
796	}
797	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
798	if (skb->tstamp.tv64 == 0) {
799		skb->tstamp = ktime_get_real();
800		/* Don't enable netstamp, sunrpc doesn't
801		   need that much accuracy */
802	}
803	svsk->sk_sk->sk_stamp = skb->tstamp;
804	set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
805
806	/*
807	 * Maybe more packets - kick another thread ASAP.
808	 */
809	svc_sock_received(svsk);
810
811	len  = skb->len - sizeof(struct udphdr);
812	rqstp->rq_arg.len = len;
813
814	rqstp->rq_prot = IPPROTO_UDP;
815
816	if (cmh->cmsg_level != IPPROTO_IP ||
817	    cmh->cmsg_type != IP_PKTINFO) {
818		if (net_ratelimit())
819			printk("rpcsvc: received unknown control message:"
820			       "%d/%d\n",
821			       cmh->cmsg_level, cmh->cmsg_type);
822		skb_free_datagram(svsk->sk_sk, skb);
823		return 0;
824	}
825	svc_udp_get_dest_address(rqstp, cmh);
826
827	if (skb_is_nonlinear(skb)) {
828		/* we have to copy */
829		local_bh_disable();
830		if (csum_partial_copy_to_xdr(&rqstp->rq_arg, skb)) {
831			local_bh_enable();
832			/* checksum error */
833			skb_free_datagram(svsk->sk_sk, skb);
834			return 0;
835		}
836		local_bh_enable();
837		skb_free_datagram(svsk->sk_sk, skb);
838	} else {
839		/* we can use it in-place */
840		rqstp->rq_arg.head[0].iov_base = skb->data + sizeof(struct udphdr);
841		rqstp->rq_arg.head[0].iov_len = len;
842		if (skb_checksum_complete(skb)) {
843			skb_free_datagram(svsk->sk_sk, skb);
844			return 0;
845		}
846		rqstp->rq_skbuff = skb;
847	}
848
849	rqstp->rq_arg.page_base = 0;
850	if (len <= rqstp->rq_arg.head[0].iov_len) {
851		rqstp->rq_arg.head[0].iov_len = len;
852		rqstp->rq_arg.page_len = 0;
853		rqstp->rq_respages = rqstp->rq_pages+1;
854	} else {
855		rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
856		rqstp->rq_respages = rqstp->rq_pages + 1 +
857			(rqstp->rq_arg.page_len + PAGE_SIZE - 1)/ PAGE_SIZE;
858	}
859
860	if (serv->sv_stats)
861		serv->sv_stats->netudpcnt++;
862
863	return len;
864}
865
866static int
867svc_udp_sendto(struct svc_rqst *rqstp)
868{
869	int		error;
870
871	error = svc_sendto(rqstp, &rqstp->rq_res);
872	if (error == -ECONNREFUSED)
873		/* ICMP error on earlier request. */
874		error = svc_sendto(rqstp, &rqstp->rq_res);
875
876	return error;
877}
878
879static void
880svc_udp_init(struct svc_sock *svsk)
881{
882	int one = 1;
883	mm_segment_t oldfs;
884
885	svsk->sk_sk->sk_data_ready = svc_udp_data_ready;
886	svsk->sk_sk->sk_write_space = svc_write_space;
887	svsk->sk_recvfrom = svc_udp_recvfrom;
888	svsk->sk_sendto = svc_udp_sendto;
889
	/* initial settings must allow enough space to
	 * receive and respond to one request.
	 * svc_udp_recvfrom will re-adjust if necessary
	 */
894	svc_sock_setbufsize(svsk->sk_sock,
895			    3 * svsk->sk_server->sv_max_mesg,
896			    3 * svsk->sk_server->sv_max_mesg);
897
898	set_bit(SK_DATA, &svsk->sk_flags); /* might have come in before data_ready set up */
899	set_bit(SK_CHNGBUF, &svsk->sk_flags);
900
901	oldfs = get_fs();
902	set_fs(KERNEL_DS);
903	/* make sure we get destination address info */
904	svsk->sk_sock->ops->setsockopt(svsk->sk_sock, IPPROTO_IP, IP_PKTINFO,
905				       (char __user *)&one, sizeof(one));
906	set_fs(oldfs);
907}
908
909/*
910 * A data_ready event on a listening socket means there's a connection
911 * pending. Do not use state_change as a substitute for it.
912 */
913static void
914svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
915{
916	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
917
918	dprintk("svc: socket %p TCP (listen) state change %d\n",
919		sk, sk->sk_state);
920
	/*
	 * This callback may be called twice when a new connection
	 * is established as a child socket inherits everything
	 * from a parent LISTEN socket.
	 * 1) data_ready method of the parent socket will be called
	 *    when one of the child sockets becomes ESTABLISHED.
	 * 2) data_ready method of the child socket may be called
	 *    when it receives data before the socket is accepted.
	 * In case 2, we should ignore it silently.
	 */
931	if (sk->sk_state == TCP_LISTEN) {
932		if (svsk) {
933			set_bit(SK_CONN, &svsk->sk_flags);
934			svc_sock_enqueue(svsk);
935		} else
936			printk("svc: socket %p: no user data\n", sk);
937	}
938
939	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
940		wake_up_interruptible_all(sk->sk_sleep);
941}
942
943/*
944 * A state change on a connected socket means it's dying or dead.
945 */
946static void
947svc_tcp_state_change(struct sock *sk)
948{
949	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
950
951	dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
952		sk, sk->sk_state, sk->sk_user_data);
953
954	if (!svsk)
955		printk("svc: socket %p: no user data\n", sk);
956	else {
957		set_bit(SK_CLOSE, &svsk->sk_flags);
958		svc_sock_enqueue(svsk);
959	}
960	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
961		wake_up_interruptible_all(sk->sk_sleep);
962}
963
964static void
965svc_tcp_data_ready(struct sock *sk, int count)
966{
967	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
968
969	dprintk("svc: socket %p TCP data ready (svsk %p)\n",
970		sk, sk->sk_user_data);
971	if (svsk) {
972		set_bit(SK_DATA, &svsk->sk_flags);
973		svc_sock_enqueue(svsk);
974	}
975	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
976		wake_up_interruptible(sk->sk_sleep);
977}
978
979static inline int svc_port_is_privileged(struct sockaddr *sin)
980{
981	switch (sin->sa_family) {
982	case AF_INET:
983		return ntohs(((struct sockaddr_in *)sin)->sin_port)
984			< PROT_SOCK;
985	case AF_INET6:
986		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
987			< PROT_SOCK;
988	default:
989		return 0;
990	}
991}
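/*
 * Added note: for example, a request arriving from source port 721 counts
 * as privileged (721 < PROT_SOCK, i.e. 1024) while one from port 33333
 * does not; svc_recv() stores the result in rqstp->rq_secure.
 */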
992
993/*
994 * Accept a TCP connection
995 */
996static void
997svc_tcp_accept(struct svc_sock *svsk)
998{
999	struct sockaddr_storage addr;
1000	struct sockaddr	*sin = (struct sockaddr *) &addr;
1001	struct svc_serv	*serv = svsk->sk_server;
1002	struct socket	*sock = svsk->sk_sock;
1003	struct socket	*newsock;
1004	struct svc_sock	*newsvsk;
1005	int		err, slen;
1006	char		buf[RPC_MAX_ADDRBUFLEN];
1007
1008	dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
1009	if (!sock)
1010		return;
1011
1012	clear_bit(SK_CONN, &svsk->sk_flags);
1013	err = kernel_accept(sock, &newsock, O_NONBLOCK);
1014	if (err < 0) {
1015		if (err == -ENOMEM)
1016			printk(KERN_WARNING "%s: no more sockets!\n",
1017			       serv->sv_name);
1018		else if (err != -EAGAIN && net_ratelimit())
1019			printk(KERN_WARNING "%s: accept failed (err %d)!\n",
1020				   serv->sv_name, -err);
1021		return;
1022	}
1023
1024	set_bit(SK_CONN, &svsk->sk_flags);
1025	svc_sock_enqueue(svsk);
1026
1027	err = kernel_getpeername(newsock, sin, &slen);
1028	if (err < 0) {
1029		if (net_ratelimit())
1030			printk(KERN_WARNING "%s: peername failed (err %d)!\n",
1031				   serv->sv_name, -err);
1032		goto failed;		/* aborted connection or whatever */
1033	}
1034
1035	/* Ideally, we would want to reject connections from unauthorized
1036	 * hosts here, but when we get encryption, the IP of the host won't
1037	 * tell us anything.  For now just warn about unpriv connections.
1038	 */
1039	if (!svc_port_is_privileged(sin)) {
1040		dprintk(KERN_WARNING
1041			"%s: connect from unprivileged port: %s\n",
1042			serv->sv_name,
1043			__svc_print_addr(sin, buf, sizeof(buf)));
1044	}
1045	dprintk("%s: connect from %s\n", serv->sv_name,
1046		__svc_print_addr(sin, buf, sizeof(buf)));
1047
1048	/* make sure that a write doesn't block forever when
1049	 * low on memory
1050	 */
1051	newsock->sk->sk_sndtimeo = HZ*30;
1052
1053	if (!(newsvsk = svc_setup_socket(serv, newsock, &err,
1054				 (SVC_SOCK_ANONYMOUS | SVC_SOCK_TEMPORARY))))
1055		goto failed;
1056	memcpy(&newsvsk->sk_remote, sin, slen);
1057	newsvsk->sk_remotelen = slen;
1058
1059	svc_sock_received(newsvsk);
1060
	/* make sure that we don't have too many active connections.
	 * If we have, something must be dropped.
	 *
	 * There's no point in trying to do random drop here for
	 * DoS prevention. The NFS clients do one reconnect in 15
	 * seconds. An attacker can easily beat that.
	 *
	 * The only somewhat efficient mechanism would be to drop
	 * old connections from the same IP first. But right now
	 * we don't even record the client IP in svc_sock.
	 */
1072	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
1073		struct svc_sock *svsk = NULL;
1074		spin_lock_bh(&serv->sv_lock);
1075		if (!list_empty(&serv->sv_tempsocks)) {
1076			if (net_ratelimit()) {
1077				/* Try to help the admin */
1078				printk(KERN_NOTICE "%s: too many open TCP "
1079					"sockets, consider increasing the "
1080					"number of nfsd threads\n",
1081						   serv->sv_name);
1082				printk(KERN_NOTICE
1083				       "%s: last TCP connect from %s\n",
1084				       serv->sv_name, buf);
1085			}
1086			/*
1087			 * Always select the oldest socket. It's not fair,
1088			 * but so is life
1089			 */
1090			svsk = list_entry(serv->sv_tempsocks.prev,
1091					  struct svc_sock,
1092					  sk_list);
1093			set_bit(SK_CLOSE, &svsk->sk_flags);
1094			atomic_inc(&svsk->sk_inuse);
1095		}
1096		spin_unlock_bh(&serv->sv_lock);
1097
1098		if (svsk) {
1099			svc_sock_enqueue(svsk);
1100			svc_sock_put(svsk);
1101		}
1102
1103	}
1104
1105	if (serv->sv_stats)
1106		serv->sv_stats->nettcpconn++;
1107
1108	return;
1109
1110failed:
1111	sock_release(newsock);
1112	return;
1113}
1114
1115/*
1116 * Receive data from a TCP socket.
1117 */
1118static int
1119svc_tcp_recvfrom(struct svc_rqst *rqstp)
1120{
1121	struct svc_sock	*svsk = rqstp->rq_sock;
1122	struct svc_serv	*serv = svsk->sk_server;
1123	int		len;
1124	struct kvec *vec;
1125	int pnum, vlen;
1126
1127	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
1128		svsk, test_bit(SK_DATA, &svsk->sk_flags),
1129		test_bit(SK_CONN, &svsk->sk_flags),
1130		test_bit(SK_CLOSE, &svsk->sk_flags));
1131
1132	if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
1133		svc_sock_received(svsk);
1134		return svc_deferred_recv(rqstp);
1135	}
1136
1137	if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
1138		svc_delete_socket(svsk);
1139		return 0;
1140	}
1141
1142	if (svsk->sk_sk->sk_state == TCP_LISTEN) {
1143		svc_tcp_accept(svsk);
1144		svc_sock_received(svsk);
1145		return 0;
1146	}
1147
1148	if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
		/* sndbuf needs to have room for one request
		 * per thread, otherwise we can stall even when the
		 * network isn't a bottleneck.
		 *
		 * We count all threads rather than threads in a
		 * particular pool, which provides an upper bound
		 * on the number of threads which will access the socket.
		 *
		 * rcvbuf just needs to be able to hold a few requests.
		 * Normally they will be removed from the queue
		 * as soon as a complete request arrives.
		 */
1161		svc_sock_setbufsize(svsk->sk_sock,
1162				    (serv->sv_nrthreads+3) * serv->sv_max_mesg,
1163				    3 * serv->sv_max_mesg);
1164
1165	clear_bit(SK_DATA, &svsk->sk_flags);
1166
1167	/* Receive data. If we haven't got the record length yet, get
1168	 * the next four bytes. Otherwise try to gobble up as much as
1169	 * possible up to the complete record length.
1170	 */
1171	if (svsk->sk_tcplen < 4) {
1172		unsigned long	want = 4 - svsk->sk_tcplen;
1173		struct kvec	iov;
1174
1175		iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
1176		iov.iov_len  = want;
1177		if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
1178			goto error;
1179		svsk->sk_tcplen += len;
1180
1181		if (len < want) {
1182			dprintk("svc: short recvfrom while reading record length (%d of %lu)\n",
1183				len, want);
1184			svc_sock_received(svsk);
1185			return -EAGAIN; /* record header not complete */
1186		}
1187
1188		svsk->sk_reclen = ntohl(svsk->sk_reclen);
1189		if (!(svsk->sk_reclen & 0x80000000)) {
1190			if (net_ratelimit())
1191				printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx"
1192				       " (non-terminal)\n",
1193				       (unsigned long) svsk->sk_reclen);
1194			goto err_delete;
1195		}
1196		svsk->sk_reclen &= 0x7fffffff;
1197		dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
1198		if (svsk->sk_reclen > serv->sv_max_mesg) {
1199			if (net_ratelimit())
1200				printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx"
1201				       " (large)\n",
1202				       (unsigned long) svsk->sk_reclen);
1203			goto err_delete;
1204		}
1205	}
1206
1207	/* Check whether enough data is available */
1208	len = svc_recv_available(svsk);
1209	if (len < 0)
1210		goto error;
1211
1212	if (len < svsk->sk_reclen) {
1213		dprintk("svc: incomplete TCP record (%d of %d)\n",
1214			len, svsk->sk_reclen);
1215		svc_sock_received(svsk);
1216		return -EAGAIN;	/* record not complete */
1217	}
1218	len = svsk->sk_reclen;
1219	set_bit(SK_DATA, &svsk->sk_flags);
1220
1221	vec = rqstp->rq_vec;
1222	vec[0] = rqstp->rq_arg.head[0];
1223	vlen = PAGE_SIZE;
1224	pnum = 1;
1225	while (vlen < len) {
1226		vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]);
1227		vec[pnum].iov_len = PAGE_SIZE;
1228		pnum++;
1229		vlen += PAGE_SIZE;
1230	}
1231	rqstp->rq_respages = &rqstp->rq_pages[pnum];
1232
1233	/* Now receive data */
1234	len = svc_recvfrom(rqstp, vec, pnum, len);
1235	if (len < 0)
1236		goto error;
1237
1238	dprintk("svc: TCP complete record (%d bytes)\n", len);
1239	rqstp->rq_arg.len = len;
1240	rqstp->rq_arg.page_base = 0;
1241	if (len <= rqstp->rq_arg.head[0].iov_len) {
1242		rqstp->rq_arg.head[0].iov_len = len;
1243		rqstp->rq_arg.page_len = 0;
1244	} else {
1245		rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
1246	}
1247
1248	rqstp->rq_skbuff      = NULL;
1249	rqstp->rq_prot	      = IPPROTO_TCP;
1250
1251	/* Reset TCP read info */
1252	svsk->sk_reclen = 0;
1253	svsk->sk_tcplen = 0;
1254
1255	svc_sock_received(svsk);
1256	if (serv->sv_stats)
1257		serv->sv_stats->nettcpcnt++;
1258
1259	return len;
1260
1261 err_delete:
1262	svc_delete_socket(svsk);
1263	return -EAGAIN;
1264
1265 error:
1266	if (len == -EAGAIN) {
1267		dprintk("RPC: TCP recvfrom got EAGAIN\n");
1268		svc_sock_received(svsk);
1269	} else {
1270		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
1271					svsk->sk_server->sv_name, -len);
1272		goto err_delete;
1273	}
1274
1275	return len;
1276}
1277
1278/*
1279 * Send out data on TCP socket.
1280 */
1281static int
1282svc_tcp_sendto(struct svc_rqst *rqstp)
1283{
1284	struct xdr_buf	*xbufp = &rqstp->rq_res;
1285	int sent;
1286	__be32 reclen;
1287
1288	/* Set up the first element of the reply kvec.
1289	 * Any other kvecs that may be in use have been taken
1290	 * care of by the server implementation itself.
1291	 */
1292	reclen = htonl(0x80000000|((xbufp->len ) - 4));
1293	memcpy(xbufp->head[0].iov_base, &reclen, 4);
1294
1295	if (test_bit(SK_DEAD, &rqstp->rq_sock->sk_flags))
1296		return -ENOTCONN;
1297
1298	sent = svc_sendto(rqstp, &rqstp->rq_res);
1299	if (sent != xbufp->len) {
1300		printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
1301		       rqstp->rq_sock->sk_server->sv_name,
1302		       (sent<0)?"got error":"sent only",
1303		       sent, xbufp->len);
1304		set_bit(SK_CLOSE, &rqstp->rq_sock->sk_flags);
1305		svc_sock_enqueue(rqstp->rq_sock);
1306		sent = -EAGAIN;
1307	}
1308	return sent;
1309}
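/*
 * Worked example (added): if xbufp->len is 200, the marker written above is
 * htonl(0x80000000 | 196): the top bit marks the final (and here only)
 * fragment and the low 31 bits give the fragment length, which excludes
 * the four marker bytes themselves.  svc_tcp_recvfrom() tests the same bit
 * when it parses an incoming record.
 */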
1310
1311static void
1312svc_tcp_init(struct svc_sock *svsk)
1313{
1314	struct sock	*sk = svsk->sk_sk;
1315	struct tcp_sock *tp = tcp_sk(sk);
1316
1317	svsk->sk_recvfrom = svc_tcp_recvfrom;
1318	svsk->sk_sendto = svc_tcp_sendto;
1319
1320	if (sk->sk_state == TCP_LISTEN) {
1321		dprintk("setting up TCP socket for listening\n");
1322		sk->sk_data_ready = svc_tcp_listen_data_ready;
1323		set_bit(SK_CONN, &svsk->sk_flags);
1324	} else {
1325		dprintk("setting up TCP socket for reading\n");
1326		sk->sk_state_change = svc_tcp_state_change;
1327		sk->sk_data_ready = svc_tcp_data_ready;
1328		sk->sk_write_space = svc_write_space;
1329
1330		svsk->sk_reclen = 0;
1331		svsk->sk_tcplen = 0;
1332
1333		tp->nonagle = 1;        /* disable Nagle's algorithm */
1334
		/* initial settings must allow enough space to
		 * receive and respond to one request.
		 * svc_tcp_recvfrom will re-adjust if necessary
		 */
1339		svc_sock_setbufsize(svsk->sk_sock,
1340				    3 * svsk->sk_server->sv_max_mesg,
1341				    3 * svsk->sk_server->sv_max_mesg);
1342
1343		set_bit(SK_CHNGBUF, &svsk->sk_flags);
1344		set_bit(SK_DATA, &svsk->sk_flags);
1345		if (sk->sk_state != TCP_ESTABLISHED)
1346			set_bit(SK_CLOSE, &svsk->sk_flags);
1347	}
1348}
1349
1350void
1351svc_sock_update_bufs(struct svc_serv *serv)
1352{
1353	/*
1354	 * The number of server threads has changed. Update
1355	 * rcvbuf and sndbuf accordingly on all sockets
1356	 */
1357	struct list_head *le;
1358
1359	spin_lock_bh(&serv->sv_lock);
1360	list_for_each(le, &serv->sv_permsocks) {
1361		struct svc_sock *svsk =
1362			list_entry(le, struct svc_sock, sk_list);
1363		set_bit(SK_CHNGBUF, &svsk->sk_flags);
1364	}
1365	list_for_each(le, &serv->sv_tempsocks) {
1366		struct svc_sock *svsk =
1367			list_entry(le, struct svc_sock, sk_list);
1368		set_bit(SK_CHNGBUF, &svsk->sk_flags);
1369	}
1370	spin_unlock_bh(&serv->sv_lock);
1371}
1372
1373/*
1374 * Receive the next request on any socket.  This code is carefully
1375 * organised not to touch any cachelines in the shared svc_serv
1376 * structure, only cachelines in the local svc_pool.
1377 */
1378int
1379svc_recv(struct svc_rqst *rqstp, long timeout)
1380{
1381	struct svc_sock		*svsk = NULL;
1382	struct svc_serv		*serv = rqstp->rq_server;
1383	struct svc_pool		*pool = rqstp->rq_pool;
1384	int			len, i;
1385	int 			pages;
1386	struct xdr_buf		*arg;
1387	DECLARE_WAITQUEUE(wait, current);
1388
1389	dprintk("svc: server %p waiting for data (to = %ld)\n",
1390		rqstp, timeout);
1391
1392	if (rqstp->rq_sock)
1393		printk(KERN_ERR
1394			"svc_recv: service %p, socket not NULL!\n",
1395			 rqstp);
1396	if (waitqueue_active(&rqstp->rq_wait))
1397		printk(KERN_ERR
1398			"svc_recv: service %p, wait queue active!\n",
1399			 rqstp);
1400
1401
1402	/* now allocate needed pages.  If we get a failure, sleep briefly */
1403	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
1404	for (i=0; i < pages ; i++)
1405		while (rqstp->rq_pages[i] == NULL) {
1406			struct page *p = alloc_page(GFP_KERNEL);
1407			if (!p)
1408				schedule_timeout_uninterruptible(msecs_to_jiffies(500));
1409			rqstp->rq_pages[i] = p;
1410		}
1411	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
1412	BUG_ON(pages >= RPCSVC_MAXPAGES);
1413
1414	/* Make arg->head point to first page and arg->pages point to rest */
1415	arg = &rqstp->rq_arg;
1416	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
1417	arg->head[0].iov_len = PAGE_SIZE;
1418	arg->pages = rqstp->rq_pages + 1;
1419	arg->page_base = 0;
1420	/* save at least one page for response */
1421	arg->page_len = (pages-2)*PAGE_SIZE;
1422	arg->len = (pages-1)*PAGE_SIZE;
1423	arg->tail[0].iov_len = 0;
1424
1425	try_to_freeze();
1426	cond_resched();
1427	if (signalled())
1428		return -EINTR;
1429
1430	spin_lock_bh(&pool->sp_lock);
1431	if ((svsk = svc_sock_dequeue(pool)) != NULL) {
1432		rqstp->rq_sock = svsk;
1433		atomic_inc(&svsk->sk_inuse);
1434		rqstp->rq_reserved = serv->sv_max_mesg;
1435		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
1436	} else {
1437		/* No data pending. Go to sleep */
1438		svc_thread_enqueue(pool, rqstp);
1439
1440		/*
1441		 * We have to be able to interrupt this wait
1442		 * to bring down the daemons ...
1443		 */
1444		set_current_state(TASK_INTERRUPTIBLE);
1445		add_wait_queue(&rqstp->rq_wait, &wait);
1446		spin_unlock_bh(&pool->sp_lock);
1447
1448		schedule_timeout(timeout);
1449
1450		try_to_freeze();
1451
1452		spin_lock_bh(&pool->sp_lock);
1453		remove_wait_queue(&rqstp->rq_wait, &wait);
1454
1455		if (!(svsk = rqstp->rq_sock)) {
1456			svc_thread_dequeue(pool, rqstp);
1457			spin_unlock_bh(&pool->sp_lock);
1458			dprintk("svc: server %p, no data yet\n", rqstp);
1459			return signalled()? -EINTR : -EAGAIN;
1460		}
1461	}
1462	spin_unlock_bh(&pool->sp_lock);
1463
1464	dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
1465		 rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse));
1466	len = svsk->sk_recvfrom(rqstp);
1467	dprintk("svc: got len=%d\n", len);
1468
1469	/* No data, incomplete (TCP) read, or accept() */
1470	if (len == 0 || len == -EAGAIN) {
1471		rqstp->rq_res.len = 0;
1472		svc_sock_release(rqstp);
1473		return -EAGAIN;
1474	}
1475	svsk->sk_lastrecv = get_seconds();
1476	clear_bit(SK_OLD, &svsk->sk_flags);
1477
1478	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
1479	rqstp->rq_chandle.defer = svc_defer;
1480
1481	if (serv->sv_stats)
1482		serv->sv_stats->netcnt++;
1483	return len;
1484}
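/*
 * Hedged sketch (added, not from the original source): the loop a service
 * thread such as nfsd or lockd is expected to run around this function.
 * Real threads add signal, shutdown and statistics handling:
 *
 *	for (;;) {
 *		int err = svc_recv(rqstp, 60 * 60 * HZ);
 *
 *		if (err == -EINTR)
 *			break;
 *		if (err < 0)
 *			continue;
 *		svc_process(rqstp);	dispatch, see net/sunrpc/svc.c
 *	}
 */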
1485
1486/*
1487 * Drop request
1488 */
1489void
1490svc_drop(struct svc_rqst *rqstp)
1491{
1492	dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
1493	svc_sock_release(rqstp);
1494}
1495
1496/*
1497 * Return reply to client.
1498 */
1499int
1500svc_send(struct svc_rqst *rqstp)
1501{
1502	struct svc_sock	*svsk;
1503	int		len;
1504	struct xdr_buf	*xb;
1505
1506	if ((svsk = rqstp->rq_sock) == NULL) {
1507		printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
1508				__FILE__, __LINE__);
1509		return -EFAULT;
1510	}
1511
1512	/* release the receive skb before sending the reply */
1513	svc_release_skb(rqstp);
1514
1515	/* calculate over-all length */
1516	xb = & rqstp->rq_res;
1517	xb->len = xb->head[0].iov_len +
1518		xb->page_len +
1519		xb->tail[0].iov_len;
1520
1521	/* Grab svsk->sk_mutex to serialize outgoing data. */
1522	mutex_lock(&svsk->sk_mutex);
1523	if (test_bit(SK_DEAD, &svsk->sk_flags))
1524		len = -ENOTCONN;
1525	else
1526		len = svsk->sk_sendto(rqstp);
1527	mutex_unlock(&svsk->sk_mutex);
1528	svc_sock_release(rqstp);
1529
1530	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
1531		return 0;
1532	return len;
1533}
1534
1535/*
1536 * Timer function to close old temporary sockets, using
1537 * a mark-and-sweep algorithm.
1538 */
1539static void
1540svc_age_temp_sockets(unsigned long closure)
1541{
1542	struct svc_serv *serv = (struct svc_serv *)closure;
1543	struct svc_sock *svsk;
1544	struct list_head *le, *next;
1545	LIST_HEAD(to_be_aged);
1546
1547	dprintk("svc_age_temp_sockets\n");
1548
1549	if (!spin_trylock_bh(&serv->sv_lock)) {
1550		/* busy, try again 1 sec later */
1551		dprintk("svc_age_temp_sockets: busy\n");
1552		mod_timer(&serv->sv_temptimer, jiffies + HZ);
1553		return;
1554	}
1555
1556	list_for_each_safe(le, next, &serv->sv_tempsocks) {
1557		svsk = list_entry(le, struct svc_sock, sk_list);
1558
1559		if (!test_and_set_bit(SK_OLD, &svsk->sk_flags))
1560			continue;
1561		if (atomic_read(&svsk->sk_inuse) || test_bit(SK_BUSY, &svsk->sk_flags))
1562			continue;
1563		atomic_inc(&svsk->sk_inuse);
1564		list_move(le, &to_be_aged);
1565		set_bit(SK_CLOSE, &svsk->sk_flags);
1566		set_bit(SK_DETACHED, &svsk->sk_flags);
1567	}
1568	spin_unlock_bh(&serv->sv_lock);
1569
1570	while (!list_empty(&to_be_aged)) {
1571		le = to_be_aged.next;
1572		/* fiddling the sk_list node is safe 'cos we're SK_DETACHED */
1573		list_del_init(le);
1574		svsk = list_entry(le, struct svc_sock, sk_list);
1575
1576		dprintk("queuing svsk %p for closing, %lu seconds old\n",
1577			svsk, get_seconds() - svsk->sk_lastrecv);
1578
1579		/* a thread will dequeue and close it soon */
1580		svc_sock_enqueue(svsk);
1581		svc_sock_put(svsk);
1582	}
1583
1584	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
1585}
1586
1587static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
1588						struct socket *sock,
1589						int *errp, int flags)
1590{
1591	struct svc_sock	*svsk;
1592	struct sock	*inet;
1593	int		pmap_register = !(flags & SVC_SOCK_ANONYMOUS);
1594	int		is_temporary = flags & SVC_SOCK_TEMPORARY;
1595
1596	dprintk("svc: svc_setup_socket %p\n", sock);
1597	if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) {
1598		*errp = -ENOMEM;
1599		return NULL;
1600	}
1601
1602	inet = sock->sk;
1603
1604	/* Register socket with portmapper */
1605	if (*errp >= 0 && pmap_register)
1606		*errp = svc_register(serv, inet->sk_protocol,
1607				     ntohs(inet_sk(inet)->sport));
1608
1609	if (*errp < 0) {
1610		kfree(svsk);
1611		return NULL;
1612	}
1613
1614	set_bit(SK_BUSY, &svsk->sk_flags);
1615	inet->sk_user_data = svsk;
1616	svsk->sk_sock = sock;
1617	svsk->sk_sk = inet;
1618	svsk->sk_ostate = inet->sk_state_change;
1619	svsk->sk_odata = inet->sk_data_ready;
1620	svsk->sk_owspace = inet->sk_write_space;
1621	svsk->sk_server = serv;
1622	atomic_set(&svsk->sk_inuse, 1);
1623	svsk->sk_lastrecv = get_seconds();
1624	spin_lock_init(&svsk->sk_lock);
1625	INIT_LIST_HEAD(&svsk->sk_deferred);
1626	INIT_LIST_HEAD(&svsk->sk_ready);
1627	mutex_init(&svsk->sk_mutex);
1628
1629	/* Initialize the socket */
1630	if (sock->type == SOCK_DGRAM)
1631		svc_udp_init(svsk);
1632	else
1633		svc_tcp_init(svsk);
1634
1635	spin_lock_bh(&serv->sv_lock);
1636	if (is_temporary) {
1637		set_bit(SK_TEMP, &svsk->sk_flags);
1638		list_add(&svsk->sk_list, &serv->sv_tempsocks);
1639		serv->sv_tmpcnt++;
1640		if (serv->sv_temptimer.function == NULL) {
1641			/* setup timer to age temp sockets */
1642			setup_timer(&serv->sv_temptimer, svc_age_temp_sockets,
1643					(unsigned long)serv);
1644			mod_timer(&serv->sv_temptimer,
1645					jiffies + svc_conn_age_period * HZ);
1646		}
1647	} else {
1648		clear_bit(SK_TEMP, &svsk->sk_flags);
1649		list_add(&svsk->sk_list, &serv->sv_permsocks);
1650	}
1651	spin_unlock_bh(&serv->sv_lock);
1652
1653	dprintk("svc: svc_setup_socket created %p (inet %p)\n",
1654				svsk, svsk->sk_sk);
1655
1656	return svsk;
1657}
1658
1659int svc_addsock(struct svc_serv *serv,
1660		int fd,
1661		char *name_return,
1662		int *proto)
1663{
1664	int err = 0;
1665	struct socket *so = sockfd_lookup(fd, &err);
1666	struct svc_sock *svsk = NULL;
1667
1668	if (!so)
1669		return err;
1670	if (so->sk->sk_family != AF_INET)
1671		err =  -EAFNOSUPPORT;
1672	else if (so->sk->sk_protocol != IPPROTO_TCP &&
1673	    so->sk->sk_protocol != IPPROTO_UDP)
1674		err =  -EPROTONOSUPPORT;
1675	else if (so->state > SS_UNCONNECTED)
1676		err = -EISCONN;
1677	else {
1678		svsk = svc_setup_socket(serv, so, &err, SVC_SOCK_DEFAULTS);
1679		if (svsk) {
1680			svc_sock_received(svsk);
1681			err = 0;
1682		}
1683	}
1684	if (err) {
1685		sockfd_put(so);
1686		return err;
1687	}
1688	if (proto) *proto = so->sk->sk_protocol;
1689	return one_sock_name(name_return, svsk);
1690}
1691EXPORT_SYMBOL_GPL(svc_addsock);
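/*
 * Usage note (added, hedged): this is the entry point used when user space
 * hands the server an already bound socket by file descriptor (for
 * example, nfsd's "portlist" file in nfsdfs); the name produced by
 * one_sock_name() is returned so it can be reported back to the writer.
 */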
1692
1693/*
1694 * Create socket for RPC service.
1695 */
1696static int svc_create_socket(struct svc_serv *serv, int protocol,
1697				struct sockaddr *sin, int len, int flags)
1698{
1699	struct svc_sock	*svsk;
1700	struct socket	*sock;
1701	int		error;
1702	int		type;
1703	char		buf[RPC_MAX_ADDRBUFLEN];
1704
1705	dprintk("svc: svc_create_socket(%s, %d, %s)\n",
1706			serv->sv_program->pg_name, protocol,
1707			__svc_print_addr(sin, buf, sizeof(buf)));
1708
1709	if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
1710		printk(KERN_WARNING "svc: only UDP and TCP "
1711				"sockets supported\n");
1712		return -EINVAL;
1713	}
1714	type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1715
1716	error = sock_create_kern(sin->sa_family, type, protocol, &sock);
1717	if (error < 0)
1718		return error;
1719
1720	svc_reclassify_socket(sock);
1721
1722	if (type == SOCK_STREAM)
1723		sock->sk->sk_reuse = 1;		/* allow address reuse */
1724	error = kernel_bind(sock, sin, len);
1725	if (error < 0)
1726		goto bummer;
1727
1728	if (protocol == IPPROTO_TCP) {
1729		if ((error = kernel_listen(sock, 64)) < 0)
1730			goto bummer;
1731	}
1732
1733	if ((svsk = svc_setup_socket(serv, sock, &error, flags)) != NULL) {
1734		svc_sock_received(svsk);
1735		return ntohs(inet_sk(svsk->sk_sk)->sport);
1736	}
1737
1738bummer:
1739	dprintk("svc: svc_create_socket error = %d\n", -error);
1740	sock_release(sock);
1741	return error;
1742}
1743
1744/*
1745 * Remove a dead socket
1746 */
1747static void
1748svc_delete_socket(struct svc_sock *svsk)
1749{
1750	struct svc_serv	*serv;
1751	struct sock	*sk;
1752
1753	dprintk("svc: svc_delete_socket(%p)\n", svsk);
1754
1755	serv = svsk->sk_server;
1756	sk = svsk->sk_sk;
1757
1758	sk->sk_state_change = svsk->sk_ostate;
1759	sk->sk_data_ready = svsk->sk_odata;
1760	sk->sk_write_space = svsk->sk_owspace;
1761
1762	spin_lock_bh(&serv->sv_lock);
1763
1764	if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
1765		list_del_init(&svsk->sk_list);
	/*
	 * We used to delete the svc_sock from whichever list
	 * its sk_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
1773	if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) {
1774		BUG_ON(atomic_read(&svsk->sk_inuse)<2);
1775		atomic_dec(&svsk->sk_inuse);
1776		if (test_bit(SK_TEMP, &svsk->sk_flags))
1777			serv->sv_tmpcnt--;
1778	}
1779
1780	spin_unlock_bh(&serv->sv_lock);
1781}
1782
1783static void svc_close_socket(struct svc_sock *svsk)
1784{
1785	set_bit(SK_CLOSE, &svsk->sk_flags);
1786	if (test_and_set_bit(SK_BUSY, &svsk->sk_flags))
1787		/* someone else will have to effect the close */
1788		return;
1789
1790	atomic_inc(&svsk->sk_inuse);
1791	svc_delete_socket(svsk);
1792	clear_bit(SK_BUSY, &svsk->sk_flags);
1793	svc_sock_put(svsk);
1794}
1795
1796void svc_force_close_socket(struct svc_sock *svsk)
1797{
1798	set_bit(SK_CLOSE, &svsk->sk_flags);
1799	if (test_bit(SK_BUSY, &svsk->sk_flags)) {
		/* Waiting to be processed, but no threads left,
		 * so just remove it from the waiting list.
		 */
1803		list_del_init(&svsk->sk_ready);
1804		clear_bit(SK_BUSY, &svsk->sk_flags);
1805	}
1806	svc_close_socket(svsk);
1807}
1808
1809/**
1810 * svc_makesock - Make a socket for nfsd and lockd
1811 * @serv: RPC server structure
1812 * @protocol: transport protocol to use
1813 * @port: port to use
1814 * @flags: requested socket characteristics
1815 *
1816 */
1817int svc_makesock(struct svc_serv *serv, int protocol, unsigned short port,
1818			int flags)
1819{
1820	struct sockaddr_in sin = {
1821		.sin_family		= AF_INET,
1822		.sin_addr.s_addr	= INADDR_ANY,
1823		.sin_port		= htons(port),
1824	};
1825
1826	dprintk("svc: creating socket proto = %d\n", protocol);
1827	return svc_create_socket(serv, protocol, (struct sockaddr *) &sin,
1828							sizeof(sin), flags);
1829}
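/*
 * Example (added, hedged): lockd and nfsd create their wildcard listeners
 * through this helper, along the lines of
 *
 *	error = svc_makesock(serv, IPPROTO_UDP, port, SVC_SOCK_DEFAULTS);
 *
 * which binds INADDR_ANY:port and, because SVC_SOCK_ANONYMOUS is not set,
 * registers the service with the local portmapper.
 */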
1830
1831/*
1832 * Handle defer and revisit of requests
1833 */
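/*
 * Added description of the flow implemented below: svc_defer() snapshots a
 * request (protocol, addresses and the argument bytes received so far)
 * into a svc_deferred_req and hands it to the cache code.  When the cache
 * entry becomes usable, svc_revisit() moves the snapshot onto the socket's
 * sk_deferred list, sets SK_DEFERRED and re-enqueues the socket; the next
 * thread to service that socket then replays the request through
 * svc_deferred_dequeue() and svc_deferred_recv() instead of reading from
 * the wire.
 */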
1834
1835static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
1836{
1837	struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
1838	struct svc_sock *svsk;
1839
1840	if (too_many) {
1841		svc_sock_put(dr->svsk);
1842		kfree(dr);
1843		return;
1844	}
1845	dprintk("revisit queued\n");
1846	svsk = dr->svsk;
1847	dr->svsk = NULL;
1848	spin_lock(&svsk->sk_lock);
1849	list_add(&dr->handle.recent, &svsk->sk_deferred);
1850	spin_unlock(&svsk->sk_lock);
1851	set_bit(SK_DEFERRED, &svsk->sk_flags);
1852	svc_sock_enqueue(svsk);
1853	svc_sock_put(svsk);
1854}
1855
1856static struct cache_deferred_req *
1857svc_defer(struct cache_req *req)
1858{
1859	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
1860	int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
1861	struct svc_deferred_req *dr;
1862
1863	if (rqstp->rq_arg.page_len)
1864		return NULL;
1865	if (rqstp->rq_deferred) {
1866		dr = rqstp->rq_deferred;
1867		rqstp->rq_deferred = NULL;
1868	} else {
1869		int skip  = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
1870		dr = kmalloc(size, GFP_KERNEL);
1871		if (dr == NULL)
1872			return NULL;
1873
1874		dr->handle.owner = rqstp->rq_server;
1875		dr->prot = rqstp->rq_prot;
1876		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
1877		dr->addrlen = rqstp->rq_addrlen;
1878		dr->daddr = rqstp->rq_daddr;
1879		dr->argslen = rqstp->rq_arg.len >> 2;
1880		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2);
1881	}
1882	atomic_inc(&rqstp->rq_sock->sk_inuse);
1883	dr->svsk = rqstp->rq_sock;
1884
1885	dr->handle.revisit = svc_revisit;
1886	return &dr->handle;
1887}
1888
1889/*
1890 * recv data from a deferred request into an active one
1891 */
1892static int svc_deferred_recv(struct svc_rqst *rqstp)
1893{
1894	struct svc_deferred_req *dr = rqstp->rq_deferred;
1895
1896	rqstp->rq_arg.head[0].iov_base = dr->args;
1897	rqstp->rq_arg.head[0].iov_len = dr->argslen<<2;
1898	rqstp->rq_arg.page_len = 0;
1899	rqstp->rq_arg.len = dr->argslen<<2;
1900	rqstp->rq_prot        = dr->prot;
1901	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
1902	rqstp->rq_addrlen     = dr->addrlen;
1903	rqstp->rq_daddr       = dr->daddr;
1904	rqstp->rq_respages    = rqstp->rq_pages;
1905	return dr->argslen<<2;
1906}
1907
1908
1909static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk)
1910{
1911	struct svc_deferred_req *dr = NULL;
1912
1913	if (!test_bit(SK_DEFERRED, &svsk->sk_flags))
1914		return NULL;
1915	spin_lock(&svsk->sk_lock);
1916	clear_bit(SK_DEFERRED, &svsk->sk_flags);
1917	if (!list_empty(&svsk->sk_deferred)) {
1918		dr = list_entry(svsk->sk_deferred.next,
1919				struct svc_deferred_req,
1920				handle.recent);
1921		list_del_init(&dr->handle.recent);
1922		set_bit(SK_DEFERRED, &svsk->sk_flags);
1923	}
1924	spin_unlock(&svsk->sk_lock);
1925	return dr;
1926}
1927