/* work_thread.c revision 294569 */
1238106Sdes/* 2238106Sdes * work_thread.c - threads implementation for blocking worker child. 3238106Sdes */ 4238106Sdes#include <config.h> 5238106Sdes#include "ntp_workimpl.h" 6238106Sdes 7238106Sdes#ifdef WORK_THREAD 8238106Sdes 9238106Sdes#include <stdio.h> 10238106Sdes#include <ctype.h> 11238106Sdes#include <signal.h> 12238106Sdes#ifndef SYS_WINNT 13238106Sdes#include <pthread.h> 14238106Sdes#endif 15238106Sdes 16238106Sdes#include "ntp_stdlib.h" 17238106Sdes#include "ntp_malloc.h" 18238106Sdes#include "ntp_syslog.h" 19238106Sdes#include "ntpd.h" 20238106Sdes#include "ntp_io.h" 21238106Sdes#include "ntp_assert.h" 22238106Sdes#include "ntp_unixtime.h" 23238106Sdes#include "timespecops.h" 24238106Sdes#include "ntp_worker.h" 25269257Sdes 26269257Sdes#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27269257Sdes#define CHILD_GONE_RESP CHILD_EXIT_REQ 28269257Sdes/* Queue size increments: 29269257Sdes * The request queue grows a bit faster than the response queue -- the 30269257Sdes * deamon can push requests and pull results faster on avarage than the 31269257Sdes * worker can process requests and push results... If this really pays 32269257Sdes * off is debatable. 33269257Sdes */ 34269257Sdes#define WORKITEMS_ALLOC_INC 16 35238106Sdes#define RESPONSES_ALLOC_INC 4 36238106Sdes 37238106Sdes/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 38238106Sdes * set the maximum to 256kB. If the minimum goes below the 39285206Sdes * system-defined minimum stack size, we have to adjust accordingly. 
40238106Sdes */ 41238106Sdes#ifndef THREAD_MINSTACKSIZE 42238106Sdes# define THREAD_MINSTACKSIZE (64U * 1024) 43238106Sdes#endif 44238106Sdes#ifndef __sun 45238106Sdes#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 46238106Sdes# undef THREAD_MINSTACKSIZE 47238106Sdes# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 48238106Sdes#endif 49291767Sdes#endif 50238106Sdes 51238106Sdes#ifndef THREAD_MAXSTACKSIZE 52238106Sdes# define THREAD_MAXSTACKSIZE (256U * 1024) 53238106Sdes#endif 54238106Sdes#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 55238106Sdes# undef THREAD_MAXSTACKSIZE 56238106Sdes# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 57238106Sdes#endif 58238106Sdes 59238106Sdes 60255595Sdes#ifdef SYS_WINNT 61255595Sdes 62238106Sdes# define thread_exit(c) _endthreadex(c) 63238106Sdes# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 64238106Sdesu_int WINAPI blocking_thread(void *); 65238106Sdesstatic BOOL same_os_sema(const sem_ref obj, void * osobj); 66238106Sdes 67238106Sdes#else 68238106Sdes 69238106Sdes# define thread_exit(c) pthread_exit((void*)(size_t)(c)) 70238106Sdes# define tickle_sem sem_post 71238106Sdesvoid * blocking_thread(void *); 72238106Sdesstatic void block_thread_signals(sigset_t *); 73238106Sdes 74238106Sdes#endif 75238106Sdes 76238106Sdes#ifdef WORK_PIPE 77238106Sdesaddremove_io_fd_func addremove_io_fd; 78238106Sdes#else 79238106Sdesaddremove_io_semaphore_func addremove_io_semaphore; 80238106Sdes#endif 81238106Sdes 82238106Sdesstatic void start_blocking_thread(blocking_child *); 83238106Sdesstatic void start_blocking_thread_internal(blocking_child *); 84238106Sdesstatic void prepare_child_sems(blocking_child *); 85238106Sdesstatic int wait_for_sem(sem_ref, struct timespec *); 86238106Sdesstatic int ensure_workitems_empty_slot(blocking_child *); 87238106Sdesstatic int ensure_workresp_empty_slot(blocking_child *); 88238106Sdesstatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 89238106Sdesstatic 
void cleanup_after_child(blocking_child *); 90238106Sdes 91238106Sdes 92238106Sdesvoid 93238106Sdesexit_worker( 94238106Sdes int exitcode 95238106Sdes ) 96238106Sdes{ 97238106Sdes thread_exit(exitcode); /* see #define thread_exit */ 98238106Sdes} 99238106Sdes 100238106Sdes/* -------------------------------------------------------------------- 101238106Sdes * sleep for a given time or until the wakup semaphore is tickled. 102238106Sdes */ 103238106Sdesint 104238106Sdesworker_sleep( 105238106Sdes blocking_child * c, 106238106Sdes time_t seconds 107238106Sdes ) 108238106Sdes{ 109238106Sdes struct timespec until; 110294190Sdes int rc; 111294190Sdes 112294190Sdes# ifdef HAVE_CLOCK_GETTIME 113294190Sdes if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 114294190Sdes msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 115294190Sdes return -1; 116294190Sdes } 117294190Sdes# else 118238106Sdes if (0 != getclock(TIMEOFDAY, &until)) { 119238106Sdes msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 120238106Sdes return -1; 121238106Sdes } 122238106Sdes# endif 123238106Sdes until.tv_sec += seconds; 124238106Sdes rc = wait_for_sem(c->wake_scheduled_sleep, &until); 125238106Sdes if (0 == rc) 126238106Sdes return -1; 127294190Sdes if (-1 == rc && ETIMEDOUT == errno) 128294190Sdes return 0; 129294190Sdes msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 130294190Sdes return -1; 131294190Sdes} 132294190Sdes 133294190Sdes 134294190Sdes/* -------------------------------------------------------------------- 135238106Sdes * Wake up a worker that takes a nap. 
136238106Sdes */ 137238106Sdesvoid 138238106Sdesinterrupt_worker_sleep(void) 139238106Sdes{ 140238106Sdes u_int idx; 141238106Sdes blocking_child * c; 142238106Sdes 143238106Sdes for (idx = 0; idx < blocking_children_alloc; idx++) { 144238106Sdes c = blocking_children[idx]; 145238106Sdes if (NULL == c || NULL == c->wake_scheduled_sleep) 146238106Sdes continue; 147238106Sdes tickle_sem(c->wake_scheduled_sleep); 148238106Sdes } 149238106Sdes} 150238106Sdes 151238106Sdes/* -------------------------------------------------------------------- 152238106Sdes * Make sure there is an empty slot at the head of the request 153238106Sdes * queue. Tell if the queue is currently empty. 154238106Sdes */ 155238106Sdesstatic int 156238106Sdesensure_workitems_empty_slot( 157238106Sdes blocking_child *c 158238106Sdes ) 159238106Sdes{ 160238106Sdes /* 161 ** !!! PRECONDITION: caller holds access lock! 162 ** 163 ** This simply tries to increase the size of the buffer if it 164 ** becomes full. The resize operation does *not* maintain the 165 ** order of requests, but that should be irrelevant since the 166 ** processing is considered asynchronous anyway. 167 ** 168 ** Return if the buffer is currently empty. 
169 */ 170 171 static const size_t each = 172 sizeof(blocking_children[0]->workitems[0]); 173 174 size_t new_alloc; 175 size_t slots_used; 176 size_t sidx; 177 178 slots_used = c->head_workitem - c->tail_workitem; 179 if (slots_used >= c->workitems_alloc) { 180 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 181 c->workitems = erealloc(c->workitems, new_alloc * each); 182 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 183 c->workitems[sidx] = NULL; 184 c->tail_workitem = 0; 185 c->head_workitem = c->workitems_alloc; 186 c->workitems_alloc = new_alloc; 187 } 188 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 189 return (0 == slots_used); 190} 191 192/* -------------------------------------------------------------------- 193 * Make sure there is an empty slot at the head of the response 194 * queue. Tell if the queue is currently empty. 195 */ 196static int 197ensure_workresp_empty_slot( 198 blocking_child *c 199 ) 200{ 201 /* 202 ** !!! PRECONDITION: caller holds access lock! 203 ** 204 ** Works like the companion function above. 205 */ 206 207 static const size_t each = 208 sizeof(blocking_children[0]->responses[0]); 209 210 size_t new_alloc; 211 size_t slots_used; 212 size_t sidx; 213 214 slots_used = c->head_response - c->tail_response; 215 if (slots_used >= c->responses_alloc) { 216 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 217 c->responses = erealloc(c->responses, new_alloc * each); 218 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 219 c->responses[sidx] = NULL; 220 c->tail_response = 0; 221 c->head_response = c->responses_alloc; 222 c->responses_alloc = new_alloc; 223 } 224 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 225 return (0 == slots_used); 226} 227 228 229/* -------------------------------------------------------------------- 230 * queue_req_pointer() - append a work item or idle exit request to 231 * blocking_workitems[]. Employ proper locking. 
232 */ 233static int 234queue_req_pointer( 235 blocking_child * c, 236 blocking_pipe_header * hdr 237 ) 238{ 239 size_t qhead; 240 241 /* >>>> ACCESS LOCKING STARTS >>>> */ 242 wait_for_sem(c->accesslock, NULL); 243 ensure_workitems_empty_slot(c); 244 qhead = c->head_workitem; 245 c->workitems[qhead % c->workitems_alloc] = hdr; 246 c->head_workitem = 1 + qhead; 247 tickle_sem(c->accesslock); 248 /* <<<< ACCESS LOCKING ENDS <<<< */ 249 250 /* queue consumer wake-up notification */ 251 tickle_sem(c->workitems_pending); 252 253 return 0; 254} 255 256/* -------------------------------------------------------------------- 257 * API function to make sure a worker is running, a proper private copy 258 * of the data is made, the data eneterd into the queue and the worker 259 * is signalled. 260 */ 261int 262send_blocking_req_internal( 263 blocking_child * c, 264 blocking_pipe_header * hdr, 265 void * data 266 ) 267{ 268 blocking_pipe_header * threadcopy; 269 size_t payload_octets; 270 271 REQUIRE(hdr != NULL); 272 REQUIRE(data != NULL); 273 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 274 275 if (hdr->octets <= sizeof(*hdr)) 276 return 1; /* failure */ 277 payload_octets = hdr->octets - sizeof(*hdr); 278 279 if (NULL == c->thread_ref) 280 start_blocking_thread(c); 281 threadcopy = emalloc(hdr->octets); 282 memcpy(threadcopy, hdr, sizeof(*hdr)); 283 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 284 285 return queue_req_pointer(c, threadcopy); 286} 287 288/* -------------------------------------------------------------------- 289 * Wait for the 'incoming queue no longer empty' signal, lock the shared 290 * structure and dequeue an item. 
291 */ 292blocking_pipe_header * 293receive_blocking_req_internal( 294 blocking_child * c 295 ) 296{ 297 blocking_pipe_header * req; 298 size_t qhead, qtail; 299 300 req = NULL; 301 do { 302 /* wait for tickle from the producer side */ 303 wait_for_sem(c->workitems_pending, NULL); 304 305 /* >>>> ACCESS LOCKING STARTS >>>> */ 306 wait_for_sem(c->accesslock, NULL); 307 qhead = c->head_workitem; 308 do { 309 qtail = c->tail_workitem; 310 if (qhead == qtail) 311 break; 312 c->tail_workitem = qtail + 1; 313 qtail %= c->workitems_alloc; 314 req = c->workitems[qtail]; 315 c->workitems[qtail] = NULL; 316 } while (NULL == req); 317 tickle_sem(c->accesslock); 318 /* <<<< ACCESS LOCKING ENDS <<<< */ 319 320 } while (NULL == req); 321 322 INSIST(NULL != req); 323 if (CHILD_EXIT_REQ == req) { /* idled out */ 324 send_blocking_resp_internal(c, CHILD_GONE_RESP); 325 req = NULL; 326 } 327 328 return req; 329} 330 331/* -------------------------------------------------------------------- 332 * Push a response into the return queue and eventually tickle the 333 * receiver. 334 */ 335int 336send_blocking_resp_internal( 337 blocking_child * c, 338 blocking_pipe_header * resp 339 ) 340{ 341 size_t qhead; 342 int empty; 343 344 /* >>>> ACCESS LOCKING STARTS >>>> */ 345 wait_for_sem(c->accesslock, NULL); 346 empty = ensure_workresp_empty_slot(c); 347 qhead = c->head_response; 348 c->responses[qhead % c->responses_alloc] = resp; 349 c->head_response = 1 + qhead; 350 tickle_sem(c->accesslock); 351 /* <<<< ACCESS LOCKING ENDS <<<< */ 352 353 /* queue consumer wake-up notification */ 354 if (empty) 355 { 356# ifdef WORK_PIPE 357 write(c->resp_write_pipe, "", 1); 358# else 359 tickle_sem(c->responses_pending); 360# endif 361 } 362 return 0; 363} 364 365 366#ifndef WORK_PIPE 367 368/* -------------------------------------------------------------------- 369 * Check if a (Windows-)hanndle to a semaphore is actually the same we 370 * are using inside the sema wrapper. 
371 */ 372static BOOL 373same_os_sema( 374 const sem_ref obj, 375 void* osh 376 ) 377{ 378 return obj && osh && (obj->shnd == (HANDLE)osh); 379} 380 381/* -------------------------------------------------------------------- 382 * Find the shared context that associates to an OS handle and make sure 383 * the data is dequeued and processed. 384 */ 385void 386handle_blocking_resp_sem( 387 void * context 388 ) 389{ 390 blocking_child * c; 391 u_int idx; 392 393 c = NULL; 394 for (idx = 0; idx < blocking_children_alloc; idx++) { 395 c = blocking_children[idx]; 396 if (c != NULL && 397 c->thread_ref != NULL && 398 same_os_sema(c->responses_pending, context)) 399 break; 400 } 401 if (idx < blocking_children_alloc) 402 process_blocking_resp(c); 403} 404#endif /* !WORK_PIPE */ 405 406/* -------------------------------------------------------------------- 407 * Fetch the next response from the return queue. In case of signalling 408 * via pipe, make sure the pipe is flushed, too. 409 */ 410blocking_pipe_header * 411receive_blocking_resp_internal( 412 blocking_child * c 413 ) 414{ 415 blocking_pipe_header * removed; 416 size_t qhead, qtail, slot; 417 418#ifdef WORK_PIPE 419 int rc; 420 char scratch[32]; 421 422 do 423 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 424 while (-1 == rc && EINTR == errno); 425#endif 426 427 /* >>>> ACCESS LOCKING STARTS >>>> */ 428 wait_for_sem(c->accesslock, NULL); 429 qhead = c->head_response; 430 qtail = c->tail_response; 431 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 432 slot = qtail % c->responses_alloc; 433 removed = c->responses[slot]; 434 c->responses[slot] = NULL; 435 } 436 c->tail_response = qtail; 437 tickle_sem(c->accesslock); 438 /* <<<< ACCESS LOCKING ENDS <<<< */ 439 440 if (NULL != removed) { 441 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 442 BLOCKING_RESP_MAGIC == removed->magic_sig); 443 } 444 if (CHILD_GONE_RESP == removed) { 445 cleanup_after_child(c); 446 removed = NULL; 447 } 448 449 return 
removed; 450} 451 452/* -------------------------------------------------------------------- 453 * Light up a new worker. 454 */ 455static void 456start_blocking_thread( 457 blocking_child * c 458 ) 459{ 460 461 DEBUG_INSIST(!c->reusable); 462 463 prepare_child_sems(c); 464 start_blocking_thread_internal(c); 465} 466 467/* -------------------------------------------------------------------- 468 * Create a worker thread. There are several differences between POSIX 469 * and Windows, of course -- most notably the Windows thread is no 470 * detached thread, and we keep the handle around until we want to get 471 * rid of the thread. The notification scheme also differs: Windows 472 * makes use of semaphores in both directions, POSIX uses a pipe for 473 * integration with 'select()' or alike. 474 */ 475static void 476start_blocking_thread_internal( 477 blocking_child * c 478 ) 479#ifdef SYS_WINNT 480{ 481 BOOL resumed; 482 483 c->thread_ref = NULL; 484 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 485 c->thr_table[0].thnd = 486 (HANDLE)_beginthreadex( 487 NULL, 488 0, 489 &blocking_thread, 490 c, 491 CREATE_SUSPENDED, 492 NULL); 493 494 if (NULL == c->thr_table[0].thnd) { 495 msyslog(LOG_ERR, "start blocking thread failed: %m"); 496 exit(-1); 497 } 498 /* remember the thread priority is only within the process class */ 499 if (!SetThreadPriority(c->thr_table[0].thnd, 500 THREAD_PRIORITY_BELOW_NORMAL)) 501 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 502 503 resumed = ResumeThread(c->thr_table[0].thnd); 504 DEBUG_INSIST(resumed); 505 c->thread_ref = &c->thr_table[0]; 506} 507#else /* pthreads start_blocking_thread_internal() follows */ 508{ 509# ifdef NEED_PTHREAD_INIT 510 static int pthread_init_called; 511# endif 512 pthread_attr_t thr_attr; 513 int rc; 514 int pipe_ends[2]; /* read then write */ 515 int is_pipe; 516 int flags; 517 size_t ostacksize; 518 size_t nstacksize; 519 sigset_t saved_sig_mask; 520 521 c->thread_ref = NULL; 
522 523# ifdef NEED_PTHREAD_INIT 524 /* 525 * from lib/isc/unix/app.c: 526 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 527 */ 528 if (!pthread_init_called) { 529 pthread_init(); 530 pthread_init_called = TRUE; 531 } 532# endif 533 534 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 535 if (0 != rc) { 536 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 537 exit(1); 538 } 539 c->resp_read_pipe = move_fd(pipe_ends[0]); 540 c->resp_write_pipe = move_fd(pipe_ends[1]); 541 c->ispipe = is_pipe; 542 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 543 if (-1 == flags) { 544 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 545 exit(1); 546 } 547 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 548 if (-1 == rc) { 549 msyslog(LOG_ERR, 550 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 551 exit(1); 552 } 553 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 554 pthread_attr_init(&thr_attr); 555 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 556#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 557 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 558 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 559 if (0 != rc) { 560 msyslog(LOG_ERR, 561 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 562 strerror(rc)); 563 } else { 564 if (ostacksize < THREAD_MINSTACKSIZE) 565 nstacksize = THREAD_MINSTACKSIZE; 566 else if (ostacksize > THREAD_MAXSTACKSIZE) 567 nstacksize = THREAD_MAXSTACKSIZE; 568 else 569 nstacksize = ostacksize; 570 if (nstacksize != ostacksize) 571 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 572 if (0 != rc) 573 msyslog(LOG_ERR, 574 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 575 (u_long)ostacksize, (u_long)nstacksize, 576 strerror(rc)); 577 } 578#else 579 UNUSED_ARG(nstacksize); 580 UNUSED_ARG(ostacksize); 581#endif 582#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 583 pthread_attr_setscope(&thr_attr, 
PTHREAD_SCOPE_SYSTEM); 584#endif 585 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 586 block_thread_signals(&saved_sig_mask); 587 rc = pthread_create(&c->thr_table[0], &thr_attr, 588 &blocking_thread, c); 589 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 590 pthread_attr_destroy(&thr_attr); 591 if (0 != rc) { 592 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 593 strerror(rc)); 594 exit(1); 595 } 596 c->thread_ref = &c->thr_table[0]; 597} 598#endif 599 600/* -------------------------------------------------------------------- 601 * block_thread_signals() 602 * 603 * Temporarily block signals used by ntpd main thread, so that signal 604 * mask inherited by child threads leaves them blocked. Returns prior 605 * active signal mask via pmask, to be restored by the main thread 606 * after pthread_create(). 607 */ 608#ifndef SYS_WINNT 609void 610block_thread_signals( 611 sigset_t * pmask 612 ) 613{ 614 sigset_t block; 615 616 sigemptyset(&block); 617# ifdef HAVE_SIGNALED_IO 618# ifdef SIGIO 619 sigaddset(&block, SIGIO); 620# endif 621# ifdef SIGPOLL 622 sigaddset(&block, SIGPOLL); 623# endif 624# endif /* HAVE_SIGNALED_IO */ 625 sigaddset(&block, SIGALRM); 626 sigaddset(&block, MOREDEBUGSIG); 627 sigaddset(&block, LESSDEBUGSIG); 628# ifdef SIGDIE1 629 sigaddset(&block, SIGDIE1); 630# endif 631# ifdef SIGDIE2 632 sigaddset(&block, SIGDIE2); 633# endif 634# ifdef SIGDIE3 635 sigaddset(&block, SIGDIE3); 636# endif 637# ifdef SIGDIE4 638 sigaddset(&block, SIGDIE4); 639# endif 640# ifdef SIGBUS 641 sigaddset(&block, SIGBUS); 642# endif 643 sigemptyset(pmask); 644 pthread_sigmask(SIG_BLOCK, &block, pmask); 645} 646#endif /* !SYS_WINNT */ 647 648 649/* -------------------------------------------------------------------- 650 * Create & destroy semaphores. This is sufficiently different between 651 * POSIX and Windows to warrant wrapper functions and close enough to 652 * use the concept of synchronization via semaphore for all platforms. 
653 */ 654static sem_ref 655create_sema( 656 sema_type* semptr, 657 u_int inival, 658 u_int maxval) 659{ 660#ifdef SYS_WINNT 661 662 long svini, svmax; 663 if (NULL != semptr) { 664 svini = (inival < LONG_MAX) 665 ? (long)inival : LONG_MAX; 666 svmax = (maxval < LONG_MAX && maxval > 0) 667 ? (long)maxval : LONG_MAX; 668 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 669 if (NULL == semptr->shnd) 670 semptr = NULL; 671 } 672 673#else 674 675 (void)maxval; 676 if (semptr && sem_init(semptr, FALSE, inival)) 677 semptr = NULL; 678 679#endif 680 681 return semptr; 682} 683 684/* ------------------------------------------------------------------ */ 685static sem_ref 686delete_sema( 687 sem_ref obj) 688{ 689 690# ifdef SYS_WINNT 691 692 if (obj) { 693 if (obj->shnd) 694 CloseHandle(obj->shnd); 695 obj->shnd = NULL; 696 } 697 698# else 699 700 if (obj) 701 sem_destroy(obj); 702 703# endif 704 705 return NULL; 706} 707 708/* -------------------------------------------------------------------- 709 * prepare_child_sems() 710 * 711 * create sync & access semaphores 712 * 713 * All semaphores are cleared, only the access semaphore has 1 unit. 714 * Childs wait on 'workitems_pending', then grabs 'sema_access' 715 * and dequeues jobs. When done, 'sema_access' is given one unit back. 716 * 717 * The producer grabs 'sema_access', manages the queue, restores 718 * 'sema_access' and puts one unit into 'workitems_pending'. 719 * 720 * The story goes the same for the response queue. 721 */ 722static void 723prepare_child_sems( 724 blocking_child *c 725 ) 726{ 727 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 728 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 729 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 730# ifndef WORK_PIPE 731 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 732# endif 733} 734 735/* -------------------------------------------------------------------- 736 * wait for semaphore. 
Where the wait can be interrupted, it will 737 * internally resume -- When this function returns, there is either no 738 * semaphore at all, a timeout occurred, or the caller could 739 * successfully take a token from the semaphore. 740 * 741 * For untimed wait, not checking the result of this function at all is 742 * definitely an option. 743 */ 744static int 745wait_for_sem( 746 sem_ref sem, 747 struct timespec * timeout /* wall-clock */ 748 ) 749#ifdef SYS_WINNT 750{ 751 struct timespec now; 752 struct timespec delta; 753 DWORD msec; 754 DWORD rc; 755 756 if (!(sem && sem->shnd)) { 757 errno = EINVAL; 758 return -1; 759 } 760 761 if (NULL == timeout) { 762 msec = INFINITE; 763 } else { 764 getclock(TIMEOFDAY, &now); 765 delta = sub_tspec(*timeout, now); 766 if (delta.tv_sec < 0) { 767 msec = 0; 768 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 769 msec = INFINITE; 770 } else { 771 msec = 1000 * (DWORD)delta.tv_sec; 772 msec += delta.tv_nsec / (1000 * 1000); 773 } 774 } 775 rc = WaitForSingleObject(sem->shnd, msec); 776 if (WAIT_OBJECT_0 == rc) 777 return 0; 778 if (WAIT_TIMEOUT == rc) { 779 errno = ETIMEDOUT; 780 return -1; 781 } 782 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 783 errno = EFAULT; 784 return -1; 785} 786#else /* pthreads wait_for_sem() follows */ 787{ 788 int rc = -1; 789 790 if (sem) do { 791 if (NULL == timeout) 792 rc = sem_wait(sem); 793 else 794 rc = sem_timedwait(sem, timeout); 795 } while (rc == -1 && errno == EINTR); 796 else 797 errno = EINVAL; 798 799 return rc; 800} 801#endif 802 803/* -------------------------------------------------------------------- 804 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 805 * calling conventions under Windows and POSIX-defined signature 806 * otherwise. 
807 */ 808#ifdef SYS_WINNT 809u_int WINAPI 810#else 811void * 812#endif 813blocking_thread( 814 void * ThreadArg 815 ) 816{ 817 blocking_child *c; 818 819 c = ThreadArg; 820 exit_worker(blocking_child_common(c)); 821 822 /* NOTREACHED */ 823 return 0; 824} 825 826/* -------------------------------------------------------------------- 827 * req_child_exit() runs in the parent. 828 * 829 * This function is called from from the idle timer, too, and possibly 830 * without a thread being there any longer. Since we have folded up our 831 * tent in that case and all the semaphores are already gone, we simply 832 * ignore this request in this case. 833 * 834 * Since the existence of the semaphores is controlled exclusively by 835 * the parent, there's no risk of data race here. 836 */ 837int 838req_child_exit( 839 blocking_child *c 840 ) 841{ 842 return (c->accesslock) 843 ? queue_req_pointer(c, CHILD_EXIT_REQ) 844 : 0; 845} 846 847/* -------------------------------------------------------------------- 848 * cleanup_after_child() runs in parent. 849 */ 850static void 851cleanup_after_child( 852 blocking_child * c 853 ) 854{ 855 DEBUG_INSIST(!c->reusable); 856 857# ifdef SYS_WINNT 858 /* The thread was not created in detached state, so we better 859 * clean up. 
860 */ 861 if (c->thread_ref && c->thread_ref->thnd) { 862 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 863 INSIST(CloseHandle(c->thread_ref->thnd)); 864 c->thread_ref->thnd = NULL; 865 } 866# endif 867 c->thread_ref = NULL; 868 869 /* remove semaphores and (if signalling vi IO) pipes */ 870 871 c->accesslock = delete_sema(c->accesslock); 872 c->workitems_pending = delete_sema(c->workitems_pending); 873 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 874 875# ifdef WORK_PIPE 876 DEBUG_INSIST(-1 != c->resp_read_pipe); 877 DEBUG_INSIST(-1 != c->resp_write_pipe); 878 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 879 close(c->resp_write_pipe); 880 close(c->resp_read_pipe); 881 c->resp_write_pipe = -1; 882 c->resp_read_pipe = -1; 883# else 884 DEBUG_INSIST(NULL != c->responses_pending); 885 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 886 c->responses_pending = delete_sema(c->responses_pending); 887# endif 888 889 /* Is it necessary to check if there are pending requests and 890 * responses? If so, and if there are, what to do with them? 891 */ 892 893 /* re-init buffer index sequencers */ 894 c->head_workitem = 0; 895 c->tail_workitem = 0; 896 c->head_response = 0; 897 c->tail_response = 0; 898 899 c->reusable = TRUE; 900} 901 902 903#else /* !WORK_THREAD follows */ 904char work_thread_nonempty_compilation_unit; 905#endif 906