1/*-
2 * Copyright (c) 2008 Isilon Inc http://www.isilon.com/
3 * Authors: Doug Rabson <dfr@rabson.org>
4 * Developed with Red Inc: Alfred Perlstein <alfred@freebsd.org>
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 */
27
28#include <sys/cdefs.h>
29__FBSDID("$FreeBSD$");
30
31#include <sys/param.h>
32#include <sys/hash.h>
33#include <sys/kernel.h>
34#include <sys/lock.h>
35#include <sys/mbuf.h>
36#include <sys/mutex.h>
37#include <sys/queue.h>
38
39#include <rpc/rpc.h>
40#include <rpc/replay.h>
41
42struct replay_cache_entry {
43	int		rce_hash;
44	struct rpc_msg	rce_msg;
45	struct sockaddr_storage rce_addr;
46	struct rpc_msg	rce_repmsg;
47	struct mbuf	*rce_repbody;
48
49	TAILQ_ENTRY(replay_cache_entry) rce_link;
50	TAILQ_ENTRY(replay_cache_entry) rce_alllink;
51};
52TAILQ_HEAD(replay_cache_list, replay_cache_entry);
53
54static struct replay_cache_entry *
55		replay_alloc(struct replay_cache *rc, struct rpc_msg *msg,
56		    struct sockaddr *addr, int h);
57static void	replay_free(struct replay_cache *rc,
58    struct replay_cache_entry *rce);
59static void	replay_prune(struct replay_cache *rc);
60
61#define REPLAY_HASH_SIZE	256
62#define REPLAY_MAX		1024
63
64struct replay_cache {
65	struct replay_cache_list	rc_cache[REPLAY_HASH_SIZE];
66	struct replay_cache_list	rc_all;
67	struct mtx			rc_lock;
68	int				rc_count;
69	size_t				rc_size;
70	size_t				rc_maxsize;
71};
72
73struct replay_cache *
74replay_newcache(size_t maxsize)
75{
76	struct replay_cache *rc;
77	int i;
78
79	rc = malloc(sizeof(*rc), M_RPC, M_WAITOK|M_ZERO);
80	for (i = 0; i < REPLAY_HASH_SIZE; i++)
81		TAILQ_INIT(&rc->rc_cache[i]);
82	TAILQ_INIT(&rc->rc_all);
83	mtx_init(&rc->rc_lock, "rc_lock", NULL, MTX_DEF);
84	rc->rc_maxsize = maxsize;
85
86	return (rc);
87}
88
89void
90replay_setsize(struct replay_cache *rc, size_t newmaxsize)
91{
92
93	mtx_lock(&rc->rc_lock);
94	rc->rc_maxsize = newmaxsize;
95	replay_prune(rc);
96	mtx_unlock(&rc->rc_lock);
97}
98
99void
100replay_freecache(struct replay_cache *rc)
101{
102
103	mtx_lock(&rc->rc_lock);
104	while (TAILQ_FIRST(&rc->rc_all))
105		replay_free(rc, TAILQ_FIRST(&rc->rc_all));
106	mtx_destroy(&rc->rc_lock);
107	free(rc, M_RPC);
108}
109
110static struct replay_cache_entry *
111replay_alloc(struct replay_cache *rc,
112    struct rpc_msg *msg, struct sockaddr *addr, int h)
113{
114	struct replay_cache_entry *rce;
115
116	mtx_assert(&rc->rc_lock, MA_OWNED);
117
118	rc->rc_count++;
119	rce = malloc(sizeof(*rce), M_RPC, M_NOWAIT|M_ZERO);
120	if (!rce)
121		return (NULL);
122	rce->rce_hash = h;
123	rce->rce_msg = *msg;
124	bcopy(addr, &rce->rce_addr, addr->sa_len);
125
126	TAILQ_INSERT_HEAD(&rc->rc_cache[h], rce, rce_link);
127	TAILQ_INSERT_HEAD(&rc->rc_all, rce, rce_alllink);
128
129	return (rce);
130}
131
132static void
133replay_free(struct replay_cache *rc, struct replay_cache_entry *rce)
134{
135
136	mtx_assert(&rc->rc_lock, MA_OWNED);
137
138	rc->rc_count--;
139	TAILQ_REMOVE(&rc->rc_cache[rce->rce_hash], rce, rce_link);
140	TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
141	if (rce->rce_repbody) {
142		rc->rc_size -= m_length(rce->rce_repbody, NULL);
143		m_freem(rce->rce_repbody);
144	}
145	free(rce, M_RPC);
146}
147
148static void
149replay_prune(struct replay_cache *rc)
150{
151	struct replay_cache_entry *rce;
152
153	mtx_assert(&rc->rc_lock, MA_OWNED);
154
155	if (rc->rc_count < REPLAY_MAX && rc->rc_size <= rc->rc_maxsize)
156		return;
157
158	do {
159		/*
160		 * Try to free an entry. Don't free in-progress entries.
161		 */
162		TAILQ_FOREACH_REVERSE(rce, &rc->rc_all, replay_cache_list,
163		    rce_alllink) {
164			if (rce->rce_repmsg.rm_xid)
165				break;
166		}
167		if (rce)
168			replay_free(rc, rce);
169	} while (rce && (rc->rc_count >= REPLAY_MAX
170	    || rc->rc_size > rc->rc_maxsize));
171}
172
173enum replay_state
174replay_find(struct replay_cache *rc, struct rpc_msg *msg,
175    struct sockaddr *addr, struct rpc_msg *repmsg, struct mbuf **mp)
176{
177	int h = HASHSTEP(HASHINIT, msg->rm_xid) % REPLAY_HASH_SIZE;
178	struct replay_cache_entry *rce;
179
180	mtx_lock(&rc->rc_lock);
181	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
182		if (rce->rce_msg.rm_xid == msg->rm_xid
183		    && rce->rce_msg.rm_call.cb_prog == msg->rm_call.cb_prog
184		    && rce->rce_msg.rm_call.cb_vers == msg->rm_call.cb_vers
185		    && rce->rce_msg.rm_call.cb_proc == msg->rm_call.cb_proc
186		    && rce->rce_addr.ss_len == addr->sa_len
187		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
188			if (rce->rce_repmsg.rm_xid) {
189				/*
190				 * We have a reply for this
191				 * message. Copy it and return. Keep
192				 * replay_all LRU sorted
193				 */
194				TAILQ_REMOVE(&rc->rc_all, rce, rce_alllink);
195				TAILQ_INSERT_HEAD(&rc->rc_all, rce,
196				    rce_alllink);
197				*repmsg = rce->rce_repmsg;
198				if (rce->rce_repbody) {
199					*mp = m_copym(rce->rce_repbody,
200					    0, M_COPYALL, M_NOWAIT);
201					mtx_unlock(&rc->rc_lock);
202					if (!*mp)
203						return (RS_ERROR);
204				} else {
205					mtx_unlock(&rc->rc_lock);
206				}
207				return (RS_DONE);
208			} else {
209				mtx_unlock(&rc->rc_lock);
210				return (RS_INPROGRESS);
211			}
212		}
213	}
214
215	replay_prune(rc);
216
217	rce = replay_alloc(rc, msg, addr, h);
218
219	mtx_unlock(&rc->rc_lock);
220
221	if (!rce)
222		return (RS_ERROR);
223	else
224		return (RS_NEW);
225}
226
227void
228replay_setreply(struct replay_cache *rc,
229    struct rpc_msg *repmsg, struct sockaddr *addr, struct mbuf *m)
230{
231	int h = HASHSTEP(HASHINIT, repmsg->rm_xid) % REPLAY_HASH_SIZE;
232	struct replay_cache_entry *rce;
233
234	/*
235	 * Copy the reply before the lock so we can sleep.
236	 */
237	if (m)
238		m = m_copym(m, 0, M_COPYALL, M_WAITOK);
239
240	mtx_lock(&rc->rc_lock);
241	TAILQ_FOREACH(rce, &rc->rc_cache[h], rce_link) {
242		if (rce->rce_msg.rm_xid == repmsg->rm_xid
243		    && rce->rce_addr.ss_len == addr->sa_len
244		    && bcmp(&rce->rce_addr, addr, addr->sa_len) == 0) {
245			break;
246		}
247	}
248	if (rce) {
249		rce->rce_repmsg = *repmsg;
250		rce->rce_repbody = m;
251		if (m)
252			rc->rc_size += m_length(m, NULL);
253	}
254	mtx_unlock(&rc->rc_lock);
255}
256