ntp_worker.c revision 293650
1/* 2 * ntp_worker.c 3 */ 4#include <config.h> 5#include "ntp_workimpl.h" 6 7#ifdef WORKER 8 9#include <stdio.h> 10#include <ctype.h> 11#include <signal.h> 12 13#include "iosignal.h" 14#include "ntp_stdlib.h" 15#include "ntp_malloc.h" 16#include "ntp_syslog.h" 17#include "ntpd.h" 18#include "ntp_io.h" 19#include "ntp_assert.h" 20#include "ntp_unixtime.h" 21#include "intreswork.h" 22 23 24#define CHILD_MAX_IDLE (3 * 60) /* seconds, idle worker limit */ 25 26blocking_child ** blocking_children; 27size_t blocking_children_alloc; 28int worker_per_query; /* boolean */ 29int intres_req_pending; 30 31 32#ifndef HAVE_IO_COMPLETION_PORT 33/* 34 * pipe_socketpair() 35 * 36 * Provides an AF_UNIX socketpair on systems which have them, otherwise 37 * pair of unidirectional pipes. 38 */ 39int 40pipe_socketpair( 41 int caller_fds[2], 42 int * is_pipe 43 ) 44{ 45 int rc; 46 int fds[2]; 47 int called_pipe; 48 49#ifdef HAVE_SOCKETPAIR 50 rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]); 51#else 52 rc = -1; 53#endif 54 55 if (-1 == rc) { 56 rc = pipe(&fds[0]); 57 called_pipe = TRUE; 58 } else { 59 called_pipe = FALSE; 60 } 61 62 if (-1 == rc) 63 return rc; 64 65 caller_fds[0] = fds[0]; 66 caller_fds[1] = fds[1]; 67 if (is_pipe != NULL) 68 *is_pipe = called_pipe; 69 70 return 0; 71} 72 73 74/* 75 * close_all_except() 76 * 77 * Close all file descriptors except the given keep_fd. 78 */ 79void 80close_all_except( 81 int keep_fd 82 ) 83{ 84 int fd; 85 86 for (fd = 0; fd < keep_fd; fd++) 87 close(fd); 88 89 close_all_beyond(keep_fd); 90} 91 92 93/* 94 * close_all_beyond() 95 * 96 * Close all file descriptors after the given keep_fd, which is the 97 * highest fd to keep open. 98 */ 99void 100close_all_beyond( 101 int keep_fd 102 ) 103{ 104# ifdef HAVE_CLOSEFROM 105 closefrom(keep_fd + 1); 106# elif defined(F_CLOSEM) 107 /* 108 * From 'Writing Reliable AIX Daemons,' SG24-4946-00, 109 * by Eric Agar (saves us from doing 32767 system 110 * calls) 111 */ 112 if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1) 113 msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1); 114# else /* !HAVE_CLOSEFROM && !F_CLOSEM follows */ 115 int fd; 116 int max_fd; 117 118 max_fd = GETDTABLESIZE(); 119 for (fd = keep_fd + 1; fd < max_fd; fd++) 120 close(fd); 121# endif /* !HAVE_CLOSEFROM && !F_CLOSEM */ 122} 123#endif /* HAVE_IO_COMPLETION_PORT */ 124 125 126u_int 127available_blocking_child_slot(void) 128{ 129 const size_t each = sizeof(blocking_children[0]); 130 u_int slot; 131 size_t prev_alloc; 132 size_t new_alloc; 133 size_t prev_octets; 134 size_t octets; 135 136 for (slot = 0; slot < blocking_children_alloc; slot++) { 137 if (NULL == blocking_children[slot]) 138 return slot; 139 if (blocking_children[slot]->reusable) { 140 blocking_children[slot]->reusable = FALSE; 141 return slot; 142 } 143 } 144 145 prev_alloc = blocking_children_alloc; 146 prev_octets = prev_alloc * each; 147 new_alloc = blocking_children_alloc + 4; 148 octets = new_alloc * each; 149 blocking_children = erealloc_zero(blocking_children, octets, 150 prev_octets); 151 blocking_children_alloc = new_alloc; 152 153 /* assume we'll never have enough workers to overflow u_int */ 154 return (u_int)prev_alloc; 155} 156 157 158int 159queue_blocking_request( 160 blocking_work_req rtype, 161 void * req, 162 size_t reqsize, 163 blocking_work_callback done_func, 164 void * context 165 ) 166{ 167 static u_int intres_slot = UINT_MAX; 168 u_int child_slot; 169 blocking_child * c; 170 blocking_pipe_header req_hdr; 171 172 req_hdr.octets = sizeof(req_hdr) + reqsize; 173 req_hdr.magic_sig = BLOCKING_REQ_MAGIC; 174 req_hdr.rtype = rtype; 175 req_hdr.done_func = done_func; 176 req_hdr.context = context; 177 178 child_slot = UINT_MAX; 179 if (worker_per_query || UINT_MAX == intres_slot || 180 blocking_children[intres_slot]->reusable) 181 child_slot = available_blocking_child_slot(); 182 if (!worker_per_query) { 183 if (UINT_MAX == intres_slot) 184 intres_slot = child_slot; 185 else 186 child_slot = intres_slot; 187 if (0 == intres_req_pending) 188 intres_timeout_req(0); 189 } 190 intres_req_pending++; 191 INSIST(UINT_MAX != child_slot); 192 c = blocking_children[child_slot]; 193 if (NULL == c) { 194 c = emalloc_zero(sizeof(*c)); 195#ifdef WORK_FORK 196 c->req_read_pipe = -1; 197 c->req_write_pipe = -1; 198#endif 199#ifdef WORK_PIPE 200 c->resp_read_pipe = -1; 201 c->resp_write_pipe = -1; 202#endif 203 blocking_children[child_slot] = c; 204 } 205 req_hdr.child_idx = child_slot; 206 207 return send_blocking_req_internal(c, &req_hdr, req); 208} 209 210 211int queue_blocking_response( 212 blocking_child * c, 213 blocking_pipe_header * resp, 214 size_t respsize, 215 const blocking_pipe_header * req 216 ) 217{ 218 resp->octets = respsize; 219 resp->magic_sig = BLOCKING_RESP_MAGIC; 220 resp->rtype = req->rtype; 221 resp->context = req->context; 222 resp->done_func = req->done_func; 223 224 return send_blocking_resp_internal(c, resp); 225} 226 227 228void 229process_blocking_resp( 230 blocking_child * c 231 ) 232{ 233 blocking_pipe_header * resp; 234 void * data; 235 236 /* 237 * On Windows send_blocking_resp_internal() may signal the 238 * blocking_response_ready event multiple times while we're 239 * processing a response, so always consume all available 240 * responses before returning to test the event again. 241 */ 242#ifdef WORK_THREAD 243 do { 244#endif 245 resp = receive_blocking_resp_internal(c); 246 if (NULL != resp) { 247 DEBUG_REQUIRE(BLOCKING_RESP_MAGIC == 248 resp->magic_sig); 249 data = (char *)resp + sizeof(*resp); 250 intres_req_pending--; 251 (*resp->done_func)(resp->rtype, resp->context, 252 resp->octets - sizeof(*resp), 253 data); 254 free(resp); 255 } 256#ifdef WORK_THREAD 257 } while (NULL != resp); 258#endif 259 if (!worker_per_query && 0 == intres_req_pending) 260 intres_timeout_req(CHILD_MAX_IDLE); 261 else if (worker_per_query) 262 req_child_exit(c); 263} 264 265 266/* 267 * blocking_child_common runs as a forked child or a thread 268 */ 269int 270blocking_child_common( 271 blocking_child *c 272 ) 273{ 274 int say_bye; 275 blocking_pipe_header *req; 276 277 say_bye = FALSE; 278 while (!say_bye) { 279 req = receive_blocking_req_internal(c); 280 if (NULL == req) { 281 say_bye = TRUE; 282 continue; 283 } 284 285 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig); 286 287 switch (req->rtype) { 288 case BLOCKING_GETADDRINFO: 289 if (blocking_getaddrinfo(c, req)) 290 say_bye = TRUE; 291 break; 292 293 case BLOCKING_GETNAMEINFO: 294 if (blocking_getnameinfo(c, req)) 295 say_bye = TRUE; 296 break; 297 298 default: 299 msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype); 300 say_bye = TRUE; 301 } 302 303 free(req); 304 } 305 306 return 0; 307} 308 309 310/* 311 * worker_idle_timer_fired() 312 * 313 * The parent starts this timer when the last pending response has been 314 * received from the child, making it idle, and clears the timer when a 315 * request is dispatched to the child. Once the timer expires, the 316 * child is sent packing. 317 * 318 * This is called when worker_idle_timer is nonzero and less than or 319 * equal to current_time. 320 */ 321void 322worker_idle_timer_fired(void) 323{ 324 u_int idx; 325 blocking_child * c; 326 327 DEBUG_REQUIRE(0 == intres_req_pending); 328 329 intres_timeout_req(0); 330 for (idx = 0; idx < blocking_children_alloc; idx++) { 331 c = blocking_children[idx]; 332 if (NULL == c) 333 continue; 334 req_child_exit(c); 335 } 336} 337 338 339#else /* !WORKER follows */ 340int ntp_worker_nonempty_compilation_unit; 341#endif 342