/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <machine/cpu.h>

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};
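
/*
 * Illustrative note (values assumed, not part of this header): the ring
 * is indexed with power-of-two masks rather than modulo arithmetic.  For
 * a ring allocated with count = 8, br_prod_mask is 7, so
 *
 *	next = (head + 1) & br->br_prod_mask;	// 7 + 1 wraps to 0
 *
 * Because enqueue refuses to advance the producer head onto the consumer
 * tail, one slot always stays empty: a count = 8 ring holds at most 7
 * buffers.
 */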

/*
 * Multi-producer safe lock-free ring buffer enqueue.
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
	int i;

	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek(), and then re-added via drbr_putback(), and
	 * trigger a spurious panic.
	 */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if (br->br_ring[i] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		prod_next = (prod_head + 1) & br->br_prod_mask;
		cons_tail = br->br_cons_tail;

		if (prod_next == cons_tail) {
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head,
	    prod_next));
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_head] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_int(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}

/*
 * Multi-consumer safe lock-free ring buffer dequeue.
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	void *buf;

	critical_enter();
	do {
		cons_head = br->br_cons_head;
		cons_next = (cons_head + 1) & br->br_cons_mask;

		if (cons_head == br->br_prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head,
	    cons_next));

	buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_int(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
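
/*
 * Illustrative producer-side sketch (hypothetical names, not part of
 * this header): any number of contexts may call buf_ring_enqueue()
 * concurrently, but on ENOBUFS the caller still owns the buffer and
 * must dispose of it; the failed attempt has already been counted in
 * br_drops:
 *
 *	if (buf_ring_enqueue(sc->sc_br, m) != 0) {
 *		m_freem(m);	// ring full; drop the packet
 *		return (ENOBUFS);
 *	}
 */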

/*
 * Single-consumer dequeue: use where the dequeue path is protected by
 * a lock, e.g. a network driver's tx queue lock.
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
#endif
	uint32_t prod_tail;
	void *buf;

	/*
	 * This is a workaround to allow using buf_ring on ARM and ARM64.
	 * ARM64TODO: Fix buf_ring in a generic way.
	 * REMARKS: It is suspected that br_cons_head does not require a
	 *   load_acq operation, but this change was extensively tested
	 *   and confirmed to work.  To be reviewed once again in
	 *   FreeBSD-12.
	 *
	 * This prevents the following situation:
	 *
	 * Core(0) - buf_ring_enqueue()                Core(1) - buf_ring_dequeue_sc()
	 * -----------------------------------------   ----------------------------------------------
	 *
	 *                                             cons_head = br->br_cons_head;
	 * atomic_cmpset_acq_32(&br->br_prod_head, ...));
	 *                                             buf = br->br_ring[cons_head];     <see <1>>
	 * br->br_ring[prod_head] = buf;
	 * atomic_store_rel_32(&br->br_prod_tail, ...);
	 *                                             prod_tail = br->br_prod_tail;
	 *                                             if (cons_head == prod_tail)
	 *                                                     return (NULL);
	 *                                             <condition is false and code uses invalid(old) buf>
	 *
	 * <1> The load (on core 1) from br->br_ring[cons_head] can be
	 *     reordered (speculatively read) by the CPU.
	 */
#if defined(__arm__) || defined(__aarch64__)
	cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
	cons_head = br->br_cons_head;
#endif
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
	cons_next_next = (cons_head + 2) & br->br_cons_mask;
#endif

	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}
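
/*
 * Illustrative consumer-side sketch (the softc layout and foo_transmit()
 * are hypothetical): a driver typically drains the ring with
 * buf_ring_dequeue_sc() while holding its tx queue lock, which is what
 * makes the single-consumer assumption hold:
 *
 *	mtx_lock(&sc->sc_tx_mtx);
 *	while ((m = buf_ring_dequeue_sc(sc->sc_br)) != NULL)
 *		foo_transmit(sc, m);
 *	mtx_unlock(&sc->sc_tx_mtx);
 */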

/*
 * Single-consumer advance after a peek: use where the consumer side is
 * protected by a lock, e.g. a network driver's tx queue lock.
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	uint32_t prod_tail;

	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = (cons_head + 1) & br->br_cons_mask;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	br->br_cons_tail = cons_next;
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring.  The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other operation that changed it.
 * As an optimization we always put it back (since
 * jhb says the store is probably cheaper); if we
 * have to do a multi-queue version we will need the
 * compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	KASSERT(br->br_cons_head != br->br_prod_tail,
	    ("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head] = new;
}

/*
 * Return a pointer to the first entry in the ring without modifying it,
 * or NULL if the ring is empty.  Race-prone if not protected by a lock.
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * I believe it is safe to not have a memory barrier here
	 * because we control cons, and tail is at worst a lagging
	 * indicator, so the worst case is that we might return NULL
	 * immediately after a buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

	return (br->br_ring[br->br_cons_head]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
#ifdef DEBUG_BUFRING
	void *ret;

	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#if defined(__arm__) || defined(__aarch64__)
	/*
	 * The barrier is required on ARM and ARM64 to ensure that
	 * br->br_ring[br->br_cons_head] will not be fetched before the
	 * above condition is checked.
	 * Without the barrier, it is possible that the buffer will be
	 * fetched before the enqueue puts the mbuf into br; then, in
	 * the meantime, the enqueue updates the array and br_prod_tail,
	 * the conditional check becomes true, and we return the
	 * previously fetched (and invalid) buffer.
	 */
	atomic_thread_fence_acq();
#endif

#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	ret = br->br_ring[br->br_cons_head];
	br->br_ring[br->br_cons_head] = NULL;
	return (ret);
#else
	return (br->br_ring[br->br_cons_head]);
#endif
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (((br->br_prod_head + 1) & br->br_prod_mask) ==
	    br->br_cons_tail);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}

struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);

#endif
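
/*
 * Illustrative lifecycle sketch (comment only; sc_br, sc_tx_mtx, and the
 * choice of M_DEVBUF are assumptions).  count should be a power of two,
 * since the index masks are count - 1:
 *
 *	sc->sc_br = buf_ring_alloc(4096, M_DEVBUF, M_WAITOK,
 *	    &sc->sc_tx_mtx);
 *	...
 *	// on teardown, drain anything still queued before freeing
 *	while ((m = buf_ring_dequeue_sc(sc->sc_br)) != NULL)
 *		m_freem(m);
 *	buf_ring_free(sc->sc_br, M_DEVBUF);
 */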