11556Srgrimes/* 21556Srgrimes * Licensed to the Apache Software Foundation (ASF) under one or more 31556Srgrimes * contributor license agreements. See the NOTICE file distributed 41556Srgrimes * with this work for additional information regarding copyright 51556Srgrimes * ownership. The ASF licenses this file to you under the Apache 61556Srgrimes * License, Version 2.0 (the "License"); you may not use this file 71556Srgrimes * except in compliance with the License. You may obtain a copy of 81556Srgrimes * the License at 91556Srgrimes * 101556Srgrimes * http://www.apache.org/licenses/LICENSE-2.0 111556Srgrimes * 121556Srgrimes * Unless required by applicable law or agreed to in writing, software 131556Srgrimes * distributed under the License is distributed on an "AS IS" BASIS, 141556Srgrimes * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 151556Srgrimes * implied. See the License for the specific language governing 161556Srgrimes * permissions and limitations under the License. 171556Srgrimes */ 181556Srgrimes 191556Srgrimes#include <assert.h> 201556Srgrimes#include "apr_thread_pool.h" 211556Srgrimes#include "apr_ring.h" 221556Srgrimes#include "apr_thread_cond.h" 231556Srgrimes#include "apr_portable.h" 241556Srgrimes 251556Srgrimes#if APR_HAS_THREADS 261556Srgrimes 271556Srgrimes#define TASK_PRIORITY_SEGS 4 281556Srgrimes#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64) 291556Srgrimes 301556Srgrimestypedef struct apr_thread_pool_task 311556Srgrimes{ 321556Srgrimes APR_RING_ENTRY(apr_thread_pool_task) link; 331556Srgrimes apr_thread_start_t func; 341556Srgrimes void *param; 351556Srgrimes void *owner; 361556Srgrimes union 371556Srgrimes { 3827958Ssteve apr_byte_t priority; 391556Srgrimes apr_time_t time; 401556Srgrimes } dispatch; 4127967Ssteve} apr_thread_pool_task_t; 4227967Ssteve 4327967SsteveAPR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task); 4427967Ssteve 4527967Sstevestruct apr_thread_list_elt 4627967Ssteve{ 4727958Ssteve APR_RING_ENTRY(apr_thread_list_elt) link; 4850471Speter apr_thread_t *thd; 4927967Ssteve volatile void *current_owner; 501556Srgrimes volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state; 511556Srgrimes}; 521556Srgrimes 531556SrgrimesAPR_RING_HEAD(apr_thread_list, apr_thread_list_elt); 541556Srgrimes 551556Srgrimesstruct apr_thread_pool 561556Srgrimes{ 571556Srgrimes apr_pool_t *pool; 581556Srgrimes volatile apr_size_t thd_max; 591556Srgrimes volatile apr_size_t idle_max; 6050050Ssheldonh volatile apr_interval_time_t idle_wait; 6150050Ssheldonh volatile apr_size_t thd_cnt; 621556Srgrimes volatile apr_size_t idle_cnt; 631556Srgrimes volatile apr_size_t task_cnt; 641556Srgrimes volatile apr_size_t scheduled_task_cnt; 6561289Sache volatile apr_size_t threshold; 6661268Sjoe volatile apr_size_t tasks_run; 6761289Sache volatile apr_size_t tasks_high; 6861289Sache volatile apr_size_t thd_high; 6961268Sjoe volatile apr_size_t thd_timed_out; 701556Srgrimes struct apr_thread_pool_tasks *tasks; 711556Srgrimes struct apr_thread_pool_tasks *scheduled_tasks; 721556Srgrimes struct apr_thread_list *busy_thds; 7386922Sgreen struct apr_thread_list *idle_thds; 741556Srgrimes apr_thread_mutex_t *lock; 7550050Ssheldonh apr_thread_cond_t *cond; 7650050Ssheldonh volatile int terminated; 7750050Ssheldonh struct apr_thread_pool_tasks *recycled_tasks; 7850050Ssheldonh struct apr_thread_list *recycled_thds; 7950050Ssheldonh apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS]; 8050051Ssheldonh}; 8150050Ssheldonh 821556Srgrimesstatic apr_status_t thread_pool_construct(apr_thread_pool_t * me, 8337932Shoek apr_size_t init_threads, 841556Srgrimes apr_size_t max_threads) 851556Srgrimes{ 861556Srgrimes apr_status_t rv; 871556Srgrimes int i; 881556Srgrimes 891556Srgrimes me->thd_max = max_threads; 901556Srgrimes me->idle_max = init_threads; 911556Srgrimes me->threshold = init_threads / 2; 921556Srgrimes rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, 931556Srgrimes me->pool); 941556Srgrimes if (APR_SUCCESS != rv) { 951556Srgrimes return rv; 961556Srgrimes } 9788591Sjoe rv = apr_thread_cond_create(&me->cond, me->pool); 981556Srgrimes if (APR_SUCCESS != rv) { 992889Spst apr_thread_mutex_destroy(me->lock); 1001556Srgrimes return rv; 1011556Srgrimes } 1021556Srgrimes me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); 1031556Srgrimes if (!me->tasks) { 1041556Srgrimes goto CATCH_ENOMEM; 10550050Ssheldonh } 10649373Ssheldonh APR_RING_INIT(me->tasks, apr_thread_pool_task, link); 10735373Sdes me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks)); 10835417Sdes if (!me->scheduled_tasks) { 1091556Srgrimes goto CATCH_ENOMEM; 1101556Srgrimes } 1111556Srgrimes APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link); 1121556Srgrimes me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks)); 1131556Srgrimes if (!me->recycled_tasks) { 1141556Srgrimes goto CATCH_ENOMEM; 1151556Srgrimes } 1161556Srgrimes APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link); 11720417Ssteve me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds)); 11886922Sgreen if (!me->busy_thds) { 11961268Sjoe goto CATCH_ENOMEM; 12061178Sjoe } 12161271Sjoe APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link); 12261271Sjoe me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds)); 12361271Sjoe if (!me->idle_thds) { 12461271Sjoe goto CATCH_ENOMEM; 12588583Sjoe } 12688583Sjoe APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); 12761268Sjoe me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds)); 1281556Srgrimes if (!me->recycled_thds) { 12917852Sadam goto CATCH_ENOMEM; 13017852Sadam } 1311556Srgrimes APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link); 1321556Srgrimes me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0; 1331556Srgrimes me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0; 1341556Srgrimes me->idle_wait = 0; 1351556Srgrimes me->terminated = 0; 13688602Sjoe for (i = 0; i < TASK_PRIORITY_SEGS; i++) { 1371556Srgrimes me->task_idx[i] = NULL; 1381556Srgrimes } 1391556Srgrimes goto FINAL_EXIT; 14061271Sjoe CATCH_ENOMEM: 14188602Sjoe rv = APR_ENOMEM; 14288602Sjoe apr_thread_mutex_destroy(me->lock); 14361271Sjoe apr_thread_cond_destroy(me->cond); 14461271Sjoe FINAL_EXIT: 14561271Sjoe return rv; 14688602Sjoe} 14711808Sache 1481556Srgrimes/* 1491556Srgrimes * NOTE: This function is not thread safe by itself. Caller should hold the lock 1501556Srgrimes */ 1511556Srgrimesstatic apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me) 1521556Srgrimes{ 1531556Srgrimes apr_thread_pool_task_t *task = NULL; 15488602Sjoe int seg; 1551556Srgrimes 1561556Srgrimes /* check for scheduled tasks */ 1575158Sjoerg if (me->scheduled_task_cnt > 0) { 1581556Srgrimes task = APR_RING_FIRST(me->scheduled_tasks); 1595158Sjoerg assert(task != NULL); 1607165Sjoerg assert(task != 1615158Sjoerg APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 1625158Sjoerg link)); 1631556Srgrimes /* if it's time */ 1641556Srgrimes if (task->dispatch.time <= apr_time_now()) { 1651556Srgrimes --me->scheduled_task_cnt; 1661556Srgrimes APR_RING_REMOVE(task, link); 1671556Srgrimes return task; 1681556Srgrimes } 16988603Sjoe } 1701556Srgrimes /* check for normal tasks if we're not returning a scheduled task */ 1711556Srgrimes if (me->task_cnt == 0) { 1721556Srgrimes return NULL; 1731556Srgrimes } 1741556Srgrimes 1751556Srgrimes task = APR_RING_FIRST(me->tasks); 1761556Srgrimes assert(task != NULL); 1771556Srgrimes assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)); 1781556Srgrimes --me->task_cnt; 17935417Sdes seg = TASK_PRIORITY_SEG(task); 18035417Sdes if (task == me->task_idx[seg]) { 18135417Sdes me->task_idx[seg] = APR_RING_NEXT(task, link); 18288602Sjoe if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, 18335417Sdes apr_thread_pool_task, link) 1841556Srgrimes || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { 1851556Srgrimes me->task_idx[seg] = NULL; 1861556Srgrimes } 1871556Srgrimes } 1881556Srgrimes APR_RING_REMOVE(task, link); 1891556Srgrimes return task; 1901556Srgrimes} 1911556Srgrimes 1921556Srgrimesstatic apr_interval_time_t waiting_time(apr_thread_pool_t * me) 1931556Srgrimes{ 1941556Srgrimes apr_thread_pool_task_t *task = NULL; 1951556Srgrimes 1961556Srgrimes task = APR_RING_FIRST(me->scheduled_tasks); 1971556Srgrimes assert(task != NULL); 1981556Srgrimes assert(task != 1991556Srgrimes APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 2001556Srgrimes link)); 2011556Srgrimes return task->dispatch.time - apr_time_now(); 2021556Srgrimes} 2031556Srgrimes 20435426Sdes/* 20588602Sjoe * NOTE: This function is not thread safe by itself. Caller should hold the lock 20635426Sdes */ 20761178Sjoestatic struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me, 20864568Sjoe apr_thread_t * t) 20961178Sjoe{ 2101556Srgrimes struct apr_thread_list_elt *elt; 2111556Srgrimes 2121556Srgrimes if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) { 2131556Srgrimes elt = apr_pcalloc(me->pool, sizeof(*elt)); 21435426Sdes if (NULL == elt) { 21588602Sjoe return NULL; 21635426Sdes } 21735426Sdes } 21835426Sdes else { 2191556Srgrimes elt = APR_RING_FIRST(me->recycled_thds); 2201556Srgrimes APR_RING_REMOVE(elt, link); 2211556Srgrimes } 2221556Srgrimes 2231556Srgrimes APR_RING_ELEM_INIT(elt, link); 2241556Srgrimes elt->thd = t; 2251556Srgrimes elt->current_owner = NULL; 2261556Srgrimes elt->state = TH_RUN; 2271556Srgrimes return elt; 2281556Srgrimes} 2291556Srgrimes 2301556Srgrimes/* 2311556Srgrimes * The worker thread function. Take a task from the queue and perform it if 2321556Srgrimes * there is any. Otherwise, put itself into the idle thread list and waiting 2331556Srgrimes * for signal to wake up. 2341556Srgrimes * The thread terminate directly by detach and exit when it is asked to stop 2351556Srgrimes * after finishing a task. Otherwise, the thread should be in idle thread list 23688602Sjoe * and should be joined. 2371556Srgrimes */ 23888591Sjoestatic void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) 23988591Sjoe{ 24088591Sjoe apr_thread_pool_t *me = param; 2411556Srgrimes apr_thread_pool_task_t *task = NULL; 2421556Srgrimes apr_interval_time_t wait; 2431556Srgrimes struct apr_thread_list_elt *elt; 2442889Spst 2452889Spst apr_thread_mutex_lock(me->lock); 2462889Spst elt = elt_new(me, t); 24749373Ssheldonh if (!elt) { 24849373Ssheldonh apr_thread_mutex_unlock(me->lock); 24949373Ssheldonh apr_thread_exit(t, APR_ENOMEM); 2501556Srgrimes } 2511556Srgrimes 2521556Srgrimes while (!me->terminated && elt->state != TH_STOP) { 2531556Srgrimes /* Test if not new element, it is awakened from idle */ 2541556Srgrimes if (APR_RING_NEXT(elt, link) != elt) { 25535373Sdes --me->idle_cnt; 25688602Sjoe APR_RING_REMOVE(elt, link); 2571556Srgrimes } 2581556Srgrimes 2591556Srgrimes APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link); 2601556Srgrimes task = pop_task(me); 2611556Srgrimes while (NULL != task && !me->terminated) { 2621556Srgrimes ++me->tasks_run; 2631556Srgrimes elt->current_owner = task->owner; 2641556Srgrimes apr_thread_mutex_unlock(me->lock); 2651556Srgrimes apr_thread_data_set(task, "apr_thread_pool_task", NULL, t); 2661556Srgrimes task->func(t, task->param); 2671556Srgrimes apr_thread_mutex_lock(me->lock); 2681556Srgrimes APR_RING_INSERT_TAIL(me->recycled_tasks, task, 2691556Srgrimes apr_thread_pool_task, link); 27020417Ssteve elt->current_owner = NULL; 27120417Ssteve if (TH_STOP == elt->state) { 27220417Ssteve break; 27335373Sdes } 27435373Sdes task = pop_task(me); 27588602Sjoe } 27635417Sdes assert(NULL == elt->current_owner); 27735373Sdes if (TH_STOP != elt->state) 27888603Sjoe APR_RING_REMOVE(elt, link); 27988603Sjoe 28088603Sjoe /* Test if a busy thread been asked to stop, which is not joinable */ 28188603Sjoe if ((me->idle_cnt >= me->idle_max 28288603Sjoe && !(me->scheduled_task_cnt && 0 >= me->idle_max) 28386922Sgreen && !me->idle_wait) 28486922Sgreen || me->terminated || elt->state != TH_RUN) { 28586922Sgreen --me->thd_cnt; 2861556Srgrimes if ((TH_PROBATION == elt->state) && me->idle_wait) 2871556Srgrimes ++me->thd_timed_out; 2881556Srgrimes APR_RING_INSERT_TAIL(me->recycled_thds, elt, 2891556Srgrimes apr_thread_list_elt, link); 2901556Srgrimes apr_thread_mutex_unlock(me->lock); 2911556Srgrimes apr_thread_detach(t); 2921556Srgrimes apr_thread_exit(t, APR_SUCCESS); 2931556Srgrimes return NULL; /* should not be here, safe net */ 29464568Sjoe } 29564568Sjoe 29664568Sjoe /* busy thread become idle */ 29764604Sjoe ++me->idle_cnt; 29864568Sjoe APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link); 29964568Sjoe 30064568Sjoe /* 30188583Sjoe * If there is a scheduled task, always scheduled to perform that task. 30288583Sjoe * Since there is no guarantee that current idle threads are scheduled 30364568Sjoe * for next scheduled task. 30464568Sjoe */ 30564568Sjoe if (me->scheduled_task_cnt) 30664568Sjoe wait = waiting_time(me); 30764568Sjoe else if (me->idle_cnt > me->idle_max) { 30864568Sjoe wait = me->idle_wait; 30964568Sjoe elt->state = TH_PROBATION; 31064568Sjoe } 31164568Sjoe else 31264568Sjoe wait = -1; 31364604Sjoe 31464604Sjoe if (wait >= 0) { 31564604Sjoe apr_thread_cond_timedwait(me->cond, me->lock, wait); 31664568Sjoe } 31764604Sjoe else { 31861289Sache apr_thread_cond_wait(me->cond, me->lock); 31961337Sache } 32061337Sache } 32161337Sache 32261337Sache /* idle thread been asked to stop, will be joined */ 32361337Sache --me->thd_cnt; 32461337Sache apr_thread_mutex_unlock(me->lock); 32588602Sjoe apr_thread_exit(t, APR_SUCCESS); 32688602Sjoe return NULL; /* should not be here, safe net */ 32761178Sjoe} 32861289Sache 32961268Sjoestatic apr_status_t thread_pool_cleanup(void *me) 33061178Sjoe{ 3311556Srgrimes apr_thread_pool_t *_myself = me; 3321556Srgrimes 33361178Sjoe _myself->terminated = 1; 33461178Sjoe apr_thread_pool_idle_max_set(_myself, 0); 3351556Srgrimes while (_myself->thd_cnt) { 33661178Sjoe apr_sleep(20 * 1000); /* spin lock with 20 ms */ 33761268Sjoe } 33861268Sjoe apr_thread_mutex_destroy(_myself->lock); 33961268Sjoe apr_thread_cond_destroy(_myself->cond); 34088602Sjoe return APR_SUCCESS; 3411556Srgrimes} 3421556Srgrimes 3431556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me, 3441556Srgrimes apr_size_t init_threads, 3451556Srgrimes apr_size_t max_threads, 3461556Srgrimes apr_pool_t * pool) 3471556Srgrimes{ 3481556Srgrimes apr_thread_t *t; 3491556Srgrimes apr_status_t rv = APR_SUCCESS; 35020417Ssteve apr_thread_pool_t *tp; 35120417Ssteve 35220417Ssteve *me = NULL; 35320417Ssteve tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t)); 35420417Ssteve 35520417Ssteve /* 35620417Ssteve * This pool will be used by different threads. As we cannot ensure that 35720417Ssteve * our caller won't use the pool without acquiring the mutex, we must 3581556Srgrimes * create a new sub pool. 3591556Srgrimes */ 3602889Spst rv = apr_pool_create(&tp->pool, pool); 3617282Sphk if (APR_SUCCESS != rv) 3627282Sphk return rv; 3637282Sphk rv = thread_pool_construct(tp, init_threads, max_threads); 3647282Sphk if (APR_SUCCESS != rv) 3657282Sphk return rv; 3661556Srgrimes apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup); 3671556Srgrimes 3681556Srgrimes while (init_threads) { 3691556Srgrimes /* Grab the mutex as apr_thread_create() and thread_pool_func() will 3701556Srgrimes * allocate from (*me)->pool. This is dangerous if there are multiple 3711556Srgrimes * initial threads to create. 3721556Srgrimes */ 3731556Srgrimes apr_thread_mutex_lock(tp->lock); 3741556Srgrimes rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool); 37588602Sjoe apr_thread_mutex_unlock(tp->lock); 3761556Srgrimes if (APR_SUCCESS != rv) { 3771556Srgrimes break; 3781556Srgrimes } 3791556Srgrimes tp->thd_cnt++; 3801556Srgrimes if (tp->thd_cnt > tp->thd_high) { 3811556Srgrimes tp->thd_high = tp->thd_cnt; 3821556Srgrimes } 3831556Srgrimes --init_threads; 38488602Sjoe } 3851556Srgrimes 3861556Srgrimes if (rv == APR_SUCCESS) { 3871556Srgrimes *me = tp; 3881556Srgrimes } 3891556Srgrimes 3901556Srgrimes return rv; 3911556Srgrimes} 3921556Srgrimes 3931556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me) 3941556Srgrimes{ 3951556Srgrimes apr_pool_destroy(me->pool); 3961556Srgrimes return APR_SUCCESS; 3971556Srgrimes} 3981556Srgrimes 3991556Srgrimes/* 40017852Sadam * NOTE: This function is not thread safe by itself. Caller should hold the lock 4011556Srgrimes */ 4021556Srgrimesstatic apr_thread_pool_task_t *task_new(apr_thread_pool_t * me, 40388602Sjoe apr_thread_start_t func, 4041556Srgrimes void *param, apr_byte_t priority, 4051556Srgrimes void *owner, apr_time_t time) 4061556Srgrimes{ 4071556Srgrimes apr_thread_pool_task_t *t; 4081556Srgrimes 4091556Srgrimes if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) { 4101556Srgrimes t = apr_pcalloc(me->pool, sizeof(*t)); 4111556Srgrimes if (NULL == t) { 4121556Srgrimes return NULL; 4131556Srgrimes } 4141556Srgrimes } 4151556Srgrimes else { 4161556Srgrimes t = APR_RING_FIRST(me->recycled_tasks); 4171556Srgrimes APR_RING_REMOVE(t, link); 4181556Srgrimes } 4191556Srgrimes 4201556Srgrimes APR_RING_ELEM_INIT(t, link); 4211556Srgrimes t->func = func; 4221556Srgrimes t->param = param; 4231556Srgrimes t->owner = owner; 4241556Srgrimes if (time > 0) { 4251556Srgrimes t->dispatch.time = apr_time_now() + time; 4261556Srgrimes } 4271556Srgrimes else { 4281556Srgrimes t->dispatch.priority = priority; 4291556Srgrimes } 4301556Srgrimes return t; 4311556Srgrimes} 4321556Srgrimes 4331556Srgrimes/* 4341556Srgrimes * Test it the task is the only one within the priority segment. 4351556Srgrimes * If it is not, return the first element with same or lower priority. 4361556Srgrimes * Otherwise, add the task into the queue and return NULL. 4371556Srgrimes * 4381556Srgrimes * NOTE: This function is not thread safe by itself. Caller should hold the lock 4391556Srgrimes */ 4401556Srgrimesstatic apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me, 4411556Srgrimes apr_thread_pool_task_t * const t) 44217852Sadam{ 4431556Srgrimes int seg; 4441556Srgrimes int next; 4451556Srgrimes apr_thread_pool_task_t *t_next; 4461556Srgrimes 4471556Srgrimes seg = TASK_PRIORITY_SEG(t); 4481556Srgrimes if (me->task_idx[seg]) { 4491556Srgrimes assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != 4501556Srgrimes me->task_idx[seg]); 4511556Srgrimes t_next = me->task_idx[seg]; 4521556Srgrimes while (t_next->dispatch.priority > t->dispatch.priority) { 4531556Srgrimes t_next = APR_RING_NEXT(t_next, link); 4541556Srgrimes if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) == 4551556Srgrimes t_next) { 4561556Srgrimes return t_next; 4571556Srgrimes } 4581556Srgrimes } 4591556Srgrimes return t_next; 4601556Srgrimes } 4611556Srgrimes 4621556Srgrimes for (next = seg - 1; next >= 0; next--) { 4631556Srgrimes if (me->task_idx[next]) { 4641556Srgrimes APR_RING_INSERT_BEFORE(me->task_idx[next], t, link); 4651556Srgrimes break; 4661556Srgrimes } 4671556Srgrimes } 4681556Srgrimes if (0 > next) { 4691556Srgrimes APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link); 4701556Srgrimes } 4711556Srgrimes me->task_idx[seg] = t; 4721556Srgrimes return NULL; 4731556Srgrimes} 4741556Srgrimes 4751556Srgrimes/* 4761556Srgrimes* schedule a task to run in "time" microseconds. Find the spot in the ring where 4771556Srgrimes* the time fits. Adjust the short_time so the thread wakes up when the time is reached. 4781556Srgrimes*/ 4791556Srgrimesstatic apr_status_t schedule_task(apr_thread_pool_t *me, 4801556Srgrimes apr_thread_start_t func, void *param, 4811556Srgrimes void *owner, apr_interval_time_t time) 4821556Srgrimes{ 4831556Srgrimes apr_thread_pool_task_t *t; 4841556Srgrimes apr_thread_pool_task_t *t_loc; 48586922Sgreen apr_thread_t *thd; 48686922Sgreen apr_status_t rv = APR_SUCCESS; 48737932Shoek apr_thread_mutex_lock(me->lock); 4881556Srgrimes 48986922Sgreen t = task_new(me, func, param, 0, owner, time); 49050050Ssheldonh if (NULL == t) { 49150050Ssheldonh apr_thread_mutex_unlock(me->lock); 49250050Ssheldonh return APR_ENOMEM; 4931556Srgrimes } 4941556Srgrimes t_loc = APR_RING_FIRST(me->scheduled_tasks); 4951556Srgrimes while (NULL != t_loc) { 4961556Srgrimes /* if the time is less than the entry insert ahead of it */ 4971556Srgrimes if (t->dispatch.time < t_loc->dispatch.time) { 4981556Srgrimes ++me->scheduled_task_cnt; 49946684Skris APR_RING_INSERT_BEFORE(t_loc, t, link); 5001556Srgrimes break; 5011556Srgrimes } 5021556Srgrimes else { 5031556Srgrimes t_loc = APR_RING_NEXT(t_loc, link); 5041556Srgrimes if (t_loc == 5051556Srgrimes APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 50637932Shoek link)) { 50737932Shoek ++me->scheduled_task_cnt; 50837932Shoek APR_RING_INSERT_TAIL(me->scheduled_tasks, t, 50937932Shoek apr_thread_pool_task, link); 51037932Shoek break; 51137932Shoek } 51237932Shoek } 51337932Shoek } 51437932Shoek /* there should be at least one thread for scheduled tasks */ 51537932Shoek if (0 == me->thd_cnt) { 51637932Shoek rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); 51737932Shoek if (APR_SUCCESS == rv) { 51837932Shoek ++me->thd_cnt; 51937932Shoek if (me->thd_cnt > me->thd_high) 52037932Shoek me->thd_high = me->thd_cnt; 52137932Shoek } 52237932Shoek } 52337932Shoek apr_thread_cond_signal(me->cond); 52437932Shoek apr_thread_mutex_unlock(me->lock); 52537932Shoek return rv; 52637932Shoek} 52737932Shoek 52837932Shoekstatic apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func, 52937932Shoek void *param, apr_byte_t priority, int push, 53037932Shoek void *owner) 53188602Sjoe{ 53288602Sjoe apr_thread_pool_task_t *t; 53337932Shoek apr_thread_pool_task_t *t_loc; 53437932Shoek apr_thread_t *thd; 53586922Sgreen apr_status_t rv = APR_SUCCESS; 53637932Shoek 53786922Sgreen apr_thread_mutex_lock(me->lock); 53837932Shoek 53937932Shoek t = task_new(me, func, param, priority, owner, 0); 54088602Sjoe if (NULL == t) { 54188602Sjoe apr_thread_mutex_unlock(me->lock); 54288602Sjoe return APR_ENOMEM; 54388602Sjoe } 54488602Sjoe 54588602Sjoe t_loc = add_if_empty(me, t); 54688602Sjoe if (NULL == t_loc) { 54788602Sjoe goto FINAL_EXIT; 54888602Sjoe } 54988602Sjoe 55088602Sjoe if (push) { 55188602Sjoe while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != 55288602Sjoe t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { 55388602Sjoe t_loc = APR_RING_NEXT(t_loc, link); 55488602Sjoe } 55588602Sjoe } 55688602Sjoe APR_RING_INSERT_BEFORE(t_loc, t, link); 55788602Sjoe if (!push) { 55861338Sache if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { 55988602Sjoe me->task_idx[TASK_PRIORITY_SEG(t)] = t; 56061337Sache } 56188602Sjoe } 56237932Shoek 56337932Shoek FINAL_EXIT: 56437932Shoek me->task_cnt++; 56537932Shoek if (me->task_cnt > me->tasks_high) 56637932Shoek me->tasks_high = me->task_cnt; 56738026Shoek if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && 56886922Sgreen me->task_cnt > me->threshold)) { 56938026Shoek rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); 57086922Sgreen if (APR_SUCCESS == rv) { 57186922Sgreen ++me->thd_cnt; 5721556Srgrimes if (me->thd_cnt > me->thd_high) 5737165Sjoerg me->thd_high = me->thd_cnt; 5741556Srgrimes } 5751556Srgrimes } 5761556Srgrimes 5771556Srgrimes apr_thread_cond_signal(me->cond); 5781556Srgrimes apr_thread_mutex_unlock(me->lock); 57917852Sadam 5801556Srgrimes return rv; 5811556Srgrimes} 5821556Srgrimes 5831556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me, 5841556Srgrimes apr_thread_start_t func, 5851556Srgrimes void *param, 5861556Srgrimes apr_byte_t priority, 5871556Srgrimes void *owner) 5881556Srgrimes{ 5891556Srgrimes return add_task(me, func, param, priority, 1, owner); 5901556Srgrimes} 5911556Srgrimes 5921556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me, 5931556Srgrimes apr_thread_start_t func, 5941556Srgrimes void *param, 5951556Srgrimes apr_interval_time_t time, 5961556Srgrimes void *owner) 5971556Srgrimes{ 5981556Srgrimes return schedule_task(me, func, param, owner, time); 5991556Srgrimes} 6001556Srgrimes 60135417SdesAPU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me, 60288602Sjoe apr_thread_start_t func, 60388602Sjoe void *param, 60488602Sjoe apr_byte_t priority, 60588602Sjoe void *owner) 60637932Shoek{ 6071556Srgrimes return add_task(me, func, param, priority, 0, owner); 6081556Srgrimes} 6091556Srgrimes 6101556Srgrimesstatic apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me, 6111556Srgrimes void *owner) 6121556Srgrimes{ 6131556Srgrimes apr_thread_pool_task_t *t_loc; 6141556Srgrimes apr_thread_pool_task_t *next; 6151556Srgrimes 6161556Srgrimes t_loc = APR_RING_FIRST(me->scheduled_tasks); 6171556Srgrimes while (t_loc != 6181556Srgrimes APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 6191556Srgrimes link)) { 62049373Ssheldonh next = APR_RING_NEXT(t_loc, link); 62149373Ssheldonh /* if this is the owner remove it */ 62249373Ssheldonh if (t_loc->owner == owner) { 62349373Ssheldonh --me->scheduled_task_cnt; 62449373Ssheldonh APR_RING_REMOVE(t_loc, link); 62549373Ssheldonh } 62649373Ssheldonh t_loc = next; 62749373Ssheldonh } 62849373Ssheldonh return APR_SUCCESS; 62949373Ssheldonh} 63049373Ssheldonh 6311556Srgrimesstatic apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner) 6321556Srgrimes{ 6331556Srgrimes apr_thread_pool_task_t *t_loc; 6341556Srgrimes apr_thread_pool_task_t *next; 6351556Srgrimes int seg; 63661749Sjoe 63761749Sjoe t_loc = APR_RING_FIRST(me->tasks); 63861749Sjoe while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { 63961749Sjoe next = APR_RING_NEXT(t_loc, link); 64061749Sjoe if (t_loc->owner == owner) { 64161749Sjoe --me->task_cnt; 64261749Sjoe seg = TASK_PRIORITY_SEG(t_loc); 6431556Srgrimes if (t_loc == me->task_idx[seg]) { 6441556Srgrimes me->task_idx[seg] = APR_RING_NEXT(t_loc, link); 6451556Srgrimes if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, 6461556Srgrimes apr_thread_pool_task, 64786922Sgreen link) 64886922Sgreen || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { 64986922Sgreen me->task_idx[seg] = NULL; 65086922Sgreen } 65186922Sgreen } 65286922Sgreen APR_RING_REMOVE(t_loc, link); 65386922Sgreen } 6541556Srgrimes t_loc = next; 65586922Sgreen } 65686922Sgreen return APR_SUCCESS; 6571556Srgrimes} 6581556Srgrimes 6591556Srgrimesstatic void wait_on_busy_threads(apr_thread_pool_t *me, void *owner) 6601556Srgrimes{ 6611556Srgrimes#ifndef NDEBUG 6621556Srgrimes apr_os_thread_t *os_thread; 6631556Srgrimes#endif 6641556Srgrimes struct apr_thread_list_elt *elt; 6651556Srgrimes apr_thread_mutex_lock(me->lock); 6661556Srgrimes elt = APR_RING_FIRST(me->busy_thds); 6671556Srgrimes while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { 6681556Srgrimes if (elt->current_owner != owner) { 6691556Srgrimes elt = APR_RING_NEXT(elt, link); 67088602Sjoe continue; 67161749Sjoe } 6721556Srgrimes#ifndef NDEBUG 67386922Sgreen /* make sure the thread is not the one calling tasks_cancel */ 67486922Sgreen apr_os_thread_get(&os_thread, elt->thd); 67586922Sgreen#ifdef WIN32 67686922Sgreen /* hack for apr win32 bug */ 67786922Sgreen assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread)); 67886922Sgreen#else 6791556Srgrimes assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); 6801556Srgrimes#endif 6811556Srgrimes#endif 6821556Srgrimes while (elt->current_owner == owner) { 6831556Srgrimes apr_thread_mutex_unlock(me->lock); 6841556Srgrimes apr_sleep(200 * 1000); 6851556Srgrimes apr_thread_mutex_lock(me->lock); 6861556Srgrimes } 6871556Srgrimes elt = APR_RING_FIRST(me->busy_thds); 6881556Srgrimes } 6891556Srgrimes apr_thread_mutex_unlock(me->lock); 6901556Srgrimes return; 6911556Srgrimes} 6921556Srgrimes 6931556SrgrimesAPU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me, 6941556Srgrimes void *owner) 6951556Srgrimes{ 6961556Srgrimes apr_status_t rv = APR_SUCCESS; 69786922Sgreen 6981556Srgrimes apr_thread_mutex_lock(me->lock); 6991556Srgrimes if (me->task_cnt > 0) { 7001556Srgrimes rv = remove_tasks(me, owner); 7011556Srgrimes } 7021556Srgrimes if (me->scheduled_task_cnt > 0) { 7031556Srgrimes rv = remove_scheduled_tasks(me, owner); 7041556Srgrimes } 7051556Srgrimes apr_thread_mutex_unlock(me->lock); 7061556Srgrimes wait_on_busy_threads(me, owner); 7071556Srgrimes 7081556Srgrimes return rv; 7091556Srgrimes} 7101556Srgrimes 7111556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me) 7121556Srgrimes{ 71386922Sgreen return me->task_cnt; 71486922Sgreen} 7151556Srgrimes 7161556SrgrimesAPU_DECLARE(apr_size_t) 7171556Srgrimes apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me) 7181556Srgrimes{ 7191556Srgrimes return me->scheduled_task_cnt; 7201556Srgrimes} 7211556Srgrimes 7221556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me) 7231556Srgrimes{ 7241556Srgrimes return me->thd_cnt; 7251556Srgrimes} 7261556Srgrimes 7271556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me) 7281556Srgrimes{ 7291556Srgrimes return me->thd_cnt - me->idle_cnt; 7301556Srgrimes} 7311556Srgrimes 7321556SrgrimesAPU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me) 7331556Srgrimes{ 7341556Srgrimes return me->idle_cnt; 7351556Srgrimes} 7361556Srgrimes 7371556SrgrimesAPU_DECLARE(apr_size_t) 7381556Srgrimes apr_thread_pool_tasks_run_count(apr_thread_pool_t * me) 73929560Ssef{ 74029560Ssef return me->tasks_run; 7411556Srgrimes} 7421556Srgrimes 74329560SsefAPU_DECLARE(apr_size_t) 7441556Srgrimes apr_thread_pool_tasks_high_count(apr_thread_pool_t * me) 74529560Ssef{ 74629560Ssef return me->tasks_high; 7471556Srgrimes} 74837932Shoek 74937932ShoekAPU_DECLARE(apr_size_t) 75037932Shoek apr_thread_pool_threads_high_count(apr_thread_pool_t * me) 75137932Shoek{ 75237932Shoek return me->thd_high; 75337932Shoek} 75437932Shoek 75537932ShoekAPU_DECLARE(apr_size_t) 75637932Shoek apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me) 75737932Shoek{ 75837932Shoek return me->thd_timed_out; 75937932Shoek} 76037932Shoek 76137932Shoek 76237932ShoekAPU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me) 76337932Shoek{ 76437932Shoek return me->idle_max; 76537932Shoek} 76637932Shoek 76737932ShoekAPU_DECLARE(apr_interval_time_t) 768 apr_thread_pool_idle_wait_get(apr_thread_pool_t * me) 769{ 770 return me->idle_wait; 771} 772 773/* 774 * This function stop extra idle threads to the cnt. 775 * @return the number of threads stopped 776 * NOTE: There could be busy threads become idle during this function 777 */ 778static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me, 779 apr_size_t *cnt, int idle) 780{ 781 struct apr_thread_list *thds; 782 apr_size_t n, n_dbg, i; 783 struct apr_thread_list_elt *head, *tail, *elt; 784 785 apr_thread_mutex_lock(me->lock); 786 if (idle) { 787 thds = me->idle_thds; 788 n = me->idle_cnt; 789 } 790 else { 791 thds = me->busy_thds; 792 n = me->thd_cnt - me->idle_cnt; 793 } 794 if (n <= *cnt) { 795 apr_thread_mutex_unlock(me->lock); 796 *cnt = 0; 797 return NULL; 798 } 799 n -= *cnt; 800 801 head = APR_RING_FIRST(thds); 802 for (i = 0; i < *cnt; i++) { 803 head = APR_RING_NEXT(head, link); 804 } 805 tail = APR_RING_LAST(thds); 806 if (idle) { 807 APR_RING_UNSPLICE(head, tail, link); 808 me->idle_cnt = *cnt; 809 } 810 811 n_dbg = 0; 812 for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) { 813 elt->state = TH_STOP; 814 n_dbg++; 815 } 816 elt->state = TH_STOP; 817 n_dbg++; 818 assert(n == n_dbg); 819 *cnt = n; 820 821 apr_thread_mutex_unlock(me->lock); 822 823 APR_RING_PREV(head, link) = NULL; 824 APR_RING_NEXT(tail, link) = NULL; 825 return head; 826} 827 828static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt) 829{ 830 apr_size_t n_dbg; 831 struct apr_thread_list_elt *elt, *head, *tail; 832 apr_status_t rv; 833 834 elt = trim_threads(me, &cnt, 1); 835 836 apr_thread_mutex_lock(me->lock); 837 apr_thread_cond_broadcast(me->cond); 838 apr_thread_mutex_unlock(me->lock); 839 840 n_dbg = 0; 841 if (NULL != (head = elt)) { 842 while (elt) { 843 tail = elt; 844 apr_thread_join(&rv, elt->thd); 845 elt = APR_RING_NEXT(elt, link); 846 ++n_dbg; 847 } 848 apr_thread_mutex_lock(me->lock); 849 APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail, 850 apr_thread_list_elt, link); 851 apr_thread_mutex_unlock(me->lock); 852 } 853 assert(cnt == n_dbg); 854 855 return cnt; 856} 857 858/* don't join on busy threads for performance reasons, who knows how long will 859 * the task takes to perform 860 */ 861static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt) 862{ 863 trim_threads(me, &cnt, 0); 864 return cnt; 865} 866 867APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me, 868 apr_size_t cnt) 869{ 870 me->idle_max = cnt; 871 cnt = trim_idle_threads(me, cnt); 872 return cnt; 873} 874 875APU_DECLARE(apr_interval_time_t) 876 apr_thread_pool_idle_wait_set(apr_thread_pool_t * me, 877 apr_interval_time_t timeout) 878{ 879 apr_interval_time_t oldtime; 880 881 oldtime = me->idle_wait; 882 me->idle_wait = timeout; 883 884 return oldtime; 885} 886 887APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me) 888{ 889 return me->thd_max; 890} 891 892/* 893 * This function stop extra working threads to the new limit. 894 * NOTE: There could be busy threads become idle during this function 895 */ 896APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me, 897 apr_size_t cnt) 898{ 899 unsigned int n; 900 901 me->thd_max = cnt; 902 if (0 == cnt || me->thd_cnt <= cnt) { 903 return 0; 904 } 905 906 n = me->thd_cnt - cnt; 907 if (n >= me->idle_cnt) { 908 trim_busy_threads(me, n - me->idle_cnt); 909 trim_idle_threads(me, 0); 910 } 911 else { 912 trim_idle_threads(me, me->idle_cnt - n); 913 } 914 return n; 915} 916 917APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me) 918{ 919 return me->threshold; 920} 921 922APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me, 923 apr_size_t val) 924{ 925 apr_size_t ov; 926 927 ov = me->threshold; 928 me->threshold = val; 929 return ov; 930} 931 932APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd, 933 void **owner) 934{ 935 apr_status_t rv; 936 apr_thread_pool_task_t *task; 937 void *data; 938 939 rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd); 940 if (rv != APR_SUCCESS) { 941 return rv; 942 } 943 944 task = data; 945 if (!task) { 946 *owner = NULL; 947 return APR_BADARG; 948 } 949 950 *owner = task->owner; 951 return APR_SUCCESS; 952} 953 954#endif /* APR_HAS_THREADS */ 955 956/* vim: set ts=4 sw=4 et cin tw=80: */ 957