/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

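/*
 * The ring's state is a single 64-bit word: the producer indices
 * (pidx_head/pidx_tail), the consumer index (cidx), and the flags all fit in
 * 16 bits each, so the entire state can be read, snapshotted, and updated
 * with a single 64-bit access.
 */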
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

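/*
 * Consumer state machine.  The ring starts out IDLE.  Enqueueing producers
 * mark it BUSY (or ABDICATED, when they decline to drain an idle ring), the
 * draining thread moves it to STALLED when the drain callback makes no
 * progress, and ifmp_ring_check_drainage() moves a STALLED or ABDICATED ring
 * back to BUSY before draining it again.
 */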
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};

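/*
 * Free slots in the ring.  One slot is always left unused so that a full ring
 * (pidx_head one behind cidx) can be told apart from an empty one
 * (pidx_head == cidx).  For example, with size = 8, cidx = 2, and
 * pidx_head = 6, there are 7 - 6 + 2 = 3 free slots.
 */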
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}

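/*
 * Advance an index by n slots, wrapping around at the ring size.  For example,
 * with size = 8, idx = 6, and n = 3, the result is 1.
 */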
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}

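/*
 * The ring state is normally manipulated with 64-bit atomics.  When
 * MP_RING_NO_64BIT_ATOMICS is defined (i.e., on platforms that lack them) the
 * state is protected by the ring's mutex instead, and the "_locked" variants
 * below are called with that mutex held.
 */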
#ifdef MP_RING_NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * In this locked variant it is the ring mutex, not an acquire
		 * atomic, that guarantees visibility of items associated with
		 * any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {
		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			critical_enter();
			os.state = r->state;
			do {
				ns.state = os.state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_fcmpset_64(&r->state, &os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		os.state = r->state;
		do {
			ns.state = os.state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
		    ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif

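/*
 * Allocate and initialize a ring with 'size' slots.  'drain' and 'can_drain'
 * are the consumer callbacks, 'cookie' is stashed in the ring for their use,
 * and 'flags' must include one of M_NOWAIT or M_WAITOK.
 *
 * A minimal usage sketch (the callback and softc names are hypothetical):
 *
 *	struct ifmp_ring *txr;
 *	int error;
 *
 *	error = ifmp_ring_alloc(&txr, 1024, sc, my_drain, my_can_drain,
 *	    M_DEVBUF, M_WAITOK);
 */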
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);
	*pr = NULL;
	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}

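/*
 * Free the ring and its counters.  Tolerates a NULL ring and a partially
 * constructed one, which is how ifmp_ring_alloc() cleans up after a failed
 * counter allocation.
 */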
void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
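/*
 * 'budget' roughly bounds how much this thread will drain: once 'budget'
 * items have been consumed it abdicates at the next opportunity instead of
 * continuing.  With 'abdicate' set the producer never drains; it only marks
 * an idle ring ABDICATED so that a later ifmp_ring_check_drainage() picks the
 * work up.  With 'abdicate' clear the producer becomes the consumer itself
 * unless some other thread is already BUSY draining.
 *
 * A typical producer call might look like this (a sketch; the budget of 4096
 * is just an illustrative value):
 *
 *	error = ifmp_ring_enqueue(txr, (void **)&m, 1, 4096, 0);
 */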
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  In this locked variant the ring mutex,
	 * rather than a release atomic, guarantees that the items are visible
	 * to any thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} else
		ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a consumer
		 * already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	for (;;) {
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	os.state = r->state;
	do {
		ns.state = os.state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		} else
			ns.flags = BUSY;
	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a consumer
		 * already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
#endif

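/*
 * Take over as the consumer if the ring is STALLED or ABDICATED and there is
 * still work queued.  For a stalled ring this is only done once can_drain()
 * reports that forward progress is possible again.
 */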
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED or ABDICATED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}

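/*
 * Unlocked snapshot: true only if no items are queued, no producer is
 * mid-enqueue, and no consumer is running.
 */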
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}

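/*
 * Also an unlocked snapshot: true if the consumer gave up for lack of
 * resources and no producer is currently mid-enqueue.
 */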
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}