11556Srgrimes/*
21556Srgrimes * Licensed to the Apache Software Foundation (ASF) under one or more
31556Srgrimes * contributor license agreements.  See the NOTICE file distributed
41556Srgrimes * with this work for additional information regarding copyright
51556Srgrimes * ownership.  The ASF licenses this file to you under the Apache
61556Srgrimes * License, Version 2.0 (the "License"); you may not use this file
71556Srgrimes * except in compliance with the License.  You may obtain a copy of
81556Srgrimes * the License at
91556Srgrimes *
101556Srgrimes *     http://www.apache.org/licenses/LICENSE-2.0
111556Srgrimes *
121556Srgrimes * Unless required by applicable law or agreed to in writing, software
131556Srgrimes * distributed under the License is distributed on an "AS IS" BASIS,
141556Srgrimes * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
151556Srgrimes * implied.  See the License for the specific language governing
161556Srgrimes * permissions and limitations under the License.
171556Srgrimes */
181556Srgrimes
191556Srgrimes#include <assert.h>
201556Srgrimes#include "apr_thread_pool.h"
211556Srgrimes#include "apr_ring.h"
221556Srgrimes#include "apr_thread_cond.h"
231556Srgrimes#include "apr_portable.h"
241556Srgrimes
251556Srgrimes#if APR_HAS_THREADS
261556Srgrimes
271556Srgrimes#define TASK_PRIORITY_SEGS 4
281556Srgrimes#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
291556Srgrimes
301556Srgrimestypedef struct apr_thread_pool_task
311556Srgrimes{
321556Srgrimes    APR_RING_ENTRY(apr_thread_pool_task) link;
331556Srgrimes    apr_thread_start_t func;
341556Srgrimes    void *param;
351556Srgrimes    void *owner;
361556Srgrimes    union
371556Srgrimes    {
3827958Ssteve        apr_byte_t priority;
391556Srgrimes        apr_time_t time;
401556Srgrimes    } dispatch;
4127967Ssteve} apr_thread_pool_task_t;
4227967Ssteve
4327967SsteveAPR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
4427967Ssteve
4527967Sstevestruct apr_thread_list_elt
4627967Ssteve{
4727958Ssteve    APR_RING_ENTRY(apr_thread_list_elt) link;
4850471Speter    apr_thread_t *thd;
4927967Ssteve    volatile void *current_owner;
501556Srgrimes    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
511556Srgrimes};
521556Srgrimes
531556SrgrimesAPR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
541556Srgrimes
551556Srgrimesstruct apr_thread_pool
561556Srgrimes{
571556Srgrimes    apr_pool_t *pool;
581556Srgrimes    volatile apr_size_t thd_max;
591556Srgrimes    volatile apr_size_t idle_max;
6050050Ssheldonh    volatile apr_interval_time_t idle_wait;
6150050Ssheldonh    volatile apr_size_t thd_cnt;
621556Srgrimes    volatile apr_size_t idle_cnt;
631556Srgrimes    volatile apr_size_t task_cnt;
641556Srgrimes    volatile apr_size_t scheduled_task_cnt;
6561289Sache    volatile apr_size_t threshold;
6661268Sjoe    volatile apr_size_t tasks_run;
6761289Sache    volatile apr_size_t tasks_high;
6861289Sache    volatile apr_size_t thd_high;
6961268Sjoe    volatile apr_size_t thd_timed_out;
701556Srgrimes    struct apr_thread_pool_tasks *tasks;
711556Srgrimes    struct apr_thread_pool_tasks *scheduled_tasks;
721556Srgrimes    struct apr_thread_list *busy_thds;
7386922Sgreen    struct apr_thread_list *idle_thds;
741556Srgrimes    apr_thread_mutex_t *lock;
7550050Ssheldonh    apr_thread_cond_t *cond;
7650050Ssheldonh    volatile int terminated;
7750050Ssheldonh    struct apr_thread_pool_tasks *recycled_tasks;
7850050Ssheldonh    struct apr_thread_list *recycled_thds;
7950050Ssheldonh    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
8050051Ssheldonh};
8150050Ssheldonh
821556Srgrimesstatic apr_status_t thread_pool_construct(apr_thread_pool_t * me,
8337932Shoek                                          apr_size_t init_threads,
841556Srgrimes                                          apr_size_t max_threads)
851556Srgrimes{
861556Srgrimes    apr_status_t rv;
871556Srgrimes    int i;
881556Srgrimes
891556Srgrimes    me->thd_max = max_threads;
901556Srgrimes    me->idle_max = init_threads;
911556Srgrimes    me->threshold = init_threads / 2;
921556Srgrimes    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
931556Srgrimes                                 me->pool);
941556Srgrimes    if (APR_SUCCESS != rv) {
951556Srgrimes        return rv;
961556Srgrimes    }
9788591Sjoe    rv = apr_thread_cond_create(&me->cond, me->pool);
981556Srgrimes    if (APR_SUCCESS != rv) {
992889Spst        apr_thread_mutex_destroy(me->lock);
1001556Srgrimes        return rv;
1011556Srgrimes    }
1021556Srgrimes    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
1031556Srgrimes    if (!me->tasks) {
1041556Srgrimes        goto CATCH_ENOMEM;
10550050Ssheldonh    }
10649373Ssheldonh    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
10735373Sdes    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
10835417Sdes    if (!me->scheduled_tasks) {
1091556Srgrimes        goto CATCH_ENOMEM;
1101556Srgrimes    }
1111556Srgrimes    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
1121556Srgrimes    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
1131556Srgrimes    if (!me->recycled_tasks) {
1141556Srgrimes        goto CATCH_ENOMEM;
1151556Srgrimes    }
1161556Srgrimes    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
11720417Ssteve    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
11886922Sgreen    if (!me->busy_thds) {
11961268Sjoe        goto CATCH_ENOMEM;
12061178Sjoe    }
12161271Sjoe    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
12261271Sjoe    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
12361271Sjoe    if (!me->idle_thds) {
12461271Sjoe        goto CATCH_ENOMEM;
12588583Sjoe    }
12688583Sjoe    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
12761268Sjoe    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
1281556Srgrimes    if (!me->recycled_thds) {
12917852Sadam        goto CATCH_ENOMEM;
13017852Sadam    }
1311556Srgrimes    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
1321556Srgrimes    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
1331556Srgrimes    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
1341556Srgrimes    me->idle_wait = 0;
1351556Srgrimes    me->terminated = 0;
13688602Sjoe    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
1371556Srgrimes        me->task_idx[i] = NULL;
1381556Srgrimes    }
1391556Srgrimes    goto FINAL_EXIT;
14061271Sjoe  CATCH_ENOMEM:
14188602Sjoe    rv = APR_ENOMEM;
14288602Sjoe    apr_thread_mutex_destroy(me->lock);
14361271Sjoe    apr_thread_cond_destroy(me->cond);
14461271Sjoe  FINAL_EXIT:
14561271Sjoe    return rv;
14688602Sjoe}
14711808Sache
1481556Srgrimes/*
1491556Srgrimes * NOTE: This function is not thread safe by itself. Caller should hold the lock
1501556Srgrimes */
1511556Srgrimesstatic apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
1521556Srgrimes{
1531556Srgrimes    apr_thread_pool_task_t *task = NULL;
15488602Sjoe    int seg;
1551556Srgrimes
1561556Srgrimes    /* check for scheduled tasks */
1575158Sjoerg    if (me->scheduled_task_cnt > 0) {
1581556Srgrimes        task = APR_RING_FIRST(me->scheduled_tasks);
1595158Sjoerg        assert(task != NULL);
1607165Sjoerg        assert(task !=
1615158Sjoerg               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
1625158Sjoerg                                 link));
1631556Srgrimes        /* if it's time */
1641556Srgrimes        if (task->dispatch.time <= apr_time_now()) {
1651556Srgrimes            --me->scheduled_task_cnt;
1661556Srgrimes            APR_RING_REMOVE(task, link);
1671556Srgrimes            return task;
1681556Srgrimes        }
16988603Sjoe    }
1701556Srgrimes    /* check for normal tasks if we're not returning a scheduled task */
1711556Srgrimes    if (me->task_cnt == 0) {
1721556Srgrimes        return NULL;
1731556Srgrimes    }
1741556Srgrimes
1751556Srgrimes    task = APR_RING_FIRST(me->tasks);
1761556Srgrimes    assert(task != NULL);
1771556Srgrimes    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
1781556Srgrimes    --me->task_cnt;
17935417Sdes    seg = TASK_PRIORITY_SEG(task);
18035417Sdes    if (task == me->task_idx[seg]) {
18135417Sdes        me->task_idx[seg] = APR_RING_NEXT(task, link);
18288602Sjoe        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
18335417Sdes                                                   apr_thread_pool_task, link)
1841556Srgrimes            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
1851556Srgrimes            me->task_idx[seg] = NULL;
1861556Srgrimes        }
1871556Srgrimes    }
1881556Srgrimes    APR_RING_REMOVE(task, link);
1891556Srgrimes    return task;
1901556Srgrimes}
1911556Srgrimes
1921556Srgrimesstatic apr_interval_time_t waiting_time(apr_thread_pool_t * me)
1931556Srgrimes{
1941556Srgrimes    apr_thread_pool_task_t *task = NULL;
1951556Srgrimes
1961556Srgrimes    task = APR_RING_FIRST(me->scheduled_tasks);
1971556Srgrimes    assert(task != NULL);
1981556Srgrimes    assert(task !=
1991556Srgrimes           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
2001556Srgrimes                             link));
2011556Srgrimes    return task->dispatch.time - apr_time_now();
2021556Srgrimes}
2031556Srgrimes
20435426Sdes/*
20588602Sjoe * NOTE: This function is not thread safe by itself. Caller should hold the lock
20635426Sdes */
20761178Sjoestatic struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
20864568Sjoe                                           apr_thread_t * t)
20961178Sjoe{
2101556Srgrimes    struct apr_thread_list_elt *elt;
2111556Srgrimes
2121556Srgrimes    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
2131556Srgrimes        elt = apr_pcalloc(me->pool, sizeof(*elt));
21435426Sdes        if (NULL == elt) {
21588602Sjoe            return NULL;
21635426Sdes        }
21735426Sdes    }
21835426Sdes    else {
2191556Srgrimes        elt = APR_RING_FIRST(me->recycled_thds);
2201556Srgrimes        APR_RING_REMOVE(elt, link);
2211556Srgrimes    }
2221556Srgrimes
2231556Srgrimes    APR_RING_ELEM_INIT(elt, link);
2241556Srgrimes    elt->thd = t;
2251556Srgrimes    elt->current_owner = NULL;
2261556Srgrimes    elt->state = TH_RUN;
2271556Srgrimes    return elt;
2281556Srgrimes}
2291556Srgrimes
2301556Srgrimes/*
2311556Srgrimes * The worker thread function. Take a task from the queue and perform it if
2321556Srgrimes * there is any. Otherwise, put itself into the idle thread list and waiting
2331556Srgrimes * for signal to wake up.
2341556Srgrimes * The thread terminate directly by detach and exit when it is asked to stop
2351556Srgrimes * after finishing a task. Otherwise, the thread should be in idle thread list
23688602Sjoe * and should be joined.
2371556Srgrimes */
23888591Sjoestatic void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
23988591Sjoe{
24088591Sjoe    apr_thread_pool_t *me = param;
2411556Srgrimes    apr_thread_pool_task_t *task = NULL;
2421556Srgrimes    apr_interval_time_t wait;
2431556Srgrimes    struct apr_thread_list_elt *elt;
2442889Spst
2452889Spst    apr_thread_mutex_lock(me->lock);
2462889Spst    elt = elt_new(me, t);
24749373Ssheldonh    if (!elt) {
24849373Ssheldonh        apr_thread_mutex_unlock(me->lock);
24949373Ssheldonh        apr_thread_exit(t, APR_ENOMEM);
2501556Srgrimes    }
2511556Srgrimes
2521556Srgrimes    while (!me->terminated && elt->state != TH_STOP) {
2531556Srgrimes        /* Test if not new element, it is awakened from idle */
2541556Srgrimes        if (APR_RING_NEXT(elt, link) != elt) {
25535373Sdes            --me->idle_cnt;
25688602Sjoe            APR_RING_REMOVE(elt, link);
2571556Srgrimes        }
2581556Srgrimes
2591556Srgrimes        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
2601556Srgrimes        task = pop_task(me);
2611556Srgrimes        while (NULL != task && !me->terminated) {
2621556Srgrimes            ++me->tasks_run;
2631556Srgrimes            elt->current_owner = task->owner;
2641556Srgrimes            apr_thread_mutex_unlock(me->lock);
2651556Srgrimes            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
2661556Srgrimes            task->func(t, task->param);
2671556Srgrimes            apr_thread_mutex_lock(me->lock);
2681556Srgrimes            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
2691556Srgrimes                                 apr_thread_pool_task, link);
27020417Ssteve            elt->current_owner = NULL;
27120417Ssteve            if (TH_STOP == elt->state) {
27220417Ssteve                break;
27335373Sdes            }
27435373Sdes            task = pop_task(me);
27588602Sjoe        }
27635417Sdes        assert(NULL == elt->current_owner);
27735373Sdes        if (TH_STOP != elt->state)
27888603Sjoe            APR_RING_REMOVE(elt, link);
27988603Sjoe
28088603Sjoe        /* Test if a busy thread been asked to stop, which is not joinable */
28188603Sjoe        if ((me->idle_cnt >= me->idle_max
28288603Sjoe             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
28386922Sgreen             && !me->idle_wait)
28486922Sgreen            || me->terminated || elt->state != TH_RUN) {
28586922Sgreen            --me->thd_cnt;
2861556Srgrimes            if ((TH_PROBATION == elt->state) && me->idle_wait)
2871556Srgrimes                ++me->thd_timed_out;
2881556Srgrimes            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
2891556Srgrimes                                 apr_thread_list_elt, link);
2901556Srgrimes            apr_thread_mutex_unlock(me->lock);
2911556Srgrimes            apr_thread_detach(t);
2921556Srgrimes            apr_thread_exit(t, APR_SUCCESS);
2931556Srgrimes            return NULL;        /* should not be here, safe net */
29464568Sjoe        }
29564568Sjoe
29664568Sjoe        /* busy thread become idle */
29764604Sjoe        ++me->idle_cnt;
29864568Sjoe        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
29964568Sjoe
30064568Sjoe        /*
30188583Sjoe         * If there is a scheduled task, always scheduled to perform that task.
30288583Sjoe         * Since there is no guarantee that current idle threads are scheduled
30364568Sjoe         * for next scheduled task.
30464568Sjoe         */
30564568Sjoe        if (me->scheduled_task_cnt)
30664568Sjoe            wait = waiting_time(me);
30764568Sjoe        else if (me->idle_cnt > me->idle_max) {
30864568Sjoe            wait = me->idle_wait;
30964568Sjoe            elt->state = TH_PROBATION;
31064568Sjoe        }
31164568Sjoe        else
31264568Sjoe            wait = -1;
31364604Sjoe
31464604Sjoe        if (wait >= 0) {
31564604Sjoe            apr_thread_cond_timedwait(me->cond, me->lock, wait);
31664568Sjoe        }
31764604Sjoe        else {
31861289Sache            apr_thread_cond_wait(me->cond, me->lock);
31961337Sache        }
32061337Sache    }
32161337Sache
32261337Sache    /* idle thread been asked to stop, will be joined */
32361337Sache    --me->thd_cnt;
32461337Sache    apr_thread_mutex_unlock(me->lock);
32588602Sjoe    apr_thread_exit(t, APR_SUCCESS);
32688602Sjoe    return NULL;                /* should not be here, safe net */
32761178Sjoe}
32861289Sache
32961268Sjoestatic apr_status_t thread_pool_cleanup(void *me)
33061178Sjoe{
3311556Srgrimes    apr_thread_pool_t *_myself = me;
3321556Srgrimes
33361178Sjoe    _myself->terminated = 1;
33461178Sjoe    apr_thread_pool_idle_max_set(_myself, 0);
3351556Srgrimes    while (_myself->thd_cnt) {
33661178Sjoe        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
33761268Sjoe    }
33861268Sjoe    apr_thread_mutex_destroy(_myself->lock);
33961268Sjoe    apr_thread_cond_destroy(_myself->cond);
34088602Sjoe    return APR_SUCCESS;
3411556Srgrimes}
3421556Srgrimes
3431556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
3441556Srgrimes                                                 apr_size_t init_threads,
3451556Srgrimes                                                 apr_size_t max_threads,
3461556Srgrimes                                                 apr_pool_t * pool)
3471556Srgrimes{
3481556Srgrimes    apr_thread_t *t;
3491556Srgrimes    apr_status_t rv = APR_SUCCESS;
35020417Ssteve    apr_thread_pool_t *tp;
35120417Ssteve
35220417Ssteve    *me = NULL;
35320417Ssteve    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));
35420417Ssteve
35520417Ssteve    /*
35620417Ssteve     * This pool will be used by different threads. As we cannot ensure that
35720417Ssteve     * our caller won't use the pool without acquiring the mutex, we must
3581556Srgrimes     * create a new sub pool.
3591556Srgrimes     */
3602889Spst    rv = apr_pool_create(&tp->pool, pool);
3617282Sphk    if (APR_SUCCESS != rv)
3627282Sphk        return rv;
3637282Sphk    rv = thread_pool_construct(tp, init_threads, max_threads);
3647282Sphk    if (APR_SUCCESS != rv)
3657282Sphk        return rv;
3661556Srgrimes    apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);
3671556Srgrimes
3681556Srgrimes    while (init_threads) {
3691556Srgrimes        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
3701556Srgrimes         * allocate from (*me)->pool. This is dangerous if there are multiple
3711556Srgrimes         * initial threads to create.
3721556Srgrimes         */
3731556Srgrimes        apr_thread_mutex_lock(tp->lock);
3741556Srgrimes        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
37588602Sjoe        apr_thread_mutex_unlock(tp->lock);
3761556Srgrimes        if (APR_SUCCESS != rv) {
3771556Srgrimes            break;
3781556Srgrimes        }
3791556Srgrimes        tp->thd_cnt++;
3801556Srgrimes        if (tp->thd_cnt > tp->thd_high) {
3811556Srgrimes            tp->thd_high = tp->thd_cnt;
3821556Srgrimes        }
3831556Srgrimes        --init_threads;
38488602Sjoe    }
3851556Srgrimes
3861556Srgrimes    if (rv == APR_SUCCESS) {
3871556Srgrimes        *me = tp;
3881556Srgrimes    }
3891556Srgrimes
3901556Srgrimes    return rv;
3911556Srgrimes}
3921556Srgrimes
3931556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
3941556Srgrimes{
3951556Srgrimes    apr_pool_destroy(me->pool);
3961556Srgrimes    return APR_SUCCESS;
3971556Srgrimes}
3981556Srgrimes
3991556Srgrimes/*
40017852Sadam * NOTE: This function is not thread safe by itself. Caller should hold the lock
4011556Srgrimes */
4021556Srgrimesstatic apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
40388602Sjoe                                        apr_thread_start_t func,
4041556Srgrimes                                        void *param, apr_byte_t priority,
4051556Srgrimes                                        void *owner, apr_time_t time)
4061556Srgrimes{
4071556Srgrimes    apr_thread_pool_task_t *t;
4081556Srgrimes
4091556Srgrimes    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
4101556Srgrimes        t = apr_pcalloc(me->pool, sizeof(*t));
4111556Srgrimes        if (NULL == t) {
4121556Srgrimes            return NULL;
4131556Srgrimes        }
4141556Srgrimes    }
4151556Srgrimes    else {
4161556Srgrimes        t = APR_RING_FIRST(me->recycled_tasks);
4171556Srgrimes        APR_RING_REMOVE(t, link);
4181556Srgrimes    }
4191556Srgrimes
4201556Srgrimes    APR_RING_ELEM_INIT(t, link);
4211556Srgrimes    t->func = func;
4221556Srgrimes    t->param = param;
4231556Srgrimes    t->owner = owner;
4241556Srgrimes    if (time > 0) {
4251556Srgrimes        t->dispatch.time = apr_time_now() + time;
4261556Srgrimes    }
4271556Srgrimes    else {
4281556Srgrimes        t->dispatch.priority = priority;
4291556Srgrimes    }
4301556Srgrimes    return t;
4311556Srgrimes}
4321556Srgrimes
4331556Srgrimes/*
4341556Srgrimes * Test it the task is the only one within the priority segment.
4351556Srgrimes * If it is not, return the first element with same or lower priority.
4361556Srgrimes * Otherwise, add the task into the queue and return NULL.
4371556Srgrimes *
4381556Srgrimes * NOTE: This function is not thread safe by itself. Caller should hold the lock
4391556Srgrimes */
4401556Srgrimesstatic apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
4411556Srgrimes                                            apr_thread_pool_task_t * const t)
44217852Sadam{
4431556Srgrimes    int seg;
4441556Srgrimes    int next;
4451556Srgrimes    apr_thread_pool_task_t *t_next;
4461556Srgrimes
4471556Srgrimes    seg = TASK_PRIORITY_SEG(t);
4481556Srgrimes    if (me->task_idx[seg]) {
4491556Srgrimes        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
4501556Srgrimes               me->task_idx[seg]);
4511556Srgrimes        t_next = me->task_idx[seg];
4521556Srgrimes        while (t_next->dispatch.priority > t->dispatch.priority) {
4531556Srgrimes            t_next = APR_RING_NEXT(t_next, link);
4541556Srgrimes            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
4551556Srgrimes                t_next) {
4561556Srgrimes                return t_next;
4571556Srgrimes            }
4581556Srgrimes        }
4591556Srgrimes        return t_next;
4601556Srgrimes    }
4611556Srgrimes
4621556Srgrimes    for (next = seg - 1; next >= 0; next--) {
4631556Srgrimes        if (me->task_idx[next]) {
4641556Srgrimes            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
4651556Srgrimes            break;
4661556Srgrimes        }
4671556Srgrimes    }
4681556Srgrimes    if (0 > next) {
4691556Srgrimes        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
4701556Srgrimes    }
4711556Srgrimes    me->task_idx[seg] = t;
4721556Srgrimes    return NULL;
4731556Srgrimes}
4741556Srgrimes
4751556Srgrimes/*
4761556Srgrimes*   schedule a task to run in "time" microseconds. Find the spot in the ring where
4771556Srgrimes*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
4781556Srgrimes*/
4791556Srgrimesstatic apr_status_t schedule_task(apr_thread_pool_t *me,
4801556Srgrimes                                  apr_thread_start_t func, void *param,
4811556Srgrimes                                  void *owner, apr_interval_time_t time)
4821556Srgrimes{
4831556Srgrimes    apr_thread_pool_task_t *t;
4841556Srgrimes    apr_thread_pool_task_t *t_loc;
48586922Sgreen    apr_thread_t *thd;
48686922Sgreen    apr_status_t rv = APR_SUCCESS;
48737932Shoek    apr_thread_mutex_lock(me->lock);
4881556Srgrimes
48986922Sgreen    t = task_new(me, func, param, 0, owner, time);
49050050Ssheldonh    if (NULL == t) {
49150050Ssheldonh        apr_thread_mutex_unlock(me->lock);
49250050Ssheldonh        return APR_ENOMEM;
4931556Srgrimes    }
4941556Srgrimes    t_loc = APR_RING_FIRST(me->scheduled_tasks);
4951556Srgrimes    while (NULL != t_loc) {
4961556Srgrimes        /* if the time is less than the entry insert ahead of it */
4971556Srgrimes        if (t->dispatch.time < t_loc->dispatch.time) {
4981556Srgrimes            ++me->scheduled_task_cnt;
49946684Skris            APR_RING_INSERT_BEFORE(t_loc, t, link);
5001556Srgrimes            break;
5011556Srgrimes        }
5021556Srgrimes        else {
5031556Srgrimes            t_loc = APR_RING_NEXT(t_loc, link);
5041556Srgrimes            if (t_loc ==
5051556Srgrimes                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
50637932Shoek                                  link)) {
50737932Shoek                ++me->scheduled_task_cnt;
50837932Shoek                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
50937932Shoek                                     apr_thread_pool_task, link);
51037932Shoek                break;
51137932Shoek            }
51237932Shoek        }
51337932Shoek    }
51437932Shoek    /* there should be at least one thread for scheduled tasks */
51537932Shoek    if (0 == me->thd_cnt) {
51637932Shoek        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
51737932Shoek        if (APR_SUCCESS == rv) {
51837932Shoek            ++me->thd_cnt;
51937932Shoek            if (me->thd_cnt > me->thd_high)
52037932Shoek                me->thd_high = me->thd_cnt;
52137932Shoek        }
52237932Shoek    }
52337932Shoek    apr_thread_cond_signal(me->cond);
52437932Shoek    apr_thread_mutex_unlock(me->lock);
52537932Shoek    return rv;
52637932Shoek}
52737932Shoek
52837932Shoekstatic apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
52937932Shoek                             void *param, apr_byte_t priority, int push,
53037932Shoek                             void *owner)
53188602Sjoe{
53288602Sjoe    apr_thread_pool_task_t *t;
53337932Shoek    apr_thread_pool_task_t *t_loc;
53437932Shoek    apr_thread_t *thd;
53586922Sgreen    apr_status_t rv = APR_SUCCESS;
53637932Shoek
53786922Sgreen    apr_thread_mutex_lock(me->lock);
53837932Shoek
53937932Shoek    t = task_new(me, func, param, priority, owner, 0);
54088602Sjoe    if (NULL == t) {
54188602Sjoe        apr_thread_mutex_unlock(me->lock);
54288602Sjoe        return APR_ENOMEM;
54388602Sjoe    }
54488602Sjoe
54588602Sjoe    t_loc = add_if_empty(me, t);
54688602Sjoe    if (NULL == t_loc) {
54788602Sjoe        goto FINAL_EXIT;
54888602Sjoe    }
54988602Sjoe
55088602Sjoe    if (push) {
55188602Sjoe        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
55288602Sjoe               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
55388602Sjoe            t_loc = APR_RING_NEXT(t_loc, link);
55488602Sjoe        }
55588602Sjoe    }
55688602Sjoe    APR_RING_INSERT_BEFORE(t_loc, t, link);
55788602Sjoe    if (!push) {
55861338Sache        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
55988602Sjoe            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
56061337Sache        }
56188602Sjoe    }
56237932Shoek
56337932Shoek  FINAL_EXIT:
56437932Shoek    me->task_cnt++;
56537932Shoek    if (me->task_cnt > me->tasks_high)
56637932Shoek        me->tasks_high = me->task_cnt;
56738026Shoek    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
56886922Sgreen                             me->task_cnt > me->threshold)) {
56938026Shoek        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
57086922Sgreen        if (APR_SUCCESS == rv) {
57186922Sgreen            ++me->thd_cnt;
5721556Srgrimes            if (me->thd_cnt > me->thd_high)
5737165Sjoerg                me->thd_high = me->thd_cnt;
5741556Srgrimes        }
5751556Srgrimes    }
5761556Srgrimes
5771556Srgrimes    apr_thread_cond_signal(me->cond);
5781556Srgrimes    apr_thread_mutex_unlock(me->lock);
57917852Sadam
5801556Srgrimes    return rv;
5811556Srgrimes}
5821556Srgrimes
5831556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
5841556Srgrimes                                               apr_thread_start_t func,
5851556Srgrimes                                               void *param,
5861556Srgrimes                                               apr_byte_t priority,
5871556Srgrimes                                               void *owner)
5881556Srgrimes{
5891556Srgrimes    return add_task(me, func, param, priority, 1, owner);
5901556Srgrimes}
5911556Srgrimes
5921556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
5931556Srgrimes                                                   apr_thread_start_t func,
5941556Srgrimes                                                   void *param,
5951556Srgrimes                                                   apr_interval_time_t time,
5961556Srgrimes                                                   void *owner)
5971556Srgrimes{
5981556Srgrimes    return schedule_task(me, func, param, owner, time);
5991556Srgrimes}
6001556Srgrimes
60135417SdesAPU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
60288602Sjoe                                              apr_thread_start_t func,
60388602Sjoe                                              void *param,
60488602Sjoe                                              apr_byte_t priority,
60588602Sjoe                                              void *owner)
60637932Shoek{
6071556Srgrimes    return add_task(me, func, param, priority, 0, owner);
6081556Srgrimes}
6091556Srgrimes
6101556Srgrimesstatic apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
6111556Srgrimes                                           void *owner)
6121556Srgrimes{
6131556Srgrimes    apr_thread_pool_task_t *t_loc;
6141556Srgrimes    apr_thread_pool_task_t *next;
6151556Srgrimes
6161556Srgrimes    t_loc = APR_RING_FIRST(me->scheduled_tasks);
6171556Srgrimes    while (t_loc !=
6181556Srgrimes           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
6191556Srgrimes                             link)) {
62049373Ssheldonh        next = APR_RING_NEXT(t_loc, link);
62149373Ssheldonh        /* if this is the owner remove it */
62249373Ssheldonh        if (t_loc->owner == owner) {
62349373Ssheldonh            --me->scheduled_task_cnt;
62449373Ssheldonh            APR_RING_REMOVE(t_loc, link);
62549373Ssheldonh        }
62649373Ssheldonh        t_loc = next;
62749373Ssheldonh    }
62849373Ssheldonh    return APR_SUCCESS;
62949373Ssheldonh}
63049373Ssheldonh
6311556Srgrimesstatic apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
6321556Srgrimes{
6331556Srgrimes    apr_thread_pool_task_t *t_loc;
6341556Srgrimes    apr_thread_pool_task_t *next;
6351556Srgrimes    int seg;
63661749Sjoe
63761749Sjoe    t_loc = APR_RING_FIRST(me->tasks);
63861749Sjoe    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
63961749Sjoe        next = APR_RING_NEXT(t_loc, link);
64061749Sjoe        if (t_loc->owner == owner) {
64161749Sjoe            --me->task_cnt;
64261749Sjoe            seg = TASK_PRIORITY_SEG(t_loc);
6431556Srgrimes            if (t_loc == me->task_idx[seg]) {
6441556Srgrimes                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
6451556Srgrimes                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
6461556Srgrimes                                                           apr_thread_pool_task,
64786922Sgreen                                                           link)
64886922Sgreen                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
64986922Sgreen                    me->task_idx[seg] = NULL;
65086922Sgreen                }
65186922Sgreen            }
65286922Sgreen            APR_RING_REMOVE(t_loc, link);
65386922Sgreen        }
6541556Srgrimes        t_loc = next;
65586922Sgreen    }
65686922Sgreen    return APR_SUCCESS;
6571556Srgrimes}
6581556Srgrimes
6591556Srgrimesstatic void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
6601556Srgrimes{
6611556Srgrimes#ifndef NDEBUG
6621556Srgrimes    apr_os_thread_t *os_thread;
6631556Srgrimes#endif
6641556Srgrimes    struct apr_thread_list_elt *elt;
6651556Srgrimes    apr_thread_mutex_lock(me->lock);
6661556Srgrimes    elt = APR_RING_FIRST(me->busy_thds);
6671556Srgrimes    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
6681556Srgrimes        if (elt->current_owner != owner) {
6691556Srgrimes            elt = APR_RING_NEXT(elt, link);
67088602Sjoe            continue;
67161749Sjoe        }
6721556Srgrimes#ifndef NDEBUG
67386922Sgreen        /* make sure the thread is not the one calling tasks_cancel */
67486922Sgreen        apr_os_thread_get(&os_thread, elt->thd);
67586922Sgreen#ifdef WIN32
67686922Sgreen        /* hack for apr win32 bug */
67786922Sgreen        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
67886922Sgreen#else
6791556Srgrimes        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
6801556Srgrimes#endif
6811556Srgrimes#endif
6821556Srgrimes        while (elt->current_owner == owner) {
6831556Srgrimes            apr_thread_mutex_unlock(me->lock);
6841556Srgrimes            apr_sleep(200 * 1000);
6851556Srgrimes            apr_thread_mutex_lock(me->lock);
6861556Srgrimes        }
6871556Srgrimes        elt = APR_RING_FIRST(me->busy_thds);
6881556Srgrimes    }
6891556Srgrimes    apr_thread_mutex_unlock(me->lock);
6901556Srgrimes    return;
6911556Srgrimes}
6921556Srgrimes
6931556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
6941556Srgrimes                                                       void *owner)
6951556Srgrimes{
6961556Srgrimes    apr_status_t rv = APR_SUCCESS;
69786922Sgreen
6981556Srgrimes    apr_thread_mutex_lock(me->lock);
6991556Srgrimes    if (me->task_cnt > 0) {
7001556Srgrimes        rv = remove_tasks(me, owner);
7011556Srgrimes    }
7021556Srgrimes    if (me->scheduled_task_cnt > 0) {
7031556Srgrimes        rv = remove_scheduled_tasks(me, owner);
7041556Srgrimes    }
7051556Srgrimes    apr_thread_mutex_unlock(me->lock);
7061556Srgrimes    wait_on_busy_threads(me, owner);
7071556Srgrimes
7081556Srgrimes    return rv;
7091556Srgrimes}
7101556Srgrimes
7111556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
7121556Srgrimes{
71386922Sgreen    return me->task_cnt;
71486922Sgreen}
7151556Srgrimes
7161556SrgrimesAPU_DECLARE(apr_size_t)
7171556Srgrimes    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
7181556Srgrimes{
7191556Srgrimes    return me->scheduled_task_cnt;
7201556Srgrimes}
7211556Srgrimes
7221556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
7231556Srgrimes{
7241556Srgrimes    return me->thd_cnt;
7251556Srgrimes}
7261556Srgrimes
7271556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
7281556Srgrimes{
7291556Srgrimes    return me->thd_cnt - me->idle_cnt;
7301556Srgrimes}
7311556Srgrimes
7321556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
7331556Srgrimes{
7341556Srgrimes    return me->idle_cnt;
7351556Srgrimes}
7361556Srgrimes
7371556SrgrimesAPU_DECLARE(apr_size_t)
7381556Srgrimes    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
73929560Ssef{
74029560Ssef    return me->tasks_run;
7411556Srgrimes}
7421556Srgrimes
74329560SsefAPU_DECLARE(apr_size_t)
7441556Srgrimes    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
74529560Ssef{
74629560Ssef    return me->tasks_high;
7471556Srgrimes}
74837932Shoek
74937932ShoekAPU_DECLARE(apr_size_t)
75037932Shoek    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
75137932Shoek{
75237932Shoek    return me->thd_high;
75337932Shoek}
75437932Shoek
75537932ShoekAPU_DECLARE(apr_size_t)
75637932Shoek    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
75737932Shoek{
75837932Shoek    return me->thd_timed_out;
75937932Shoek}
76037932Shoek
76137932Shoek
76237932ShoekAPU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
76337932Shoek{
76437932Shoek    return me->idle_max;
76537932Shoek}
76637932Shoek
76737932ShoekAPU_DECLARE(apr_interval_time_t)
768    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
769{
770    return me->idle_wait;
771}
772
773/*
774 * This function stop extra idle threads to the cnt.
775 * @return the number of threads stopped
776 * NOTE: There could be busy threads become idle during this function
777 */
778static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
779                                                apr_size_t *cnt, int idle)
780{
781    struct apr_thread_list *thds;
782    apr_size_t n, n_dbg, i;
783    struct apr_thread_list_elt *head, *tail, *elt;
784
785    apr_thread_mutex_lock(me->lock);
786    if (idle) {
787        thds = me->idle_thds;
788        n = me->idle_cnt;
789    }
790    else {
791        thds = me->busy_thds;
792        n = me->thd_cnt - me->idle_cnt;
793    }
794    if (n <= *cnt) {
795        apr_thread_mutex_unlock(me->lock);
796        *cnt = 0;
797        return NULL;
798    }
799    n -= *cnt;
800
801    head = APR_RING_FIRST(thds);
802    for (i = 0; i < *cnt; i++) {
803        head = APR_RING_NEXT(head, link);
804    }
805    tail = APR_RING_LAST(thds);
806    if (idle) {
807        APR_RING_UNSPLICE(head, tail, link);
808        me->idle_cnt = *cnt;
809    }
810
811    n_dbg = 0;
812    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
813        elt->state = TH_STOP;
814        n_dbg++;
815    }
816    elt->state = TH_STOP;
817    n_dbg++;
818    assert(n == n_dbg);
819    *cnt = n;
820
821    apr_thread_mutex_unlock(me->lock);
822
823    APR_RING_PREV(head, link) = NULL;
824    APR_RING_NEXT(tail, link) = NULL;
825    return head;
826}
827
828static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
829{
830    apr_size_t n_dbg;
831    struct apr_thread_list_elt *elt, *head, *tail;
832    apr_status_t rv;
833
834    elt = trim_threads(me, &cnt, 1);
835
836    apr_thread_mutex_lock(me->lock);
837    apr_thread_cond_broadcast(me->cond);
838    apr_thread_mutex_unlock(me->lock);
839
840    n_dbg = 0;
841    if (NULL != (head = elt)) {
842        while (elt) {
843            tail = elt;
844            apr_thread_join(&rv, elt->thd);
845            elt = APR_RING_NEXT(elt, link);
846            ++n_dbg;
847        }
848        apr_thread_mutex_lock(me->lock);
849        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
850                             apr_thread_list_elt, link);
851        apr_thread_mutex_unlock(me->lock);
852    }
853    assert(cnt == n_dbg);
854
855    return cnt;
856}
857
858/* don't join on busy threads for performance reasons, who knows how long will
859 * the task takes to perform
860 */
861static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
862{
863    trim_threads(me, &cnt, 0);
864    return cnt;
865}
866
867APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
868                                                     apr_size_t cnt)
869{
870    me->idle_max = cnt;
871    cnt = trim_idle_threads(me, cnt);
872    return cnt;
873}
874
875APU_DECLARE(apr_interval_time_t)
876    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
877                                  apr_interval_time_t timeout)
878{
879    apr_interval_time_t oldtime;
880
881    oldtime = me->idle_wait;
882    me->idle_wait = timeout;
883
884    return oldtime;
885}
886
887APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
888{
889    return me->thd_max;
890}
891
892/*
893 * This function stop extra working threads to the new limit.
894 * NOTE: There could be busy threads become idle during this function
895 */
896APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
897                                                       apr_size_t cnt)
898{
899    unsigned int n;
900
901    me->thd_max = cnt;
902    if (0 == cnt || me->thd_cnt <= cnt) {
903        return 0;
904    }
905
906    n = me->thd_cnt - cnt;
907    if (n >= me->idle_cnt) {
908        trim_busy_threads(me, n - me->idle_cnt);
909        trim_idle_threads(me, 0);
910    }
911    else {
912        trim_idle_threads(me, me->idle_cnt - n);
913    }
914    return n;
915}
916
917APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
918{
919    return me->threshold;
920}
921
922APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
923                                                      apr_size_t val)
924{
925    apr_size_t ov;
926
927    ov = me->threshold;
928    me->threshold = val;
929    return ov;
930}
931
932APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
933                                                         void **owner)
934{
935    apr_status_t rv;
936    apr_thread_pool_task_t *task;
937    void *data;
938
939    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
940    if (rv != APR_SUCCESS) {
941        return rv;
942    }
943
944    task = data;
945    if (!task) {
946        *owner = NULL;
947        return APR_BADARG;
948    }
949
950    *owner = task->owner;
951    return APR_SUCCESS;
952}
953
954#endif /* APR_HAS_THREADS */
955
956/* vim: set ts=4 sw=4 et cin tw=80: */
957