/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
27117397Skan * 28132720Skan */ 29117397Skan 30117397Skan#ifndef _SYS_BUF_RING_H_ 31117397Skan#define _SYS_BUF_RING_H_ 32117397Skan 33117397Skan#include <machine/cpu.h> 34117397Skan 35117397Skan#ifdef DEBUG_BUFRING 36117397Skan#include <sys/lock.h> 37117397Skan#include <sys/mutex.h> 38117397Skan#endif 39117397Skan 40117397Skanstruct buf_ring { 41117397Skan volatile uint32_t br_prod_head; 42117397Skan volatile uint32_t br_prod_tail; 43117397Skan int br_prod_size; 44117397Skan int br_prod_mask; 45117397Skan uint64_t br_drops; 46117397Skan volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE); 47117397Skan volatile uint32_t br_cons_tail; 48117397Skan int br_cons_size; 49117397Skan int br_cons_mask; 50117397Skan#ifdef DEBUG_BUFRING 51117397Skan struct mtx *br_lock; 52117397Skan#endif 53117397Skan void *br_ring[0] __aligned(CACHE_LINE_SIZE); 54117397Skan}; 55117397Skan 56117397Skan/* 57117397Skan * multi-producer safe lock-free ring buffer enqueue 58117397Skan * 59117397Skan */ 60132720Skanstatic __inline int 61117397Skanbuf_ring_enqueue(struct buf_ring *br, void *buf) 62117397Skan{ 63117397Skan uint32_t prod_head, prod_next, cons_tail; 64117397Skan#ifdef DEBUG_BUFRING 65117397Skan int i; 66117397Skan 67117397Skan /* 68117397Skan * Note: It is possible to encounter an mbuf that was removed 69117397Skan * via drbr_peek(), and then re-added via drbr_putback() and 70117397Skan * trigger a spurious panic. 
71117397Skan */ 72117397Skan for (i = br->br_cons_head; i != br->br_prod_head; 73117397Skan i = ((i + 1) & br->br_cons_mask)) 74117397Skan if (br->br_ring[i] == buf) 75117397Skan panic("buf=%p already enqueue at %d prod=%d cons=%d", 76117397Skan buf, i, br->br_prod_tail, br->br_cons_tail); 77117397Skan#endif 78117397Skan critical_enter(); 79117397Skan do { 80117397Skan prod_head = br->br_prod_head; 81117397Skan prod_next = (prod_head + 1) & br->br_prod_mask; 82117397Skan cons_tail = br->br_cons_tail; 83117397Skan 84117397Skan if (prod_next == cons_tail) { 85117397Skan rmb(); 86117397Skan if (prod_head == br->br_prod_head && 87117397Skan cons_tail == br->br_cons_tail) { 88117397Skan br->br_drops++; 89117397Skan critical_exit(); 90117397Skan return (ENOBUFS); 91117397Skan } 92132720Skan continue; 9397403Sobrien } 9497403Sobrien } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); 95132720Skan#ifdef DEBUG_BUFRING 9697403Sobrien if (br->br_ring[prod_head] != NULL) 9797403Sobrien panic("dangling value in enqueue"); 9897403Sobrien#endif 9997403Sobrien br->br_ring[prod_head] = buf; 10097403Sobrien 10197403Sobrien /* 10297403Sobrien * If there are other enqueues in progress 10397403Sobrien * that preceded us, we need to wait for them 10497403Sobrien * to complete 105117397Skan */ 10697403Sobrien while (br->br_prod_tail != prod_head) 10797403Sobrien cpu_spinwait(); 10897403Sobrien atomic_store_rel_int(&br->br_prod_tail, prod_next); 10997403Sobrien critical_exit(); 11097403Sobrien return (0); 11197403Sobrien} 11297403Sobrien 113117397Skan/* 114117397Skan * multi-consumer safe dequeue 11597403Sobrien * 11697403Sobrien */ 11797403Sobrienstatic __inline void * 11897403Sobrienbuf_ring_dequeue_mc(struct buf_ring *br) 11997403Sobrien{ 120169691Skan uint32_t cons_head, cons_next; 121132720Skan void *buf; 12297403Sobrien 12397403Sobrien critical_enter(); 12497403Sobrien do { 12597403Sobrien cons_head = br->br_cons_head; 12697403Sobrien cons_next = (cons_head + 
1) & br->br_cons_mask; 12797403Sobrien 12897403Sobrien if (cons_head == br->br_prod_tail) { 12997403Sobrien critical_exit(); 13097403Sobrien return (NULL); 13197403Sobrien } 13297403Sobrien } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); 13397403Sobrien 13497403Sobrien buf = br->br_ring[cons_head]; 13597403Sobrien#ifdef DEBUG_BUFRING 13697403Sobrien br->br_ring[cons_head] = NULL; 13797403Sobrien#endif 13897403Sobrien /* 13997403Sobrien * If there are other dequeues in progress 14097403Sobrien * that preceded us, we need to wait for them 14197403Sobrien * to complete 14297403Sobrien */ 14397403Sobrien while (br->br_cons_tail != cons_head) 14497403Sobrien cpu_spinwait(); 14597403Sobrien 14697403Sobrien atomic_store_rel_int(&br->br_cons_tail, cons_next); 14797403Sobrien critical_exit(); 14897403Sobrien 14997403Sobrien return (buf); 15097403Sobrien} 15197403Sobrien 15297403Sobrien/* 15397403Sobrien * single-consumer dequeue 15497403Sobrien * use where dequeue is protected by a lock 155132720Skan * e.g. a network driver's tx queue lock 156132720Skan */ 157117397Skanstatic __inline void * 15897403Sobrienbuf_ring_dequeue_sc(struct buf_ring *br) 159132720Skan{ 16097403Sobrien uint32_t cons_head, cons_next; 16197403Sobrien#ifdef PREFETCH_DEFINED 16297403Sobrien uint32_t cons_next_next; 16397403Sobrien#endif 16497403Sobrien uint32_t prod_tail; 16597403Sobrien void *buf; 16697403Sobrien 16797403Sobrien /* 16897403Sobrien * This is a workaround to allow using buf_ring on ARM and ARM64. 16997403Sobrien * ARM64TODO: Fix buf_ring in a generic way. 17097403Sobrien * REMARKS: It is suspected that br_cons_head does not require 17197403Sobrien * load_acq operation, but this change was extensively tested 17297403Sobrien * and confirmed it's working. To be reviewed once again in 17397403Sobrien * FreeBSD-12. 
17497403Sobrien * 17597403Sobrien * Preventing following situation: 17697403Sobrien 17797403Sobrien * Core(0) - buf_ring_enqueue() Core(1) - buf_ring_dequeue_sc() 17897403Sobrien * ----------------------------------------- ---------------------------------------------- 17997403Sobrien * 18097403Sobrien * cons_head = br->br_cons_head; 18197403Sobrien * atomic_cmpset_acq_32(&br->br_prod_head, ...)); 18297403Sobrien * buf = br->br_ring[cons_head]; <see <1>> 18397403Sobrien * br->br_ring[prod_head] = buf; 18497403Sobrien * atomic_store_rel_32(&br->br_prod_tail, ...); 18597403Sobrien * prod_tail = br->br_prod_tail; 18697403Sobrien * if (cons_head == prod_tail) 18797403Sobrien * return (NULL); 18897403Sobrien * <condition is false and code uses invalid(old) buf>` 18997403Sobrien * 19097403Sobrien * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculative readed) by CPU. 19197403Sobrien */ 19297403Sobrien#if defined(__arm__) || defined(__aarch64__) 19397403Sobrien cons_head = atomic_load_acq_32(&br->br_cons_head); 19497403Sobrien#else 19597403Sobrien cons_head = br->br_cons_head; 19697403Sobrien#endif 19797403Sobrien prod_tail = atomic_load_acq_32(&br->br_prod_tail); 19897403Sobrien 199169691Skan cons_next = (cons_head + 1) & br->br_cons_mask; 200169691Skan#ifdef PREFETCH_DEFINED 201169691Skan cons_next_next = (cons_head + 2) & br->br_cons_mask; 202169691Skan#endif 203169691Skan 204169691Skan if (cons_head == prod_tail) 205169691Skan return (NULL); 206169691Skan 207169691Skan#ifdef PREFETCH_DEFINED 208169691Skan if (cons_next != prod_tail) { 209169691Skan prefetch(br->br_ring[cons_next]); 210169691Skan if (cons_next_next != prod_tail) 211169691Skan prefetch(br->br_ring[cons_next_next]); 212169691Skan } 213169691Skan#endif 214169691Skan br->br_cons_head = cons_next; 215169691Skan buf = br->br_ring[cons_head]; 216169691Skan 217169691Skan#ifdef DEBUG_BUFRING 218169691Skan br->br_ring[cons_head] = NULL; 219169691Skan if (!mtx_owned(br->br_lock)) 
220169691Skan panic("lock not held on single consumer dequeue"); 221169691Skan if (br->br_cons_tail != cons_head) 222169691Skan panic("inconsistent list cons_tail=%d cons_head=%d", 223169691Skan br->br_cons_tail, cons_head); 224169691Skan#endif 225169691Skan br->br_cons_tail = cons_next; 226169691Skan return (buf); 227169691Skan} 228169691Skan 229169691Skan/* 230169691Skan * single-consumer advance after a peek 231169691Skan * use where it is protected by a lock 232169691Skan * e.g. a network driver's tx queue lock 233169691Skan */ 234169691Skanstatic __inline void 235169691Skanbuf_ring_advance_sc(struct buf_ring *br) 236169691Skan{ 237169691Skan uint32_t cons_head, cons_next; 238169691Skan uint32_t prod_tail; 239169691Skan 240169691Skan cons_head = br->br_cons_head; 241169691Skan prod_tail = br->br_prod_tail; 242169691Skan 243169691Skan cons_next = (cons_head + 1) & br->br_cons_mask; 244169691Skan if (cons_head == prod_tail) 245169691Skan return; 246169691Skan br->br_cons_head = cons_next; 247169691Skan#ifdef DEBUG_BUFRING 248169691Skan br->br_ring[cons_head] = NULL; 249169691Skan#endif 250169691Skan br->br_cons_tail = cons_next; 251169691Skan} 252169691Skan 253169691Skan/* 254169691Skan * Used to return a buffer (most likely already there) 255169691Skan * to the top of the ring. The caller should *not* 256169691Skan * have used any dequeue to pull it out of the ring 257169691Skan * but instead should have used the peek() function. 258169691Skan * This is normally used where the transmit queue 259169691Skan * of a driver is full, and an mbuf must be returned. 260169691Skan * Most likely whats in the ring-buffer is what 261169691Skan * is being put back (since it was not removed), but 262169691Skan * sometimes the lower transmit function may have 263169691Skan * done a pullup or other function that will have 264169691Skan * changed it. 
As an optimization we always put it 265169691Skan * back (since jhb says the store is probably cheaper), 266169691Skan * if we have to do a multi-queue version we will need 267169691Skan * the compare and an atomic. 268169691Skan */ 269169691Skanstatic __inline void 270169691Skanbuf_ring_putback_sc(struct buf_ring *br, void *new) 271169691Skan{ 272169691Skan KASSERT(br->br_cons_head != br->br_prod_tail, 273169691Skan ("Buf-Ring has none in putback")) ; 274169691Skan br->br_ring[br->br_cons_head] = new; 275169691Skan} 276169691Skan 277169691Skan/* 278169691Skan * return a pointer to the first entry in the ring 279169691Skan * without modifying it, or NULL if the ring is empty 280169691Skan * race-prone if not protected by a lock 281169691Skan */ 282169691Skanstatic __inline void * 283169691Skanbuf_ring_peek(struct buf_ring *br) 284169691Skan{ 285169691Skan 286169691Skan#ifdef DEBUG_BUFRING 287169691Skan if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) 288169691Skan panic("lock not held on single consumer dequeue"); 289169691Skan#endif 290169691Skan /* 291169691Skan * I believe it is safe to not have a memory barrier 292169691Skan * here because we control cons and tail is worst case 293169691Skan * a lagging indicator so we worst case we might 294169691Skan * return NULL immediately after a buffer has been enqueued 295169691Skan */ 296169691Skan if (br->br_cons_head == br->br_prod_tail) 297169691Skan return (NULL); 298169691Skan 299169691Skan return (br->br_ring[br->br_cons_head]); 300169691Skan} 301169691Skan 302169691Skanstatic __inline void * 303169691Skanbuf_ring_peek_clear_sc(struct buf_ring *br) 304169691Skan{ 305169691Skan#ifdef DEBUG_BUFRING 306169691Skan void *ret; 307169691Skan 308169691Skan if (!mtx_owned(br->br_lock)) 309169691Skan panic("lock not held on single consumer dequeue"); 310169691Skan#endif 311169691Skan 312169691Skan if (br->br_cons_head == br->br_prod_tail) 313169691Skan return (NULL); 314169691Skan 315169691Skan#if defined(__arm__) || 
defined(__aarch64__) 316169691Skan /* 317169691Skan * The barrier is required there on ARM and ARM64 to ensure, that 318169691Skan * br->br_ring[br->br_cons_head] will not be fetched before the above 319169691Skan * condition is checked. 320169691Skan * Without the barrier, it is possible, that buffer will be fetched 321169691Skan * before the enqueue will put mbuf into br, then, in the meantime, the 322169691Skan * enqueue will update the array and the br_prod_tail, and the 323169691Skan * conditional check will be true, so we will return previously fetched 324169691Skan * (and invalid) buffer. 325169691Skan */ 326169691Skan atomic_thread_fence_acq(); 327169691Skan#endif 328169691Skan 329169691Skan#ifdef DEBUG_BUFRING 330169691Skan /* 331169691Skan * Single consumer, i.e. cons_head will not move while we are 332169691Skan * running, so atomic_swap_ptr() is not necessary here. 333169691Skan */ 334169691Skan ret = br->br_ring[br->br_cons_head]; 335169691Skan br->br_ring[br->br_cons_head] = NULL; 336169691Skan return (ret); 337169691Skan#else 338169691Skan return (br->br_ring[br->br_cons_head]); 339169691Skan#endif 340169691Skan} 341169691Skan 342169691Skanstatic __inline int 343169691Skanbuf_ring_full(struct buf_ring *br) 344169691Skan{ 345169691Skan 346169691Skan return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); 347169691Skan} 348169691Skan 349169691Skanstatic __inline int 350169691Skanbuf_ring_empty(struct buf_ring *br) 351169691Skan{ 352169691Skan 353169691Skan return (br->br_cons_head == br->br_prod_tail); 354169691Skan} 355169691Skan 356169691Skanstatic __inline int 357169691Skanbuf_ring_count(struct buf_ring *br) 358169691Skan{ 359169691Skan 360169691Skan return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) 361169691Skan & br->br_prod_mask); 362169691Skan} 363169691Skan 364169691Skanstruct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags, 365169691Skan struct mtx *); 366169691Skanvoid 
buf_ring_free(struct buf_ring *br, struct malloc_type *type); 367169691Skan 368169691Skan#endif 369169691Skan