/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
	char	*aq_entbuf;		/* Buffer for stored entries */
	int	aq_entmax;		/* Max entries */
	int	aq_entlen;		/* Entry length */
	int	aq_freebytes;		/* Bytes available in buffer */
	int	aq_buflen;		/* Total length of our buffer */
	int	aq_writehead;		/* Location for next write */
	int	aq_writetail;		/* Flush starts at this location */
	int	aq_wrapearly;		/* # bytes left blank at end of buf */
	int	aq_flags;		/* Queue flags */
	int	aq_waiters;		/* Num threads waiting for resources
					 * NB: Used as a wait channel so must
					 * not be first field in the alq struct
					 */
	struct	ale	aq_getpost;	/* ALE for use by get/post */
	struct mtx	aq_mtx;		/* Queue lock */
	struct vnode	*aq_vp;		/* Open vnode handle */
	struct ucred	*aq_cred;	/* Credentials of the opening thread */
	LIST_ENTRY(alq)	aq_act;		/* List of active queues */
	LIST_ENTRY(alq)	aq_link;	/* List of all queues */
};

#define	AQ_WANTED	0x0001		/* Wakeup sleeper when io is done */
#define	AQ_ACTIVE	0x0002		/* on the active list */
#define	AQ_FLUSHING	0x0004		/* doing IO */
#define	AQ_SHUTDOWN	0x0008		/* Queue no longer valid */
#define	AQ_ORDERED	0x0010		/* Queue enforces ordered writes */
#define	AQ_LEGACY	0x0020		/* Legacy queue (fixed length writes) */

#define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
#define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)

#define	HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
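/*
 * Illustrative invariants (descriptive only, no new behaviour): an empty
 * queue has aq_freebytes == aq_buflen, so HAS_PENDING_DATA() is false, and
 * both aq_writehead and aq_writetail are reset to 0 after a full flush in
 * alq_doio(). A full queue has aq_freebytes == 0 with
 * aq_writehead == aq_writetail.
 */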

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define	ALD_LOCK()	mtx_lock(&ald_mtx)
#define	ALD_UNLOCK()	mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If we
 * are, the ald is responsible for cleaning up the queue's resources.
 */
static int
ald_rem(struct alq *alq)
{
	int error;

	error = 0;

	ALD_LOCK();
	if (ald_shutingdown) {
		error = EBUSY;
		goto done;
	}
	LIST_REMOVE(alq, aq_link);
done:
	ALD_UNLOCK();
	return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
	LIST_INSERT_HEAD(&ald_active, alq, aq_act);
	wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
	LIST_REMOVE(alq, aq_act);
	alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
	mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
	LIST_INIT(&ald_queues);
	LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
	int needwakeup;
	struct alq *alq;

	ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

	alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
	    ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

	ALD_LOCK();

	for (;;) {
		while ((alq = LIST_FIRST(&ald_active)) == NULL &&
		    !ald_shutingdown)
			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

		/* Don't shutdown until all active ALQs are flushed. */
		if (ald_shutingdown && alq == NULL) {
			ALD_UNLOCK();
			break;
		}

		ALQ_LOCK(alq);
		ald_deactivate(alq);
		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
		ALQ_UNLOCK(alq);
		if (needwakeup)
			wakeup_one(alq);
		ALD_LOCK();
	}

	kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
	struct alq *alq;

	ALD_LOCK();

	/* Ensure no new queues can be created. */
	ald_shutingdown = 1;

	/* Shutdown all ALQs prior to terminating the ald_daemon. */
	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
		LIST_REMOVE(alq, aq_link);
		ALD_UNLOCK();
		alq_shutdown(alq);
		ALD_LOCK();
	}

	/* At this point, all ALQs are flushed and shutdown. */

	/*
	 * Wake ald_daemon so that it exits. It can't make progress until
	 * we call mtx_sleep() below, because we hold the ald_mtx.
	 */
	wakeup(&ald_active);

	/* Wait for ald_daemon to exit. */
	mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

	ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
	ALQ_LOCK(alq);

	/* Stop any new writers. */
	alq->aq_flags |= AQ_SHUTDOWN;

	/*
	 * If the ALQ isn't active but has unwritten data (possible if
	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
	 * ALQ here so that the pending data gets flushed by the ald_daemon.
	 */
	if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
		alq->aq_flags |= AQ_ACTIVE;
		ALQ_UNLOCK(alq);
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
		ALQ_LOCK(alq);
	}

	/* Drain IO */
	while (alq->aq_flags & AQ_ACTIVE) {
		alq->aq_flags |= AQ_WANTED;
		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
	}
	ALQ_UNLOCK(alq);

	vn_close(alq->aq_vp, FWRITE, alq->aq_cred, curthread);
	crfree(alq->aq_cred);
}

static void
alq_destroy(struct alq *alq)
{
	/* Drain all pending IO. */
	alq_shutdown(alq);

	mtx_destroy(&alq->aq_mtx);
	free(alq->aq_entbuf, M_ALD);
	free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
	struct thread *td;
	struct mount *mp;
	struct vnode *vp;
	struct uio auio;
	struct iovec aiov[2];
	int totlen;
	int iov;
	int wrapearly;

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

	vp = alq->aq_vp;
	td = curthread;
	totlen = 0;
	iov = 1;
	wrapearly = alq->aq_wrapearly;

	bzero(&aiov, sizeof(aiov));
	bzero(&auio, sizeof(auio));

	/* Start the write from the location of our buffer tail pointer. */
	aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

	if (alq->aq_writetail < alq->aq_writehead) {
		/* Buffer not wrapped. */
		totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
	} else if (alq->aq_writehead == 0) {
		/* Buffer not wrapped (special case to avoid an empty iov). */
		totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
	} else {
		/*
		 * Buffer wrapped, requires 2 aiov entries:
		 * - first is from writetail to end of buffer
		 * - second is from start of buffer to writehead
		 */
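		/*
		 * Worked example with illustrative numbers: if aq_buflen is
		 * 100, aq_writetail is 80, aq_writehead is 20 and wrapearly
		 * is 5, then aiov[0] covers the 15 bytes at offsets 80-94
		 * (skipping the 5 blank bytes at the end of the buffer) and
		 * aiov[1] covers the 20 bytes at offsets 0-19, for a totlen
		 * of 35.
		 */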
		aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
		    wrapearly;
		iov++;
		aiov[1].iov_base = alq->aq_entbuf;
		aiov[1].iov_len = alq->aq_writehead;
		totlen = aiov[0].iov_len + aiov[1].iov_len;
	}

	alq->aq_flags |= AQ_FLUSHING;
	ALQ_UNLOCK(alq);

	auio.uio_iov = &aiov[0];
	auio.uio_offset = 0;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_iovcnt = iov;
	auio.uio_resid = totlen;
	auio.uio_td = td;

	/*
	 * Do all of the junk required to write now.
	 */
	vn_start_write(vp, &mp, V_WAIT);
	vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
	/*
	 * XXX: VOP_WRITE error checks are ignored.
	 */
#ifdef MAC
	if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
		VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
	VOP_UNLOCK(vp, 0);
	vn_finished_write(mp);

	ALQ_LOCK(alq);
	alq->aq_flags &= ~AQ_FLUSHING;

	/* Adjust writetail as required, taking into account wrapping. */
	alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
	    alq->aq_buflen;
	alq->aq_freebytes += totlen + wrapearly;

	/*
	 * If we just flushed part of the buffer which wrapped, reset the
	 * wrapearly indicator.
	 */
	if (wrapearly)
		alq->aq_wrapearly = 0;

	/*
	 * If we just flushed the buffer completely, reset indexes to 0 to
	 * minimise buffer wraps.
	 * This is also required to ensure alq_getn() can't wedge itself.
	 */
	if (!HAS_PENDING_DATA(alq))
		alq->aq_writehead = alq->aq_writetail = 0;

	KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
	    ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

	if (alq->aq_flags & AQ_WANTED) {
		alq->aq_flags &= ~AQ_WANTED;
		return (1);
	}

	return (0);
}

static struct kproc_desc ald_kp = {
	"ALQ Daemon",
	ald_daemon,
	&ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);
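
/*
 * Note on initialisation order: ald_startup() runs at SI_SUB_LOCK, well
 * before kproc_start() launches ald_daemon at SI_SUB_KTHREAD_IDLE, so the
 * ald_mtx and both queue lists exist before the daemon first touches them.
 */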

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
	struct thread *td;
	struct nameidata nd;
	struct alq *alq;
	int oflags;
	int error;

	KASSERT((size > 0), ("%s: size <= 0", __func__));

	*alqp = NULL;
	td = curthread;

	NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td);
	oflags = FWRITE | O_NOFOLLOW | O_CREAT;

	error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
	if (error)
		return (error);

	NDFREE(&nd, NDF_ONLY_PNBUF);
	/* Unlock the vnode; we still hold a reference to it. */
	VOP_UNLOCK(nd.ni_vp, 0);

	alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
	alq->aq_vp = nd.ni_vp;
	alq->aq_cred = crhold(cred);

	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

	alq->aq_buflen = size;
	alq->aq_entmax = 0;
	alq->aq_entlen = 0;

	alq->aq_freebytes = alq->aq_buflen;
	alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
	alq->aq_writehead = alq->aq_writetail = 0;
	if (flags & ALQ_ORDERED)
		alq->aq_flags |= AQ_ORDERED;

	if ((error = ald_add(alq)) != 0) {
		alq_destroy(alq);
		return (error);
	}

	*alqp = alq;

	return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
	int ret;

	KASSERT((count >= 0), ("%s: count < 0", __func__));

	if (count > 0) {
		if ((ret = alq_open_flags(alqp, file, cred, cmode,
		    size * count, 0)) == 0) {
			(*alqp)->aq_flags |= AQ_LEGACY;
			(*alqp)->aq_entmax = count;
			(*alqp)->aq_entlen = size;
		}
	} else
		ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

	return (ret);
}
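
/*
 * Illustrative sketch (not compiled in) of how a hypothetical consumer might
 * drive the legacy fixed-length API. The record layout, file path and sizes
 * below are invented for the example and are not part of ALQ itself.
 */
#if 0
struct example_rec {
	uint64_t	er_seq;
	uint64_t	er_stamp;
};

static void
example_alq_user(void)
{
	struct alq *q;
	struct example_rec rec = { 1, 0 };

	/* Buffer up to 1024 fixed-length records of sizeof(rec) bytes. */
	if (alq_open(&q, "/var/log/example.alq", curthread->td_ucred, 0600,
	    sizeof(rec), 1024) != 0)
		return;

	/* Queue one record; ALQ_NOWAIT returns EWOULDBLOCK instead of sleeping. */
	alq_write(q, &rec, ALQ_NOWAIT);

	/* Flush anything pending, close the file and free the queue. */
	alq_close(q);
}
#endif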

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return EWOULDBLOCK depending on whether the caller passed
 * ALQ_WAITOK or ALQ_NOWAIT in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
	int activate, copy, ret;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > aq_buflen", __func__));

	activate = ret = 0;
	copy = len;
	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Fail to perform the write and return EWOULDBLOCK if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
		ALQ_UNLOCK(alq);
		return (EWOULDBLOCK);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * Either ALQ_WAITOK was passed and there aren't yet enough free
	 * bytes, in which case we enter the while loop and sleep until
	 * enough space becomes available, or there is already enough free
	 * space and we skip the loop entirely. If AQ_ORDERED is set, only
	 * 1 thread at a time will be in this loop. Otherwise, multiple
	 * threads may be sleeping here competing for ALQ resources.
	 */
	while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
		alq->aq_waiters--;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ret = EWOULDBLOCK;
		goto unlock;
	}

	/*
	 * If we need to wrap the buffer to accommodate the write,
	 * we'll need 2 calls to bcopy.
	 */
	if ((alq->aq_buflen - alq->aq_writehead) < len)
		copy = alq->aq_buflen - alq->aq_writehead;

	/* Copy message (or part thereof if wrap required) to the buffer. */
	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
	alq->aq_writehead += copy;

	if (alq->aq_writehead >= alq->aq_buflen) {
		KASSERT((alq->aq_writehead == alq->aq_buflen),
		    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
		    __func__,
		    alq->aq_writehead,
		    alq->aq_buflen));
		alq->aq_writehead = 0;
	}

	if (copy != len) {
		/*
		 * Wrap the buffer by copying the remainder of our message
		 * to the start of the buffer and resetting aq_writehead.
		 */
		bcopy(((uint8_t *)data) + copy, alq->aq_entbuf, len - copy);
		alq->aq_writehead = len - copy;
	}

	KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
	    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

	alq->aq_freebytes -= len;

	if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
		alq->aq_flags |= AQ_ACTIVE;
		activate = 1;
	}

	KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);

	return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length write on variable length queue", __func__));
	return (alq_writen(alq, data, alq->aq_entlen, flags));
}

/*
 * Retrieve a pointer into the ALQ buffer that the caller can write to
 * directly, avoiding a bcopy. Must be paired with alq_post_flags().
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
	int contigbytes;
	void *waitchan;

	KASSERT((len > 0 && len <= alq->aq_buflen),
	    ("%s: len <= 0 || len > alq->aq_buflen", __func__));

	waitchan = NULL;

	ALQ_LOCK(alq);

	/*
	 * Determine the number of free contiguous bytes.
	 * We ensure elsewhere that if aq_writehead == aq_writetail because
	 * the buffer is empty, they will both be set to 0 and therefore
	 * aq_freebytes == aq_buflen and is fully contiguous.
	 * If they are equal and the buffer is not empty, aq_freebytes will
	 * be 0 indicating the buffer is full.
	 */
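	/*
	 * For example (illustrative numbers): with aq_buflen == 100,
	 * aq_writetail == 30 and aq_writehead == 70, aq_freebytes is 60
	 * but only the 30 bytes at offsets 70-99 are contiguous.
	 */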
	if (alq->aq_writehead <= alq->aq_writetail)
		contigbytes = alq->aq_freebytes;
	else {
		contigbytes = alq->aq_buflen - alq->aq_writehead;

		if (contigbytes < len) {
			/*
			 * Insufficient space at end of buffer to handle a
			 * contiguous write. Wrap early if there's space at
			 * the beginning. This will leave a hole at the end
			 * of the buffer which we will have to skip over when
			 * flushing the buffer to disk.
			 */
			if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
				/* Keep track of # bytes left blank. */
				alq->aq_wrapearly = contigbytes;
				/* Do the wrap and adjust counters. */
				contigbytes = alq->aq_freebytes =
				    alq->aq_writetail;
				alq->aq_writehead = 0;
			}
		}
	}

	/*
	 * Return a NULL ALE if:
	 * - The message is larger than our underlying buffer.
	 * - The ALQ is being shutdown.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the user can't wait for space.
	 * - There is insufficient free space in our underlying buffer
	 *   to accept the message and the alq is inactive due to prior
	 *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
	 */
	if (len > alq->aq_buflen ||
	    alq->aq_flags & AQ_SHUTDOWN ||
	    (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
	    HAS_PENDING_DATA(alq))) && contigbytes < len)) {
		ALQ_UNLOCK(alq);
		return (NULL);
	}

	/*
	 * If we want ordered writes and there is already at least one thread
	 * waiting for resources to become available, sleep until we're woken.
	 */
	if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_waiters++;
		msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
		alq->aq_waiters--;
	}

	/*
	 * Either ALQ_WAITOK was passed and there aren't yet enough
	 * contiguous free bytes, in which case we enter the while loop and
	 * sleep until enough contiguous space becomes available, or there
	 * is already enough and we skip the loop entirely. If AQ_ORDERED
	 * is set, only 1 thread at a time will be in this loop. Otherwise,
	 * multiple threads may be sleeping here competing for ALQ
	 * resources.
	 */
	while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
		KASSERT(!(flags & ALQ_NOWAIT),
		    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
		alq->aq_flags |= AQ_WANTED;
		alq->aq_waiters++;
		if (waitchan)
			wakeup(waitchan);
		msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
		alq->aq_waiters--;

		if (alq->aq_writehead <= alq->aq_writetail)
			contigbytes = alq->aq_freebytes;
		else
			contigbytes = alq->aq_buflen - alq->aq_writehead;

		/*
		 * If we're the first thread to wake after an AQ_WANTED wakeup
		 * but there isn't enough free space for us, we're going to loop
		 * and sleep again. If there are other threads waiting in this
		 * loop, schedule a wakeup so that they can see if the space
		 * they require is available.
		 */
		if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
		    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
			waitchan = alq;
		else
			waitchan = NULL;
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the above
	 * while loop, so we use a different wait channel in this case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	/* Bail if we're shutting down. */
	if (alq->aq_flags & AQ_SHUTDOWN) {
		ALQ_UNLOCK(alq);
		if (waitchan != NULL)
			wakeup_one(waitchan);
		return (NULL);
	}

	/*
	 * If we are here, we have a contiguous number of bytes >= len
	 * available in our buffer starting at aq_writehead.
	 */
	alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
	alq->aq_getpost.ae_bytesused = len;

	return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
	/* Should only be called in fixed length message (legacy) mode. */
	KASSERT((alq->aq_flags & AQ_LEGACY),
	    ("%s: fixed length get on variable length queue", __func__));
	return (alq_getn(alq, alq->aq_entlen, flags));
}
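
/*
 * Illustrative sketch (not compiled in) of the zero-copy get/post pattern.
 * A successful alq_getn() returns with the queue lock still held;
 * alq_post_flags() commits the entry and releases the lock. The length and
 * fill value below are example choices, not part of ALQ.
 */
#if 0
static void
example_alq_getpost(struct alq *q)
{
	struct ale *ale;

	/* Reserve 64 contiguous bytes, sleeping for space if required. */
	if ((ale = alq_getn(q, 64, ALQ_WAITOK)) != NULL) {
		memset(ale->ae_data, 0, 64);	/* Fill the region in place. */
		alq_post_flags(q, ale, 0);	/* Commit and unlock. */
	}
}
#endif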

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
	int activate;
	void *waitchan;

	activate = 0;

	if (ale->ae_bytesused > 0) {
		if (!(alq->aq_flags & AQ_ACTIVE) &&
		    !(flags & ALQ_NOACTIVATE)) {
			alq->aq_flags |= AQ_ACTIVE;
			activate = 1;
		}

		alq->aq_writehead += ale->ae_bytesused;
		alq->aq_freebytes -= ale->ae_bytesused;

		/* Wrap aq_writehead if we filled to the end of the buffer. */
		if (alq->aq_writehead == alq->aq_buflen)
			alq->aq_writehead = 0;

		KASSERT((alq->aq_writehead >= 0 &&
		    alq->aq_writehead < alq->aq_buflen),
		    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
		    __func__));

		KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
	}

	/*
	 * If there are waiters, we need to signal the waiting threads after we
	 * complete our work. The alq ptr is used as a wait channel for threads
	 * requiring resources to be freed up. In the AQ_ORDERED case, threads
	 * are not allowed to concurrently compete for resources in the
	 * alq_getn() while loop, so we use a different wait channel in this
	 * case.
	 */
	if (alq->aq_waiters > 0) {
		if (alq->aq_flags & AQ_ORDERED)
			waitchan = &alq->aq_waiters;
		else
			waitchan = alq;
	} else
		waitchan = NULL;

	ALQ_UNLOCK(alq);

	if (activate) {
		ALD_LOCK();
		ald_activate(alq);
		ALD_UNLOCK();
	}

	/* NB: We rely on wakeup_one waking threads in a FIFO manner. */
	if (waitchan != NULL)
		wakeup_one(waitchan);
}

void
alq_flush(struct alq *alq)
{
	int needwakeup = 0;

	ALD_LOCK();
	ALQ_LOCK(alq);

	/*
	 * Pull the lever iff there is data to flush and we're
	 * not already in the middle of a flush operation.
	 */
	if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
		if (alq->aq_flags & AQ_ACTIVE)
			ald_deactivate(alq);

		ALD_UNLOCK();
		needwakeup = alq_doio(alq);
	} else
		ALD_UNLOCK();

	ALQ_UNLOCK(alq);

	if (needwakeup)
		wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
	/* Only flush and destroy alq if not already shutting down. */
	if (ald_rem(alq) == 0)
		alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
	int ret;

	ret = 0;

	switch (what) {
	case MOD_LOAD:
	case MOD_SHUTDOWN:
		break;

	case MOD_QUIESCE:
		ALD_LOCK();
		/* Only allow unload if there are no open queues. */
		if (LIST_FIRST(&ald_queues) == NULL) {
			ald_shutingdown = 1;
			ALD_UNLOCK();
			EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
			    alq_eventhandler_tag);
			ald_shutdown(NULL, 0);
			mtx_destroy(&ald_mtx);
		} else {
			ALD_UNLOCK();
			ret = EBUSY;
		}
		break;

	case MOD_UNLOAD:
		/* If MOD_QUIESCE failed we must fail here too. */
		if (ald_shutingdown == 0)
			ret = EBUSY;
		break;

	default:
		ret = EINVAL;
		break;
	}

	return (ret);
}

static moduledata_t alq_mod =
{
	"alq",
	alq_load_handler,
	NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);