/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <machine/cpu.h>

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

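/*
 * Lock-free ring of void * entries.  The producer state (head and tail
 * indices, size, mask and drop counter) and the consumer state are kept
 * on separate cache lines to avoid false sharing between enqueueing and
 * dequeueing CPUs; the entry array follows the structure and is likewise
 * cache-line aligned.
 */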
struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

/*
 * multi-producer safe lock-free ring buffer enqueue
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
	int i;

	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek() and then re-added via drbr_putback(), which
	 * can trigger a spurious panic here.
	 */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if (br->br_ring[i] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		prod_next = (prod_head + 1) & br->br_prod_mask;
		cons_tail = br->br_cons_tail;

		if (prod_next == cons_tail) {
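			/*
			 * The ring looks full based on a possibly stale
			 * snapshot; re-read the indices after a read
			 * barrier and only report failure if the ring is
			 * still full, otherwise retry with fresh values.
			 */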
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_head] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_int(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
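
/*
 * Example usage (illustrative sketch only; "sc" and its tx_br member
 * are hypothetical driver state, not part of this KPI): a transmit
 * path might enqueue an mbuf and free it when the ring rejects it:
 *
 *	error = buf_ring_enqueue(sc->tx_br, m);
 *	if (error == ENOBUFS)
 *		m_freem(m);
 */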

/*
 * multi-consumer safe dequeue
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	void *buf;

	critical_enter();
	do {
		cons_head = br->br_cons_head;
		cons_next = (cons_head + 1) & br->br_cons_mask;

		if (cons_head == br->br_prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));

	buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_int(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
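
/*
 * Example usage (illustrative sketch only; process_buf() stands in for
 * whatever a consumer does with each buffer): any number of threads may
 * drain the ring concurrently with this variant:
 *
 *	while ((buf = buf_ring_dequeue_mc(br)) != NULL)
 *		process_buf(buf);
 */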

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
#endif
	uint32_t prod_tail;
	void *buf;

	/*
	 * This is a workaround to allow using buf_ring on ARM and ARM64.
	 * ARM64TODO: Fix buf_ring in a generic way.
	 * REMARKS: It is suspected that br_cons_head does not require a
	 *   load_acq operation, but this change was extensively tested
	 *   and confirmed to work. To be reviewed once again in
	 *   FreeBSD-12.
	 *
	 * This prevents the following situation:
	 *
	 * Core(0) - buf_ring_enqueue()                   Core(1) - buf_ring_dequeue_sc()
	 * ---------------------------------------------  ---------------------------------------------
	 *
	 *                                                cons_head = br->br_cons_head;
	 * atomic_cmpset_acq_32(&br->br_prod_head, ...);
	 *                                                buf = br->br_ring[cons_head];   <see <1>>
	 * br->br_ring[prod_head] = buf;
	 * atomic_store_rel_32(&br->br_prod_tail, ...);
	 *                                                prod_tail = br->br_prod_tail;
	 *                                                if (cons_head == prod_tail)
	 *                                                        return (NULL);
	 *                                                <condition is false and code uses the
	 *                                                 invalid (old) buf>
	 *
	 * <1> The load (on core 1) from br->br_ring[cons_head] can be
	 *     reordered (speculatively read) by the CPU.
	 */
#if defined(__arm__) || defined(__aarch64__)
	cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
	cons_head = br->br_cons_head;
#endif
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
	cons_next_next = (cons_head + 2) & br->br_cons_mask;
#endif

	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}
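
/*
 * Example usage (illustrative sketch only; "sc", its tx_br/tx_mtx
 * members and drv_start_one() are hypothetical driver code): the single
 * consumer drains the ring while holding the driver's tx lock:
 *
 *	mtx_lock(&sc->tx_mtx);
 *	while ((m = buf_ring_dequeue_sc(sc->tx_br)) != NULL)
 *		drv_start_one(sc, m);
 *	mtx_unlock(&sc->tx_mtx);
 */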

/*
 * single-consumer advance after a peek
 * use where the consumer is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	uint32_t prod_tail;

	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = (cons_head + 1) & br->br_cons_mask;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	br->br_cons_tail = cons_next;
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring.  The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what is in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other operation that will have
 * changed it.  As an optimization we always store it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	KASSERT(br->br_cons_head != br->br_prod_tail,
		("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head] = new;
}

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * I believe it is safe to not have a memory barrier here
	 * because we control cons_head, and prod_tail is at worst a
	 * lagging indicator, so in the worst case we might return NULL
	 * immediately after a buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

	return (br->br_ring[br->br_cons_head]);
}
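
/*
 * Example usage (illustrative sketch only; "sc", its tx_br/tx_mtx
 * members and drv_start_one() are hypothetical driver code): the
 * peek/advance/putback trio lets a transmit path look at the next
 * buffer without committing to removing it:
 *
 *	mtx_lock(&sc->tx_mtx);
 *	while ((m = buf_ring_peek(sc->tx_br)) != NULL) {
 *		if (drv_start_one(sc, m) != 0) {
 *			buf_ring_putback_sc(sc->tx_br, m);
 *			break;
 *		}
 *		buf_ring_advance_sc(sc->tx_br);
 *	}
 *	mtx_unlock(&sc->tx_mtx);
 */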

/*
 * Single-consumer peek which, when DEBUG_BUFRING is defined, also
 * clears the ring slot; requires the same serialization as
 * buf_ring_dequeue_sc().
 */
static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
#ifdef DEBUG_BUFRING
	void *ret;

	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#if defined(__arm__) || defined(__aarch64__)
	/*
	 * The barrier is required here on ARM and ARM64 to ensure that
	 * br->br_ring[br->br_cons_head] will not be fetched before the above
	 * condition is checked.
	 * Without the barrier, the buffer could be fetched before the enqueue
	 * puts the mbuf into br; the enqueue could then update the array and
	 * br_prod_tail in the meantime, making the conditional check true, so
	 * we would return the previously fetched (and invalid) buffer.
	 */
	atomic_thread_fence_acq();
#endif

#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	ret = br->br_ring[br->br_cons_head];
	br->br_ring[br->br_cons_head] = NULL;
	return (ret);
#else
	return (br->br_ring[br->br_cons_head]);
#endif
}
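/*
 * The following predicates read the indices without synchronization;
 * with producers or consumers running concurrently they are only
 * instantaneous snapshots and may be momentarily stale.
 */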
static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}

struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
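
/*
 * Example usage (illustrative sketch only; "sc" and its members are
 * hypothetical driver state): the ring is created at attach time with a
 * power-of-two count (the indices are masked with count - 1), passing
 * the consumer lock so the DEBUG_BUFRING checks can verify it is held:
 *
 *	sc->tx_br = buf_ring_alloc(4096, M_DEVBUF, M_WAITOK, &sc->tx_mtx);
 *	...
 *	buf_ring_free(sc->tx_br, M_DEVBUF);
 */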

#endif