bqueue.c revision 288571
1/*
2 * CDDL HEADER START
3 *
4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
7 * 1.0 of the CDDL.
8 *
9 * A full copy of the text of the CDDL should have accompanied this
10 * source.  A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
12 *
13 * CDDL HEADER END
14 */
15/*
16 * Copyright (c) 2014 by Delphix. All rights reserved.
17 */
18
19#include	<sys/bqueue.h>
20#include	<sys/zfs_context.h>
21
22static inline bqueue_node_t *
23obj2node(bqueue_t *q, void *data)
24{
25	return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26}
27
28/*
29 * Initialize a blocking queue  The maximum capacity of the queue is set to
30 * size.  Types that want to be stored in a bqueue must contain a bqueue_node_t,
31 * and offset should give its offset from the start of the struct.  Return 0 on
32 * success, or -1 on failure.
33 */
34int
35bqueue_init(bqueue_t *q, uint64_t size, size_t node_offset)
36{
37	list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
38	    node_offset + offsetof(bqueue_node_t, bqn_node));
39	cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
40	cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
41	mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
42	q->bq_node_offset = node_offset;
43	q->bq_size = 0;
44	q->bq_maxsize = size;
45	return (0);
46}
47
48/*
49 * Destroy a blocking queue.  This function asserts that there are no
50 * elements in the queue, and no one is blocked on the condition
51 * variables.
52 */
53void
54bqueue_destroy(bqueue_t *q)
55{
56	ASSERT0(q->bq_size);
57	cv_destroy(&q->bq_add_cv);
58	cv_destroy(&q->bq_pop_cv);
59	mutex_destroy(&q->bq_lock);
60	list_destroy(&q->bq_list);
61}
62
63/*
64 * Add data to q, consuming size units of capacity.  If there is insufficient
65 * capacity to consume size units, block until capacity exists.  Asserts size is
66 * > 0.
67 */
68void
69bqueue_enqueue(bqueue_t *q, void *data, uint64_t item_size)
70{
71	ASSERT3U(item_size, >, 0);
72	ASSERT3U(item_size, <, q->bq_maxsize);
73	mutex_enter(&q->bq_lock);
74	obj2node(q, data)->bqn_size = item_size;
75	while (q->bq_size + item_size > q->bq_maxsize) {
76		cv_wait(&q->bq_add_cv, &q->bq_lock);
77	}
78	q->bq_size += item_size;
79	list_insert_tail(&q->bq_list, data);
80	cv_signal(&q->bq_pop_cv);
81	mutex_exit(&q->bq_lock);
82}
83/*
84 * Take the first element off of q.  If there are no elements on the queue, wait
85 * until one is put there.  Return the removed element.
86 */
87void *
88bqueue_dequeue(bqueue_t *q)
89{
90	void *ret;
91	uint64_t item_size;
92	mutex_enter(&q->bq_lock);
93	while (q->bq_size == 0) {
94		cv_wait(&q->bq_pop_cv, &q->bq_lock);
95	}
96	ret = list_remove_head(&q->bq_list);
97	item_size = obj2node(q, ret)->bqn_size;
98	q->bq_size -= item_size;
99	mutex_exit(&q->bq_lock);
100	cv_signal(&q->bq_add_cv);
101	return (ret);
102}
103
104/*
105 * Returns true if the space used is 0.
106 */
107boolean_t
108bqueue_empty(bqueue_t *q)
109{
110	return (q->bq_size == 0);
111}
112