/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/epoch.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

struct taskqueue_busy {
	struct task		*tb_running;
	u_int			 tb_seq;
	bool			 tb_canceling;
	LIST_ENTRY(taskqueue_busy) tb_link;
};

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;
	LIST_HEAD(, taskqueue_busy) tq_active;
	struct task		*tq_hint;
	u_int			tq_seq;
	int			tq_callouts;
	struct mtx_padalign	tq_mutex;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)

#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

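/*
 * Initialize a timeout task, binding its callout to the queue's mutex so
 * that the callout handler runs with the queue lock held.
 */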
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

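/*
 * Sleep on the given wait channel using the sleep primitive that matches
 * the queue's mutex type (spin vs. default).
 */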
static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
{
	if (tq->tq_spin)
		return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
	return (msleep(p, &tq->tq_mutex, 0, wm, 0));
}

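/*
 * Return the busy record for a task currently being run by this queue,
 * or NULL if the task is not running.
 */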
static struct taskqueue_busy *
task_get_busy(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (tb);
	}
	return (NULL);
}

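/*
 * Allocate and initialize a taskqueue with the given enqueue hook and
 * mutex flags.  Returns NULL if memory could not be allocated.
 */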
static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
	if (tq_name == NULL)
		return (NULL);

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL) {
		free(tq_name, M_TASKQUEUE);
		return (NULL);
	}

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	STAILQ_INIT(&queue->tq_queue);
	LIST_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_DEF, name);
}

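/*
 * Register a callback to be invoked by each taskqueue thread at startup
 * (init) or just before exit (shutdown).
 */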
void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}

/*
 * Signal the taskqueue threads to terminate and wait until every thread
 * has exited and every armed callout has fired.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, "tq_destroy");
	}
}

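/*
 * Free a taskqueue.  Marks the queue inactive, waits for its threads and
 * armed timeout tasks to finish, then releases all resources.
 */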
void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

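/*
 * Enqueue a task with the queue lock held.  The lock is always released
 * before returning.
 */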
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags)
{
	struct task *ins;
	struct task *prev;
	struct taskqueue_busy *tb;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Ignore canceling task if requested.
	 */
	if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) {
		tb = task_get_busy(queue, task);
		if (tb != NULL && tb->tb_canceling) {
			TQ_UNLOCK(queue);
			return (ECANCELED);
		}
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) {
			TQ_UNLOCK(queue);
			return (EEXIST);
		}
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimize the case when all tasks use a small set of priorities.
	 * With only one priority we always insert at the end.
	 * With two priorities tq_hint typically gives the insertion point.
	 * With more than two priorities tq_hint should halve the search.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = queue->tq_hint;
		if (prev && prev->ta_priority >= task->ta_priority) {
			ins = STAILQ_NEXT(prev, ta_link);
		} else {
			prev = NULL;
			ins = STAILQ_FIRST(&queue->tq_queue);
		}
		for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev) {
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
			queue->tq_hint = task;
		} else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task, flags);
	/* The lock is released inside. */

	return (res);
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	return (taskqueue_enqueue_flags(queue, task, 0));
}

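/*
 * Callout handler for timeout tasks: disarm the callout accounting and
 * enqueue the embedded task.  Runs with the queue lock held and relies on
 * taskqueue_enqueue_locked() to release it.
 */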
static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0);
	/* The lock is released inside. */
}

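/*
 * Enqueue a timeout task after the given sbintime delay; a zero delay
 * enqueues the task immediately.  Returns -1 if a drain is in progress,
 * 0 if the task was idle, and a positive value if it was already pending
 * or had an armed callout.
 */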
int
taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
    struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		TQ_UNLOCK(queue);
		res = -1;
	} else if (sbt == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t, 0);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (sbt < 0)
				sbt = -sbt; /* Ignore overflow. */
		}
		if (sbt > 0) {
			if (queue->tq_spin)
				flags |= C_DIRECT_EXEC;
			if (queue->tq_spin && queue->tq_tcount == 1 &&
			    queue->tq_threads[0] == curthread) {
				callout_reset_sbt_curcpu(&timeout_task->c, sbt, pr,
				    taskqueue_timeout_func, timeout_task, flags);
			} else {
				callout_reset_sbt(&timeout_task->c, sbt, pr,
				    taskqueue_timeout_func, timeout_task, flags);
			}
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *ttask, int ticks)
{

	return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
	    0, C_HARDCLOCK));
}

static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static int
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return (0);

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we cannot use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway), so just insert it at the tail while we have the
	 * queue lock.
	 */
	TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	queue->tq_hint = &t_barrier;
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
	return (1);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static int
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy *tb;
	u_int seq;

	if (LIST_EMPTY(&queue->tq_active))
		return (0);

	/* Block taskqueue_terminate(). */
	queue->tq_callouts++;

	/* Wait for any active task with sequence from the past. */
	seq = queue->tq_seq;
restart:
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if ((int)(tb->tb_seq - seq) <= 0) {
			TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
			goto restart;
		}
	}

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
	return (1);
}

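/*
 * Block the queue's enqueue hook: tasks queued while blocked are still
 * appended to the queue, but tq_enqueue() is not called, so they are not
 * dispatched until the queue is unblocked.
 */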
void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

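/*
 * Unblock the queue and call the enqueue hook if any tasks are pending.
 */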
void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

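/*
 * Run all queued tasks.  Called with the queue lock held; the lock is
 * dropped around each task's handler and reacquired afterwards.  Tasks
 * marked as network tasks are run inside the net epoch.
 */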
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct epoch_tracker et;
	struct taskqueue_busy tb;
	struct task *task;
	bool in_net_epoch;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;
	LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
	in_net_epoch = false;

	while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		tb.tb_seq = ++queue->tq_seq;
		tb.tb_canceling = false;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		if (!in_net_epoch && TASK_IS_NET(task)) {
			in_net_epoch = true;
			NET_EPOCH_ENTER(et);
		} else if (in_net_epoch && !TASK_IS_NET(task)) {
			NET_EPOCH_EXIT(et);
			in_net_epoch = false;
		}
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		wakeup(task);
	}
	if (in_net_epoch)
		NET_EPOCH_EXIT(et);
	LIST_REMOVE(&tb, tb_link);
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

/*
 * Only use this function in single-threaded contexts.  It returns
 * non-zero if the given task is either pending or running.  Otherwise
 * the task is idle and can be queued again or freed.
 */
int
taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
{
	int retval;

	TQ_LOCK(queue);
	retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL;
	TQ_UNLOCK(queue);

	return (retval);
}

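/*
 * Remove a pending task from the queue and report whether it is still
 * running.  Returns EBUSY (and marks the busy record as canceling) if
 * the task is currently executing.
 */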
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{
	struct taskqueue_busy *tb;
	int retval = 0;

	if (task->ta_pending > 0) {
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
	}
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	tb = task_get_busy(queue, task);
	if (tb != NULL) {
		tb->tb_canceling = true;
		retval = EBUSY;
	}

	return (retval);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

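/*
 * Cancel a timeout task: stop its callout and remove the embedded task
 * from the queue.  Returns EBUSY if the task is currently running.
 */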
int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

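/*
 * Sleep until the given task is neither pending nor running.
 */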
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL)
		TQ_SLEEP(queue, task, "tq_drain");
	TQ_UNLOCK(queue);
}

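/*
 * Wait for all tasks that were queued or running at the time of the call
 * to complete.  Tasks queued afterwards are not waited for.
 */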
void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	(void)taskqueue_drain_tq_queue(queue);
	(void)taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}

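/*
 * Loop draining the queue and the active list until the taskqueue is
 * completely idle, i.e. nothing is queued and nothing is running.
 */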
void
taskqueue_quiesce(struct taskqueue *queue)
{
	int ret;

	TQ_LOCK(queue);
	do {
		ret = taskqueue_drain_tq_queue(queue);
		if (ret == 0)
			ret = taskqueue_drain_tq_active(queue);
	} while (ret != 0);
	TQ_UNLOCK(queue);
}

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

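/*
 * Create the service threads for a taskqueue: spawn "count" kthreads,
 * optionally pin them to the given cpuset, set their priority, and make
 * them runnable.
 */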
static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, struct proc *p, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, p,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, p,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d\n", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;		/* paranoid */
		} else
			tq->tq_tcount++;
	}
	if (tq->tq_tcount == 0) {
		free(tq->tq_threads, M_TASKQUEUE);
		tq->tq_threads = NULL;
		return (ENOMEM);
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
	}

	return (0);
}

int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap);
	va_end(ap);
	return (error);
}

int
taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri,
    struct proc *proc, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap);
	va_end(ap);
	return (error);
}

int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap);
	va_end(ap);
	return (error);
}

static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

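/*
 * Main loop of a taskqueue service thread: run tasks while the queue is
 * active, sleep when idle, and on shutdown run the remaining tasks, call
 * the shutdown callback, and exit.
 */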
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run_locked() can drop tq_mutex, we need
		 * to check whether the TQ_FLAGS_ACTIVE flag was cleared in
		 * the meantime, which would mean we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, "-");
	}
	taskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

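/*
 * Enqueue hook for thread-backed taskqueues: wake one of the queue's
 * service threads.
 */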
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_any(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
			MTX_SPIN, "fast_taskqueue");
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
	swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

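/*
 * Return non-zero if the given thread is one of this taskqueue's
 * service threads.
 */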
int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}