1/*
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 *    derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28#include "evconfig-private.h"
29
30#include <sys/types.h>
31#include <limits.h>
32#include <string.h>
33#include <stdlib.h>
34
35#include "event2/event.h"
36#include "event2/event_struct.h"
37#include "event2/util.h"
38#include "event2/bufferevent.h"
39#include "event2/bufferevent_struct.h"
40#include "event2/buffer.h"
41
42#include "ratelim-internal.h"
43
44#include "bufferevent-internal.h"
45#include "mm-internal.h"
46#include "util-internal.h"
47#include "event-internal.h"
48
49int
50ev_token_bucket_init_(struct ev_token_bucket *bucket,
51    const struct ev_token_bucket_cfg *cfg,
52    ev_uint32_t current_tick,
53    int reinitialize)
54{
55	if (reinitialize) {
56		/* on reinitialization, we only clip downwards, since we've
57		   already used who-knows-how-much bandwidth this tick.  We
58		   leave "last_updated" as it is; the next update will add the
59		   appropriate amount of bandwidth to the bucket.
60		*/
61		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62			bucket->read_limit = cfg->read_maximum;
63		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64			bucket->write_limit = cfg->write_maximum;
65	} else {
66		bucket->read_limit = cfg->read_rate;
67		bucket->write_limit = cfg->write_rate;
68		bucket->last_updated = current_tick;
69	}
70	return 0;
71}
72
73int
74ev_token_bucket_update_(struct ev_token_bucket *bucket,
75    const struct ev_token_bucket_cfg *cfg,
76    ev_uint32_t current_tick)
77{
78	/* It's okay if the tick number overflows, since we'll just
79	 * wrap around when we do the unsigned substraction. */
80	unsigned n_ticks = current_tick - bucket->last_updated;
81
82	/* Make sure some ticks actually happened, and that time didn't
83	 * roll back. */
84	if (n_ticks == 0 || n_ticks > INT_MAX)
85		return 0;
86
87	/* Naively, we would say
88		bucket->limit += n_ticks * cfg->rate;
89
90		if (bucket->limit > cfg->maximum)
91			bucket->limit = cfg->maximum;
92
93	   But we're worried about overflow, so we do it like this:
94	*/
95
96	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97		bucket->read_limit = cfg->read_maximum;
98	else
99		bucket->read_limit += n_ticks * cfg->read_rate;
100
101
102	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103		bucket->write_limit = cfg->write_maximum;
104	else
105		bucket->write_limit += n_ticks * cfg->write_rate;
106
107
108	bucket->last_updated = current_tick;
109
110	return 1;
111}
112
113static inline void
114bufferevent_update_buckets(struct bufferevent_private *bev)
115{
116	/* Must hold lock on bev. */
117	struct timeval now;
118	unsigned tick;
119	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121	if (tick != bev->rate_limiting->limit.last_updated)
122		ev_token_bucket_update_(&bev->rate_limiting->limit,
123		    bev->rate_limiting->cfg, tick);
124}
125
126ev_uint32_t
127ev_token_bucket_get_tick_(const struct timeval *tv,
128    const struct ev_token_bucket_cfg *cfg)
129{
130	/* This computation uses two multiplies and a divide.  We could do
131	 * fewer if we knew that the tick length was an integer number of
132	 * seconds, or if we knew it divided evenly into a second.  We should
133	 * investigate that more.
134	 */
135
136	/* We cast to an ev_uint64_t first, since we don't want to overflow
137	 * before we do the final divide. */
138	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139	return (unsigned)(msec / cfg->msec_per_tick);
140}
141
142struct ev_token_bucket_cfg *
143ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144    size_t write_rate, size_t write_burst,
145    const struct timeval *tick_len)
146{
147	struct ev_token_bucket_cfg *r;
148	struct timeval g;
149	if (! tick_len) {
150		g.tv_sec = 1;
151		g.tv_usec = 0;
152		tick_len = &g;
153	}
154	if (read_rate > read_burst || write_rate > write_burst ||
155	    read_rate < 1 || write_rate < 1)
156		return NULL;
157	if (read_rate > EV_RATE_LIMIT_MAX ||
158	    write_rate > EV_RATE_LIMIT_MAX ||
159	    read_burst > EV_RATE_LIMIT_MAX ||
160	    write_burst > EV_RATE_LIMIT_MAX)
161		return NULL;
162	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163	if (!r)
164		return NULL;
165	r->read_rate = read_rate;
166	r->write_rate = write_rate;
167	r->read_maximum = read_burst;
168	r->write_maximum = write_burst;
169	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170	r->msec_per_tick = (tick_len->tv_sec * 1000) +
171	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172	return r;
173}
174
/* Release a configuration obtained from ev_token_bucket_cfg_new().
 * NOTE: bufferevent_set_rate_limit() stores the cfg pointer rather than a
 * copy, so do not free a cfg that is still installed on a bufferevent. */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	struct ev_token_bucket_cfg *doomed = cfg;

	mm_free(doomed);
}
180
/* Default values for max_single_read & max_single_write variables.  They are
 * installed by bufferevent_ratelim_init_(), and restored when
 * bufferevent_set_max_single_read()/_write() is given 0 or a value larger
 * than EV_SSIZE_MAX. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire/release a rate-limit group's lock.  The lock is allocated as a
 * recursive lock in bufferevent_rate_limit_group_new(). */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: the bucket-decrement helpers below call these group
 * suspend/unsuspend helpers, which are defined further down.  All of them
 * are called with the group lock held. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192
193/** Helper: figure out the maximum amount we should write if is_write, or
194    the maximum amount we should read if is_read.  Return that maximum, or
195    0 if our bucket is wholly exhausted.
196 */
197static inline ev_ssize_t
198bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
199{
200	/* needs lock on bev. */
201	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
202
203#define LIM(x)						\
204	(is_write ? (x).write_limit : (x).read_limit)
205
206#define GROUP_SUSPENDED(g)			\
207	(is_write ? (g)->write_suspended : (g)->read_suspended)
208
209	/* Sets max_so_far to MIN(x, max_so_far) */
210#define CLAMPTO(x)				\
211	do {					\
212		if (max_so_far > (x))		\
213			max_so_far = (x);	\
214	} while (0);
215
216	if (!bev->rate_limiting)
217		return max_so_far;
218
219	/* If rate-limiting is enabled at all, update the appropriate
220	   bucket, and take the smaller of our rate limit and the group
221	   rate limit.
222	 */
223
224	if (bev->rate_limiting->cfg) {
225		bufferevent_update_buckets(bev);
226		max_so_far = LIM(bev->rate_limiting->limit);
227	}
228	if (bev->rate_limiting->group) {
229		struct bufferevent_rate_limit_group *g =
230		    bev->rate_limiting->group;
231		ev_ssize_t share;
232		LOCK_GROUP(g);
233		if (GROUP_SUSPENDED(g)) {
234			/* We can get here if we failed to lock this
235			 * particular bufferevent while suspending the whole
236			 * group. */
237			if (is_write)
238				bufferevent_suspend_write_(&bev->bev,
239				    BEV_SUSPEND_BW_GROUP);
240			else
241				bufferevent_suspend_read_(&bev->bev,
242				    BEV_SUSPEND_BW_GROUP);
243			share = 0;
244		} else {
245			/* XXXX probably we should divide among the active
246			 * members, not the total members. */
247			share = LIM(g->rate_limit) / g->n_members;
248			if (share < g->min_share)
249				share = g->min_share;
250		}
251		UNLOCK_GROUP(g);
252		CLAMPTO(share);
253	}
254
255	if (max_so_far < 0)
256		max_so_far = 0;
257	return max_so_far;
258}
259
260ev_ssize_t
261bufferevent_get_read_max_(struct bufferevent_private *bev)
262{
263	return bufferevent_get_rlim_max_(bev, 0);
264}
265
266ev_ssize_t
267bufferevent_get_write_max_(struct bufferevent_private *bev)
268{
269	return bufferevent_get_rlim_max_(bev, 1);
270}
271
272int
273bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
274{
275	/* XXXXX Make sure all users of this function check its return value */
276	int r = 0;
277	/* need to hold lock on bev */
278	if (!bev->rate_limiting)
279		return 0;
280
281	if (bev->rate_limiting->cfg) {
282		bev->rate_limiting->limit.read_limit -= bytes;
283		if (bev->rate_limiting->limit.read_limit <= 0) {
284			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285			if (event_add(&bev->rate_limiting->refill_bucket_event,
286				&bev->rate_limiting->cfg->tick_timeout) < 0)
287				r = -1;
288		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
289			if (!(bev->write_suspended & BEV_SUSPEND_BW))
290				event_del(&bev->rate_limiting->refill_bucket_event);
291			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
292		}
293	}
294
295	if (bev->rate_limiting->group) {
296		LOCK_GROUP(bev->rate_limiting->group);
297		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298		bev->rate_limiting->group->total_read += bytes;
299		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300			bev_group_suspend_reading_(bev->rate_limiting->group);
301		} else if (bev->rate_limiting->group->read_suspended) {
302			bev_group_unsuspend_reading_(bev->rate_limiting->group);
303		}
304		UNLOCK_GROUP(bev->rate_limiting->group);
305	}
306
307	return r;
308}
309
310int
311bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
312{
313	/* XXXXX Make sure all users of this function check its return value */
314	int r = 0;
315	/* need to hold lock */
316	if (!bev->rate_limiting)
317		return 0;
318
319	if (bev->rate_limiting->cfg) {
320		bev->rate_limiting->limit.write_limit -= bytes;
321		if (bev->rate_limiting->limit.write_limit <= 0) {
322			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323			if (event_add(&bev->rate_limiting->refill_bucket_event,
324				&bev->rate_limiting->cfg->tick_timeout) < 0)
325				r = -1;
326		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
327			if (!(bev->read_suspended & BEV_SUSPEND_BW))
328				event_del(&bev->rate_limiting->refill_bucket_event);
329			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
330		}
331	}
332
333	if (bev->rate_limiting->group) {
334		LOCK_GROUP(bev->rate_limiting->group);
335		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336		bev->rate_limiting->group->total_written += bytes;
337		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338			bev_group_suspend_writing_(bev->rate_limiting->group);
339		} else if (bev->rate_limiting->group->write_suspended) {
340			bev_group_unsuspend_writing_(bev->rate_limiting->group);
341		}
342		UNLOCK_GROUP(bev->rate_limiting->group);
343	}
344
345	return r;
346}
347
348/** Stop reading on every bufferevent in <b>g</b> */
349static int
350bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
351{
352	/* Needs group lock */
353	struct bufferevent_private *bev;
354	g->read_suspended = 1;
355	g->pending_unsuspend_read = 0;
356
357	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
359	   the bufferevent locks.  If we are unable to lock any individual
360	   bufferevent, it will find out later when it looks at its limit
361	   and sees that its group is suspended.)
362	*/
363	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364		if (EVLOCK_TRY_LOCK_(bev->lock)) {
365			bufferevent_suspend_read_(&bev->bev,
366			    BEV_SUSPEND_BW_GROUP);
367			EVLOCK_UNLOCK(bev->lock, 0);
368		}
369	}
370	return 0;
371}
372
373/** Stop writing on every bufferevent in <b>g</b> */
374static int
375bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
376{
377	/* Needs group lock */
378	struct bufferevent_private *bev;
379	g->write_suspended = 1;
380	g->pending_unsuspend_write = 0;
381	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382		if (EVLOCK_TRY_LOCK_(bev->lock)) {
383			bufferevent_suspend_write_(&bev->bev,
384			    BEV_SUSPEND_BW_GROUP);
385			EVLOCK_UNLOCK(bev->lock, 0);
386		}
387	}
388	return 0;
389}
390
391/** Timer callback invoked on a single bufferevent with one or more exhausted
392    buckets when they are ready to refill. */
393static void
394bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
395{
396	unsigned tick;
397	struct timeval now;
398	struct bufferevent_private *bev = arg;
399	int again = 0;
400	BEV_LOCK(&bev->bev);
401	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402		BEV_UNLOCK(&bev->bev);
403		return;
404	}
405
406	/* First, update the bucket */
407	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408	tick = ev_token_bucket_get_tick_(&now,
409	    bev->rate_limiting->cfg);
410	ev_token_bucket_update_(&bev->rate_limiting->limit,
411	    bev->rate_limiting->cfg,
412	    tick);
413
414	/* Now unsuspend any read/write operations as appropriate. */
415	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416		if (bev->rate_limiting->limit.read_limit > 0)
417			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
418		else
419			again = 1;
420	}
421	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422		if (bev->rate_limiting->limit.write_limit > 0)
423			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
424		else
425			again = 1;
426	}
427	if (again) {
428		/* One or more of the buckets may need another refill if they
429		   started negative.
430
431		   XXXX if we need to be quiet for more ticks, we should
432		   maybe figure out what timeout we really want.
433		*/
434		/* XXXX Handle event_add failure somehow */
435		event_add(&bev->rate_limiting->refill_bucket_event,
436		    &bev->rate_limiting->cfg->tick_timeout);
437	}
438	BEV_UNLOCK(&bev->bev);
439}
440
441/** Helper: grab a random element from a bufferevent group.
442 *
443 * Requires that we hold the lock on the group.
444 */
445static struct bufferevent_private *
446bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447{
448	int which;
449	struct bufferevent_private *bev;
450
451	/* requires group lock */
452
453	if (!group->n_members)
454		return NULL;
455
456	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457
458	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459
460	bev = LIST_FIRST(&group->members);
461	while (which--)
462		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463
464	return bev;
465}
466
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    The expansion refers to the caller's variables 'g', 'bev' and 'first';
    the caller must declare 'bev' and 'first'.  It walks from the randomly
    chosen member to the end of the list, then from the head of the list up
    to, but not including, that member.  Each member is therefore visited
    once, provided 'block' does not modify the member list.
    bev_group_random_element_() requires the group lock, so the caller must
    hold it.  Because 'block' is a single macro argument, it must not
    contain a comma outside parentheses; braces do not protect commas.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
486
487static void
488bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
489{
490	int again = 0;
491	struct bufferevent_private *bev, *first;
492
493	g->read_suspended = 0;
494	FOREACH_RANDOM_ORDER({
495		if (EVLOCK_TRY_LOCK_(bev->lock)) {
496			bufferevent_unsuspend_read_(&bev->bev,
497			    BEV_SUSPEND_BW_GROUP);
498			EVLOCK_UNLOCK(bev->lock, 0);
499		} else {
500			again = 1;
501		}
502	});
503	g->pending_unsuspend_read = again;
504}
505
506static void
507bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
508{
509	int again = 0;
510	struct bufferevent_private *bev, *first;
511	g->write_suspended = 0;
512
513	FOREACH_RANDOM_ORDER({
514		if (EVLOCK_TRY_LOCK_(bev->lock)) {
515			bufferevent_unsuspend_write_(&bev->bev,
516			    BEV_SUSPEND_BW_GROUP);
517			EVLOCK_UNLOCK(bev->lock, 0);
518		} else {
519			again = 1;
520		}
521	});
522	g->pending_unsuspend_write = again;
523}
524
525/** Callback invoked every tick to add more elements to the group bucket
526    and unsuspend group members as needed.
527 */
528static void
529bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
530{
531	struct bufferevent_rate_limit_group *g = arg;
532	unsigned tick;
533	struct timeval now;
534
535	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
536
537	LOCK_GROUP(g);
538
539	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
541
542	if (g->pending_unsuspend_read ||
543	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544		bev_group_unsuspend_reading_(g);
545	}
546	if (g->pending_unsuspend_write ||
547	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548		bev_group_unsuspend_writing_(g);
549	}
550
551	/* XXXX Rather than waiting to the next tick to unsuspend stuff
552	 * with pending_unsuspend_write/read, we should do it on the
553	 * next iteration of the mainloop.
554	 */
555
556	UNLOCK_GROUP(g);
557}
558
559int
560bufferevent_set_rate_limit(struct bufferevent *bev,
561    struct ev_token_bucket_cfg *cfg)
562{
563	struct bufferevent_private *bevp = BEV_UPCAST(bev);
564	int r = -1;
565	struct bufferevent_rate_limit *rlim;
566	struct timeval now;
567	ev_uint32_t tick;
568	int reinit = 0, suspended = 0;
569	/* XXX reference-count cfg */
570
571	BEV_LOCK(bev);
572
573	if (cfg == NULL) {
574		if (bevp->rate_limiting) {
575			rlim = bevp->rate_limiting;
576			rlim->cfg = NULL;
577			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
578			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
579			if (event_initialized(&rlim->refill_bucket_event))
580				event_del(&rlim->refill_bucket_event);
581		}
582		r = 0;
583		goto done;
584	}
585
586	event_base_gettimeofday_cached(bev->ev_base, &now);
587	tick = ev_token_bucket_get_tick_(&now, cfg);
588
589	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
590		/* no-op */
591		r = 0;
592		goto done;
593	}
594	if (bevp->rate_limiting == NULL) {
595		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
596		if (!rlim)
597			goto done;
598		bevp->rate_limiting = rlim;
599	} else {
600		rlim = bevp->rate_limiting;
601	}
602	reinit = rlim->cfg != NULL;
603
604	rlim->cfg = cfg;
605	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
606
607	if (reinit) {
608		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
609		event_del(&rlim->refill_bucket_event);
610	}
611	event_assign(&rlim->refill_bucket_event, bev->ev_base,
612	    -1, EV_FINALIZE, bev_refill_callback_, bevp);
613
614	if (rlim->limit.read_limit > 0) {
615		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
616	} else {
617		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
618		suspended=1;
619	}
620	if (rlim->limit.write_limit > 0) {
621		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
622	} else {
623		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
624		suspended = 1;
625	}
626
627	if (suspended)
628		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
629
630	r = 0;
631
632done:
633	BEV_UNLOCK(bev);
634	return r;
635}
636
637struct bufferevent_rate_limit_group *
638bufferevent_rate_limit_group_new(struct event_base *base,
639    const struct ev_token_bucket_cfg *cfg)
640{
641	struct bufferevent_rate_limit_group *g;
642	struct timeval now;
643	ev_uint32_t tick;
644
645	event_base_gettimeofday_cached(base, &now);
646	tick = ev_token_bucket_get_tick_(&now, cfg);
647
648	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
649	if (!g)
650		return NULL;
651	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652	LIST_INIT(&g->members);
653
654	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
655
656	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
657	    bev_group_refill_callback_, g);
658	/*XXXX handle event_add failure */
659	event_add(&g->master_refill_event, &cfg->tick_timeout);
660
661	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
662
663	bufferevent_rate_limit_group_set_min_share(g, 64);
664
665	evutil_weakrand_seed_(&g->weakrand_seed,
666	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
667
668	return g;
669}
670
671int
672bufferevent_rate_limit_group_set_cfg(
673	struct bufferevent_rate_limit_group *g,
674	const struct ev_token_bucket_cfg *cfg)
675{
676	int same_tick;
677	if (!g || !cfg)
678		return -1;
679
680	LOCK_GROUP(g);
681	same_tick = evutil_timercmp(
682		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
683	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
684
685	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
686		g->rate_limit.read_limit = cfg->read_maximum;
687	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
688		g->rate_limit.write_limit = cfg->write_maximum;
689
690	if (!same_tick) {
691		/* This can cause a hiccup in the schedule */
692		event_add(&g->master_refill_event, &cfg->tick_timeout);
693	}
694
695	/* The new limits might force us to adjust min_share differently. */
696	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
697
698	UNLOCK_GROUP(g);
699	return 0;
700}
701
702int
703bufferevent_rate_limit_group_set_min_share(
704	struct bufferevent_rate_limit_group *g,
705	size_t share)
706{
707	if (share > EV_SSIZE_MAX)
708		return -1;
709
710	g->configured_min_share = share;
711
712	/* Can't set share to less than the one-tick maximum.  IOW, at steady
713	 * state, at least one connection can go per tick. */
714	if (share > g->rate_limit_cfg.read_rate)
715		share = g->rate_limit_cfg.read_rate;
716	if (share > g->rate_limit_cfg.write_rate)
717		share = g->rate_limit_cfg.write_rate;
718
719	g->min_share = share;
720	return 0;
721}
722
723void
724bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
725{
726	LOCK_GROUP(g);
727	EVUTIL_ASSERT(0 == g->n_members);
728	event_del(&g->master_refill_event);
729	UNLOCK_GROUP(g);
730	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
731	mm_free(g);
732}
733
734int
735bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
736    struct bufferevent_rate_limit_group *g)
737{
738	int wsuspend, rsuspend;
739	struct bufferevent_private *bevp = BEV_UPCAST(bev);
740	BEV_LOCK(bev);
741
742	if (!bevp->rate_limiting) {
743		struct bufferevent_rate_limit *rlim;
744		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
745		if (!rlim) {
746			BEV_UNLOCK(bev);
747			return -1;
748		}
749		event_assign(&rlim->refill_bucket_event, bev->ev_base,
750		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
751		bevp->rate_limiting = rlim;
752	}
753
754	if (bevp->rate_limiting->group == g) {
755		BEV_UNLOCK(bev);
756		return 0;
757	}
758	if (bevp->rate_limiting->group)
759		bufferevent_remove_from_rate_limit_group(bev);
760
761	LOCK_GROUP(g);
762	bevp->rate_limiting->group = g;
763	++g->n_members;
764	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
765
766	rsuspend = g->read_suspended;
767	wsuspend = g->write_suspended;
768
769	UNLOCK_GROUP(g);
770
771	if (rsuspend)
772		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
773	if (wsuspend)
774		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
775
776	BEV_UNLOCK(bev);
777	return 0;
778}
779
/* Remove 'bev' from its rate-limiting group, if it has one, and lift any
 * group-imposed suspension.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int lift_group_suspension = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    lift_group_suspension);
}
785
786int
787bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
788    int unsuspend)
789{
790	struct bufferevent_private *bevp = BEV_UPCAST(bev);
791	BEV_LOCK(bev);
792	if (bevp->rate_limiting && bevp->rate_limiting->group) {
793		struct bufferevent_rate_limit_group *g =
794		    bevp->rate_limiting->group;
795		LOCK_GROUP(g);
796		bevp->rate_limiting->group = NULL;
797		--g->n_members;
798		LIST_REMOVE(bevp, rate_limiting->next_in_group);
799		UNLOCK_GROUP(g);
800	}
801	if (unsuspend) {
802		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
803		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
804	}
805	BEV_UNLOCK(bev);
806	return 0;
807}
808
809/* ===
810 * API functions to expose rate limits.
811 *
812 * Don't use these from inside Libevent; they're meant to be for use by
813 * the program.
814 * === */
815
816/* Mostly you don't want to use this function from inside libevent;
817 * bufferevent_get_read_max_() is more likely what you want*/
818ev_ssize_t
819bufferevent_get_read_limit(struct bufferevent *bev)
820{
821	ev_ssize_t r;
822	struct bufferevent_private *bevp;
823	BEV_LOCK(bev);
824	bevp = BEV_UPCAST(bev);
825	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
826		bufferevent_update_buckets(bevp);
827		r = bevp->rate_limiting->limit.read_limit;
828	} else {
829		r = EV_SSIZE_MAX;
830	}
831	BEV_UNLOCK(bev);
832	return r;
833}
834
835/* Mostly you don't want to use this function from inside libevent;
836 * bufferevent_get_write_max_() is more likely what you want*/
837ev_ssize_t
838bufferevent_get_write_limit(struct bufferevent *bev)
839{
840	ev_ssize_t r;
841	struct bufferevent_private *bevp;
842	BEV_LOCK(bev);
843	bevp = BEV_UPCAST(bev);
844	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
845		bufferevent_update_buckets(bevp);
846		r = bevp->rate_limiting->limit.write_limit;
847	} else {
848		r = EV_SSIZE_MAX;
849	}
850	BEV_UNLOCK(bev);
851	return r;
852}
853
854int
855bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
856{
857	struct bufferevent_private *bevp;
858	BEV_LOCK(bev);
859	bevp = BEV_UPCAST(bev);
860	if (size == 0 || size > EV_SSIZE_MAX)
861		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
862	else
863		bevp->max_single_read = size;
864	BEV_UNLOCK(bev);
865	return 0;
866}
867
868int
869bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
870{
871	struct bufferevent_private *bevp;
872	BEV_LOCK(bev);
873	bevp = BEV_UPCAST(bev);
874	if (size == 0 || size > EV_SSIZE_MAX)
875		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
876	else
877		bevp->max_single_write = size;
878	BEV_UNLOCK(bev);
879	return 0;
880}
881
882ev_ssize_t
883bufferevent_get_max_single_read(struct bufferevent *bev)
884{
885	ev_ssize_t r;
886
887	BEV_LOCK(bev);
888	r = BEV_UPCAST(bev)->max_single_read;
889	BEV_UNLOCK(bev);
890	return r;
891}
892
893ev_ssize_t
894bufferevent_get_max_single_write(struct bufferevent *bev)
895{
896	ev_ssize_t r;
897
898	BEV_LOCK(bev);
899	r = BEV_UPCAST(bev)->max_single_write;
900	BEV_UNLOCK(bev);
901	return r;
902}
903
904ev_ssize_t
905bufferevent_get_max_to_read(struct bufferevent *bev)
906{
907	ev_ssize_t r;
908	BEV_LOCK(bev);
909	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
910	BEV_UNLOCK(bev);
911	return r;
912}
913
914ev_ssize_t
915bufferevent_get_max_to_write(struct bufferevent *bev)
916{
917	ev_ssize_t r;
918	BEV_LOCK(bev);
919	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
920	BEV_UNLOCK(bev);
921	return r;
922}
923
924const struct ev_token_bucket_cfg *
925bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
926	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
927	struct ev_token_bucket_cfg *cfg;
928
929	BEV_LOCK(bev);
930
931	if (bufev_private->rate_limiting) {
932		cfg = bufev_private->rate_limiting->cfg;
933	} else {
934		cfg = NULL;
935	}
936
937	BEV_UNLOCK(bev);
938
939	return cfg;
940}
941
942/* Mostly you don't want to use this function from inside libevent;
943 * bufferevent_get_read_max_() is more likely what you want*/
944ev_ssize_t
945bufferevent_rate_limit_group_get_read_limit(
946	struct bufferevent_rate_limit_group *grp)
947{
948	ev_ssize_t r;
949	LOCK_GROUP(grp);
950	r = grp->rate_limit.read_limit;
951	UNLOCK_GROUP(grp);
952	return r;
953}
954
955/* Mostly you don't want to use this function from inside libevent;
956 * bufferevent_get_write_max_() is more likely what you want. */
957ev_ssize_t
958bufferevent_rate_limit_group_get_write_limit(
959	struct bufferevent_rate_limit_group *grp)
960{
961	ev_ssize_t r;
962	LOCK_GROUP(grp);
963	r = grp->rate_limit.write_limit;
964	UNLOCK_GROUP(grp);
965	return r;
966}
967
968int
969bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
970{
971	int r = 0;
972	ev_ssize_t old_limit, new_limit;
973	struct bufferevent_private *bevp;
974	BEV_LOCK(bev);
975	bevp = BEV_UPCAST(bev);
976	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
977	old_limit = bevp->rate_limiting->limit.read_limit;
978
979	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
980	if (old_limit > 0 && new_limit <= 0) {
981		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
982		if (event_add(&bevp->rate_limiting->refill_bucket_event,
983			&bevp->rate_limiting->cfg->tick_timeout) < 0)
984			r = -1;
985	} else if (old_limit <= 0 && new_limit > 0) {
986		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
987			event_del(&bevp->rate_limiting->refill_bucket_event);
988		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
989	}
990
991	BEV_UNLOCK(bev);
992	return r;
993}
994
995int
996bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
997{
998	/* XXXX this is mostly copy-and-paste from
999	 * bufferevent_decrement_read_limit */
1000	int r = 0;
1001	ev_ssize_t old_limit, new_limit;
1002	struct bufferevent_private *bevp;
1003	BEV_LOCK(bev);
1004	bevp = BEV_UPCAST(bev);
1005	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1006	old_limit = bevp->rate_limiting->limit.write_limit;
1007
1008	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1009	if (old_limit > 0 && new_limit <= 0) {
1010		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1011		if (event_add(&bevp->rate_limiting->refill_bucket_event,
1012			&bevp->rate_limiting->cfg->tick_timeout) < 0)
1013			r = -1;
1014	} else if (old_limit <= 0 && new_limit > 0) {
1015		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1016			event_del(&bevp->rate_limiting->refill_bucket_event);
1017		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1018	}
1019
1020	BEV_UNLOCK(bev);
1021	return r;
1022}
1023
1024int
1025bufferevent_rate_limit_group_decrement_read(
1026	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1027{
1028	int r = 0;
1029	ev_ssize_t old_limit, new_limit;
1030	LOCK_GROUP(grp);
1031	old_limit = grp->rate_limit.read_limit;
1032	new_limit = (grp->rate_limit.read_limit -= decr);
1033
1034	if (old_limit > 0 && new_limit <= 0) {
1035		bev_group_suspend_reading_(grp);
1036	} else if (old_limit <= 0 && new_limit > 0) {
1037		bev_group_unsuspend_reading_(grp);
1038	}
1039
1040	UNLOCK_GROUP(grp);
1041	return r;
1042}
1043
1044int
1045bufferevent_rate_limit_group_decrement_write(
1046	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1047{
1048	int r = 0;
1049	ev_ssize_t old_limit, new_limit;
1050	LOCK_GROUP(grp);
1051	old_limit = grp->rate_limit.write_limit;
1052	new_limit = (grp->rate_limit.write_limit -= decr);
1053
1054	if (old_limit > 0 && new_limit <= 0) {
1055		bev_group_suspend_writing_(grp);
1056	} else if (old_limit <= 0 && new_limit > 0) {
1057		bev_group_unsuspend_writing_(grp);
1058	}
1059
1060	UNLOCK_GROUP(grp);
1061	return r;
1062}
1063
1064void
1065bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1066    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1067{
1068	EVUTIL_ASSERT(grp != NULL);
1069	if (total_read_out)
1070		*total_read_out = grp->total_read;
1071	if (total_written_out)
1072		*total_written_out = grp->total_written;
1073}
1074
1075void
1076bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1077{
1078	grp->total_read = grp->total_written = 0;
1079}
1080
1081int
1082bufferevent_ratelim_init_(struct bufferevent_private *bev)
1083{
1084	bev->rate_limiting = NULL;
1085	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1086	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1087
1088	return 0;
1089}
1090