/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <machine/cpu.h>

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};
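
/*
 * Illustrative note (not part of this header): the *_mask fields imply
 * that the ring size is a power of two, so indexes wrap with a cheap
 * AND rather than a modulo.  A minimal model of that arithmetic, with
 * a hypothetical size of 8:
 */
#if 0
	uint32_t size = 8;			/* must be a power of two */
	uint32_t mask = size - 1;		/* 0x7 */
	uint32_t head = 7;
	uint32_t next = (head + 1) & mask;	/* wraps 7 -> 0 */
#endif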

/*
 * multi-producer safe lock-free ring buffer enqueue
 *
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
	int i;

	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback() and
	 * trigger a spurious panic.
	 */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if (br->br_ring[i] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		prod_next = (prod_head + 1) & br->br_prod_mask;
		cons_tail = br->br_cons_tail;

		if (prod_next == cons_tail) {
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_head] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_int(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
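
/*
 * Example (illustrative sketch, not part of this header): a typical
 * transmit-side caller checks for ENOBUFS and frees the buffer on a
 * drop; the full slot was already counted in br_drops.  The sc, ifp,
 * and m names below are hypothetical.
 */
#if 0
	struct mbuf *m;
	int error;

	error = buf_ring_enqueue(sc->sc_br, m);
	if (error == ENOBUFS) {
		/* Ring is full: the caller still owns (and frees) m. */
		m_freem(m);
		if_inc_counter(ifp, IFCOUNTER_OQDROPS, 1);
	}
#endif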

/*
 * multi-consumer safe dequeue
 *
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	void *buf;

	critical_enter();
	do {
		cons_head = br->br_cons_head;
		cons_next = (cons_head + 1) & br->br_cons_mask;

		if (cons_head == br->br_prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));

	buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_int(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
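
/*
 * Example (illustrative sketch, not part of this header): any number
 * of threads may drain the ring with the _mc variant; NULL means the
 * ring is empty.  The br name and consume() handler are hypothetical.
 */
#if 0
	void *buf;

	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
		consume(buf);
#endif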

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
#endif
	uint32_t prod_tail;
	void *buf;

	/*
	 * This is a workaround to allow using buf_ring on ARM and ARM64.
	 * ARM64TODO: Fix buf_ring in a generic way.
	 * REMARKS: It is suspected that br_cons_head does not require
	 *   a load_acq operation, but this change was extensively tested
	 *   and confirmed to work. To be reviewed once again in
	 *   FreeBSD-12.
	 *
	 * This prevents the following situation:
	 *
	 * Core(0) - buf_ring_enqueue()                Core(1) - buf_ring_dequeue_sc()
	 * --------------------------------------      --------------------------------
	 *
	 *                                             cons_head = br->br_cons_head;
	 * atomic_cmpset_acq_32(&br->br_prod_head, ...);
	 *                                             buf = br->br_ring[cons_head];  <see <1>>
	 * br->br_ring[prod_head] = buf;
	 * atomic_store_rel_32(&br->br_prod_tail, ...);
	 *                                             prod_tail = br->br_prod_tail;
	 *                                             if (cons_head == prod_tail)
	 *                                                     return (NULL);
	 *                                             <condition is false and code uses
	 *                                              the invalid (old) buf>
	 *
	 * <1> The load (on core 1) from br->br_ring[cons_head] can be
	 *     reordered (speculatively read) by the CPU.
	 */
#if defined(__arm__) || defined(__aarch64__)
	cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
	cons_head = br->br_cons_head;
#endif
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
	cons_next_next = (cons_head + 2) & br->br_cons_mask;
#endif

	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}
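
/*
 * Example (illustrative sketch, not part of this header): the _sc
 * variant relies on the caller's lock to serialize consumers, here a
 * hypothetical per-queue tx mutex; txq and hw_transmit() are
 * hypothetical driver names.
 */
#if 0
	struct mbuf *m;

	mtx_lock(&txq->mtx);
	while ((m = buf_ring_dequeue_sc(txq->br)) != NULL)
		(void)hw_transmit(txq, m);
	mtx_unlock(&txq->mtx);
#endif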

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	uint32_t prod_tail;

	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = (cons_head + 1) & br->br_cons_mask;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	br->br_cons_tail = cons_next;
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other operation that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	KASSERT(br->br_cons_head != br->br_prod_tail,
		("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head] = new;
}
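
/*
 * Example (illustrative sketch, not part of this header): the peek /
 * putback / advance pattern used by drbr-style transmit paths.  The
 * txq, its mutex, and hw_transmit() are hypothetical.
 */
#if 0
	struct mbuf *m;

	mtx_lock(&txq->mtx);
	while ((m = buf_ring_peek(txq->br)) != NULL) {
		if (hw_transmit(txq, m) != 0) {
			/* Hardware busy: return (possibly modified) m. */
			buf_ring_putback_sc(txq->br, m);
			break;
		}
		/* Sent: now consume the slot we peeked at. */
		buf_ring_advance_sc(txq->br);
	}
	mtx_unlock(&txq->mtx);
#endif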

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * I believe it is safe to not have a memory barrier
	 * here because we control cons, and tail is at worst
	 * a lagging indicator, so at worst we might
	 * return NULL immediately after a buffer has been enqueued
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

	return (br->br_ring[br->br_cons_head]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
#ifdef DEBUG_BUFRING
	void *ret;

	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#if defined(__arm__) || defined(__aarch64__)
	/*
	 * The barrier is required there on ARM and ARM64 to ensure that
	 * br->br_ring[br->br_cons_head] will not be fetched before the above
	 * condition is checked.
	 * Without the barrier, it is possible that the buffer will be fetched
	 * before the enqueue puts the mbuf into br; then, in the meantime, the
	 * enqueue will update the array and br_prod_tail, the conditional
	 * check will be true, and we will return the previously fetched
	 * (and invalid) buffer.
	 */
	atomic_thread_fence_acq();
#endif

#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	ret = br->br_ring[br->br_cons_head];
	br->br_ring[br->br_cons_head] = NULL;
	return (ret);
#else
	return (br->br_ring[br->br_cons_head]);
#endif
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}
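
/*
 * Worked example (illustrative, hypothetical numbers): the occupancy
 * arithmetic above stays correct across index wrap-around because
 * br_prod_size is added before masking, keeping the subtraction
 * non-negative.
 */
#if 0
	/* Ring of size 8: entries at 6, 7, 0; prod_tail wrapped to 1. */
	uint32_t size = 8, mask = 7;
	uint32_t prod_tail = 1, cons_tail = 6;
	uint32_t count = (size + prod_tail - cons_tail) & mask;	/* == 3 */
#endif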

struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
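
/*
 * Example (illustrative sketch, not part of this header): allocating
 * and freeing a ring.  The mask arithmetic above assumes count is a
 * power of two, and the mutex is only consulted for DEBUG_BUFRING
 * sanity checks.  The sc name is hypothetical.
 */
#if 0
	struct buf_ring *br;

	br = buf_ring_alloc(4096, M_DEVBUF, M_WAITOK, &sc->sc_tx_mtx);
	/* ... enqueue/dequeue ... */
	buf_ring_free(br, M_DEVBUF);
#endif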

#endif