/* port.c revision 289166 */
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "apr.h" 18#include "apr_poll.h" 19#include "apr_time.h" 20#include "apr_portable.h" 21#include "apr_atomic.h" 22#include "apr_arch_file_io.h" 23#include "apr_arch_networkio.h" 24#include "apr_arch_poll_private.h" 25#include "apr_arch_inherit.h" 26 27#if defined(HAVE_PORT_CREATE) 28 29static apr_int16_t get_event(apr_int16_t event) 30{ 31 apr_int16_t rv = 0; 32 33 if (event & APR_POLLIN) 34 rv |= POLLIN; 35 if (event & APR_POLLPRI) 36 rv |= POLLPRI; 37 if (event & APR_POLLOUT) 38 rv |= POLLOUT; 39 /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events */ 40 41 return rv; 42} 43 44static apr_int16_t get_revent(apr_int16_t event) 45{ 46 apr_int16_t rv = 0; 47 48 if (event & POLLIN) 49 rv |= APR_POLLIN; 50 if (event & POLLPRI) 51 rv |= APR_POLLPRI; 52 if (event & POLLOUT) 53 rv |= APR_POLLOUT; 54 if (event & POLLERR) 55 rv |= APR_POLLERR; 56 if (event & POLLHUP) 57 rv |= APR_POLLHUP; 58 if (event & POLLNVAL) 59 rv |= APR_POLLNVAL; 60 61 return rv; 62} 63 64 65struct apr_pollset_private_t 66{ 67 int port_fd; 68 port_event_t *port_set; 69 apr_pollfd_t *result_set; 70#if APR_HAS_THREADS 71 /* A thread mutex to protect operations on the rings */ 72 
apr_thread_mutex_t *ring_lock; 73#endif 74 /* A ring containing all of the pollfd_t that are active */ 75 APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring; 76 /* A ring containing the pollfd_t that will be added on the 77 * next call to apr_pollset_poll(). 78 */ 79 APR_RING_HEAD(pfd_add_ring_t, pfd_elem_t) add_ring; 80 /* A ring of pollfd_t that have been used, and then _remove'd */ 81 APR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring; 82 /* A ring of pollfd_t where rings that have been _remove'd but 83 might still be inside a _poll */ 84 APR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring; 85 /* number of threads in poll */ 86 volatile apr_uint32_t waiting; 87}; 88 89static apr_status_t call_port_getn(int port, port_event_t list[], 90 unsigned int max, unsigned int *nget, 91 apr_interval_time_t timeout) 92{ 93 struct timespec tv, *tvptr; 94 int ret; 95 apr_status_t rv = APR_SUCCESS; 96 97 if (timeout < 0) { 98 tvptr = NULL; 99 } 100 else { 101 tv.tv_sec = (long) apr_time_sec(timeout); 102 tv.tv_nsec = (long) apr_time_usec(timeout) * 1000; 103 tvptr = &tv; 104 } 105 106 list[0].portev_user = (void *)-1; /* so we can double check that an 107 * event was returned 108 */ 109 110 ret = port_getn(port, list, max, nget, tvptr); 111 /* Note: 32-bit port_getn() on Solaris 10 x86 returns large negative 112 * values instead of 0 when returning immediately. 113 */ 114 115 if (ret == -1) { 116 rv = apr_get_netos_error(); 117 118 switch(rv) { 119 case EINTR: 120 case ETIME: 121 if (*nget > 0 && list[0].portev_user != (void *)-1) { 122 /* This confusing API can return an event at the same time 123 * that it reports EINTR or ETIME. If that occurs, just 124 * report the event. With EINTR, nget can be > 0 without 125 * any event, so check that portev_user was filled in. 126 * 127 * (Maybe it will be simplified; see thread 128 * http://mail.opensolaris.org 129 * /pipermail/networking-discuss/2009-August/011979.html 130 * This code will still work afterwards.) 
131 */ 132 rv = APR_SUCCESS; 133 break; 134 } 135 if (rv == ETIME) { 136 rv = APR_TIMEUP; 137 } 138 /* fall-through */ 139 default: 140 *nget = 0; 141 } 142 } 143 else if (*nget == 0) { 144 rv = APR_TIMEUP; 145 } 146 147 return rv; 148} 149 150static apr_status_t impl_pollset_cleanup(apr_pollset_t *pollset) 151{ 152 close(pollset->p->port_fd); 153 return APR_SUCCESS; 154} 155 156static apr_status_t impl_pollset_create(apr_pollset_t *pollset, 157 apr_uint32_t size, 158 apr_pool_t *p, 159 apr_uint32_t flags) 160{ 161 apr_status_t rv = APR_SUCCESS; 162 pollset->p = apr_palloc(p, sizeof(apr_pollset_private_t)); 163#if APR_HAS_THREADS 164 if (flags & APR_POLLSET_THREADSAFE && 165 ((rv = apr_thread_mutex_create(&pollset->p->ring_lock, 166 APR_THREAD_MUTEX_DEFAULT, 167 p)) != APR_SUCCESS)) { 168 pollset->p = NULL; 169 return rv; 170 } 171#else 172 if (flags & APR_POLLSET_THREADSAFE) { 173 pollset->p = NULL; 174 return APR_ENOTIMPL; 175 } 176#endif 177 pollset->p->waiting = 0; 178 179 pollset->p->port_set = apr_palloc(p, size * sizeof(port_event_t)); 180 181 pollset->p->port_fd = port_create(); 182 183 if (pollset->p->port_fd < 0) { 184 pollset->p = NULL; 185 return apr_get_netos_error(); 186 } 187 188 { 189 int flags; 190 191 if ((flags = fcntl(pollset->p->port_fd, F_GETFD)) == -1) { 192 rv = errno; 193 close(pollset->p->port_fd); 194 pollset->p = NULL; 195 return rv; 196 } 197 198 flags |= FD_CLOEXEC; 199 if (fcntl(pollset->p->port_fd, F_SETFD, flags) == -1) { 200 rv = errno; 201 close(pollset->p->port_fd); 202 pollset->p = NULL; 203 return rv; 204 } 205 } 206 207 pollset->p->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 208 209 APR_RING_INIT(&pollset->p->query_ring, pfd_elem_t, link); 210 APR_RING_INIT(&pollset->p->add_ring, pfd_elem_t, link); 211 APR_RING_INIT(&pollset->p->free_ring, pfd_elem_t, link); 212 APR_RING_INIT(&pollset->p->dead_ring, pfd_elem_t, link); 213 214 return rv; 215} 216 217static apr_status_t impl_pollset_add(apr_pollset_t *pollset, 218 
const apr_pollfd_t *descriptor) 219{ 220 apr_os_sock_t fd; 221 pfd_elem_t *elem; 222 int res; 223 apr_status_t rv = APR_SUCCESS; 224 225 pollset_lock_rings(); 226 227 if (!APR_RING_EMPTY(&(pollset->p->free_ring), pfd_elem_t, link)) { 228 elem = APR_RING_FIRST(&(pollset->p->free_ring)); 229 APR_RING_REMOVE(elem, link); 230 } 231 else { 232 elem = (pfd_elem_t *) apr_palloc(pollset->pool, sizeof(pfd_elem_t)); 233 APR_RING_ELEM_INIT(elem, link); 234 elem->on_query_ring = 0; 235 } 236 elem->pfd = *descriptor; 237 238 if (descriptor->desc_type == APR_POLL_SOCKET) { 239 fd = descriptor->desc.s->socketdes; 240 } 241 else { 242 fd = descriptor->desc.f->filedes; 243 } 244 245 /* If another thread is polling, notify the kernel immediately; otherwise, 246 * wait until the next call to apr_pollset_poll(). 247 */ 248 if (apr_atomic_read32(&pollset->p->waiting)) { 249 res = port_associate(pollset->p->port_fd, PORT_SOURCE_FD, fd, 250 get_event(descriptor->reqevents), (void *)elem); 251 252 if (res < 0) { 253 rv = apr_get_netos_error(); 254 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem, pfd_elem_t, link); 255 } 256 else { 257 elem->on_query_ring = 1; 258 APR_RING_INSERT_TAIL(&(pollset->p->query_ring), elem, pfd_elem_t, link); 259 } 260 } 261 else { 262 APR_RING_INSERT_TAIL(&(pollset->p->add_ring), elem, pfd_elem_t, link); 263 } 264 265 pollset_unlock_rings(); 266 267 return rv; 268} 269 270static apr_status_t impl_pollset_remove(apr_pollset_t *pollset, 271 const apr_pollfd_t *descriptor) 272{ 273 apr_os_sock_t fd; 274 pfd_elem_t *ep; 275 apr_status_t rv = APR_SUCCESS; 276 int res; 277 int err = 0; 278 int found; 279 280 pollset_lock_rings(); 281 282 if (descriptor->desc_type == APR_POLL_SOCKET) { 283 fd = descriptor->desc.s->socketdes; 284 } 285 else { 286 fd = descriptor->desc.f->filedes; 287 } 288 289 /* Search the add ring first. This ring is often shorter, 290 * and it often contains the descriptor being removed. 
291 * (For the common scenario where apr_pollset_poll() 292 * returns activity for the descriptor and the descriptor 293 * is then removed from the pollset, it will have just 294 * been moved to the add ring by apr_pollset_poll().) 295 * 296 * If it is on the add ring, it isn't associated with the 297 * event port yet/anymore. 298 */ 299 found = 0; 300 for (ep = APR_RING_FIRST(&(pollset->p->add_ring)); 301 ep != APR_RING_SENTINEL(&(pollset->p->add_ring), 302 pfd_elem_t, link); 303 ep = APR_RING_NEXT(ep, link)) { 304 305 if (descriptor->desc.s == ep->pfd.desc.s) { 306 found = 1; 307 APR_RING_REMOVE(ep, link); 308 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), 309 ep, pfd_elem_t, link); 310 break; 311 } 312 } 313 314 if (!found) { 315 res = port_dissociate(pollset->p->port_fd, PORT_SOURCE_FD, fd); 316 317 if (res < 0) { 318 /* The expected case for this failure is that another 319 * thread's call to port_getn() returned this fd and 320 * disassociated the fd from the event port, and 321 * impl_pollset_poll() is blocked on the ring lock, 322 * which this thread holds. 
323 */ 324 err = errno; 325 rv = APR_NOTFOUND; 326 } 327 328 for (ep = APR_RING_FIRST(&(pollset->p->query_ring)); 329 ep != APR_RING_SENTINEL(&(pollset->p->query_ring), 330 pfd_elem_t, link); 331 ep = APR_RING_NEXT(ep, link)) { 332 333 if (descriptor->desc.s == ep->pfd.desc.s) { 334 APR_RING_REMOVE(ep, link); 335 ep->on_query_ring = 0; 336 APR_RING_INSERT_TAIL(&(pollset->p->dead_ring), 337 ep, pfd_elem_t, link); 338 if (ENOENT == err) { 339 rv = APR_SUCCESS; 340 } 341 break; 342 } 343 } 344 } 345 346 pollset_unlock_rings(); 347 348 return rv; 349} 350 351static apr_status_t impl_pollset_poll(apr_pollset_t *pollset, 352 apr_interval_time_t timeout, 353 apr_int32_t *num, 354 const apr_pollfd_t **descriptors) 355{ 356 apr_os_sock_t fd; 357 int ret, i, j; 358 unsigned int nget; 359 pfd_elem_t *ep; 360 apr_status_t rv = APR_SUCCESS; 361 apr_pollfd_t fp; 362 363 nget = 1; 364 365 pollset_lock_rings(); 366 367 apr_atomic_inc32(&pollset->p->waiting); 368 369 while (!APR_RING_EMPTY(&(pollset->p->add_ring), pfd_elem_t, link)) { 370 ep = APR_RING_FIRST(&(pollset->p->add_ring)); 371 APR_RING_REMOVE(ep, link); 372 373 if (ep->pfd.desc_type == APR_POLL_SOCKET) { 374 fd = ep->pfd.desc.s->socketdes; 375 } 376 else { 377 fd = ep->pfd.desc.f->filedes; 378 } 379 380 ret = port_associate(pollset->p->port_fd, PORT_SOURCE_FD, 381 fd, get_event(ep->pfd.reqevents), ep); 382 if (ret < 0) { 383 rv = apr_get_netos_error(); 384 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), ep, pfd_elem_t, link); 385 break; 386 } 387 388 ep->on_query_ring = 1; 389 APR_RING_INSERT_TAIL(&(pollset->p->query_ring), ep, pfd_elem_t, link); 390 } 391 392 pollset_unlock_rings(); 393 394 if (rv != APR_SUCCESS) { 395 apr_atomic_dec32(&pollset->p->waiting); 396 return rv; 397 } 398 399 rv = call_port_getn(pollset->p->port_fd, pollset->p->port_set, 400 pollset->nalloc, &nget, timeout); 401 402 /* decrease the waiting ASAP to reduce the window for calling 403 port_associate within apr_pollset_add() */ 404 
apr_atomic_dec32(&pollset->p->waiting); 405 406 (*num) = nget; 407 if (nget) { 408 409 pollset_lock_rings(); 410 411 for (i = 0, j = 0; i < nget; i++) { 412 fp = (((pfd_elem_t*)(pollset->p->port_set[i].portev_user))->pfd); 413 if ((pollset->flags & APR_POLLSET_WAKEABLE) && 414 fp.desc_type == APR_POLL_FILE && 415 fp.desc.f == pollset->wakeup_pipe[0]) { 416 apr_pollset_drain_wakeup_pipe(pollset); 417 rv = APR_EINTR; 418 } 419 else { 420 pollset->p->result_set[j] = fp; 421 pollset->p->result_set[j].rtnevents = 422 get_revent(pollset->p->port_set[i].portev_events); 423 424 /* If the ring element is still on the query ring, move it 425 * to the add ring for re-association with the event port 426 * later. (It may have already been moved to the dead ring 427 * by a call to pollset_remove on another thread.) 428 */ 429 ep = (pfd_elem_t *)pollset->p->port_set[i].portev_user; 430 if (ep->on_query_ring) { 431 APR_RING_REMOVE(ep, link); 432 ep->on_query_ring = 0; 433 APR_RING_INSERT_TAIL(&(pollset->p->add_ring), ep, 434 pfd_elem_t, link); 435 } 436 j++; 437 } 438 } 439 pollset_unlock_rings(); 440 if ((*num = j)) { /* any event besides wakeup pipe? 
*/ 441 rv = APR_SUCCESS; 442 if (descriptors) { 443 *descriptors = pollset->p->result_set; 444 } 445 } 446 } 447 448 pollset_lock_rings(); 449 450 /* Shift all PFDs in the Dead Ring to the Free Ring */ 451 APR_RING_CONCAT(&(pollset->p->free_ring), &(pollset->p->dead_ring), pfd_elem_t, link); 452 453 pollset_unlock_rings(); 454 455 return rv; 456} 457 458static apr_pollset_provider_t impl = { 459 impl_pollset_create, 460 impl_pollset_add, 461 impl_pollset_remove, 462 impl_pollset_poll, 463 impl_pollset_cleanup, 464 "port" 465}; 466 467apr_pollset_provider_t *apr_pollset_provider_port = &impl; 468 469static apr_status_t cb_cleanup(void *p_) 470{ 471 apr_pollcb_t *pollcb = (apr_pollcb_t *) p_; 472 close(pollcb->fd); 473 return APR_SUCCESS; 474} 475 476static apr_status_t impl_pollcb_create(apr_pollcb_t *pollcb, 477 apr_uint32_t size, 478 apr_pool_t *p, 479 apr_uint32_t flags) 480{ 481 pollcb->fd = port_create(); 482 483 if (pollcb->fd < 0) { 484 return apr_get_netos_error(); 485 } 486 487 { 488 int flags; 489 apr_status_t rv; 490 491 if ((flags = fcntl(pollcb->fd, F_GETFD)) == -1) { 492 rv = errno; 493 close(pollcb->fd); 494 pollcb->fd = -1; 495 return rv; 496 } 497 498 flags |= FD_CLOEXEC; 499 if (fcntl(pollcb->fd, F_SETFD, flags) == -1) { 500 rv = errno; 501 close(pollcb->fd); 502 pollcb->fd = -1; 503 return rv; 504 } 505 } 506 507 pollcb->pollset.port = apr_palloc(p, size * sizeof(port_event_t)); 508 apr_pool_cleanup_register(p, pollcb, cb_cleanup, apr_pool_cleanup_null); 509 510 return APR_SUCCESS; 511} 512 513static apr_status_t impl_pollcb_add(apr_pollcb_t *pollcb, 514 apr_pollfd_t *descriptor) 515{ 516 int ret, fd; 517 518 if (descriptor->desc_type == APR_POLL_SOCKET) { 519 fd = descriptor->desc.s->socketdes; 520 } 521 else { 522 fd = descriptor->desc.f->filedes; 523 } 524 525 ret = port_associate(pollcb->fd, PORT_SOURCE_FD, fd, 526 get_event(descriptor->reqevents), descriptor); 527 528 if (ret == -1) { 529 return apr_get_netos_error(); 530 } 531 532 return 
APR_SUCCESS; 533} 534 535static apr_status_t impl_pollcb_remove(apr_pollcb_t *pollcb, 536 apr_pollfd_t *descriptor) 537{ 538 int fd, ret; 539 540 if (descriptor->desc_type == APR_POLL_SOCKET) { 541 fd = descriptor->desc.s->socketdes; 542 } 543 else { 544 fd = descriptor->desc.f->filedes; 545 } 546 547 ret = port_dissociate(pollcb->fd, PORT_SOURCE_FD, fd); 548 549 if (ret < 0) { 550 return APR_NOTFOUND; 551 } 552 553 return APR_SUCCESS; 554} 555 556static apr_status_t impl_pollcb_poll(apr_pollcb_t *pollcb, 557 apr_interval_time_t timeout, 558 apr_pollcb_cb_t func, 559 void *baton) 560{ 561 apr_pollfd_t *pollfd; 562 apr_status_t rv; 563 unsigned int i, nget = 1; 564 565 rv = call_port_getn(pollcb->fd, pollcb->pollset.port, pollcb->nalloc, 566 &nget, timeout); 567 568 if (nget) { 569 for (i = 0; i < nget; i++) { 570 pollfd = (apr_pollfd_t *)(pollcb->pollset.port[i].portev_user); 571 pollfd->rtnevents = get_revent(pollcb->pollset.port[i].portev_events); 572 573 rv = func(baton, pollfd); 574 if (rv) { 575 return rv; 576 } 577 rv = apr_pollcb_add(pollcb, pollfd); 578 } 579 } 580 581 return rv; 582} 583 584static apr_pollcb_provider_t impl_cb = { 585 impl_pollcb_create, 586 impl_pollcb_add, 587 impl_pollcb_remove, 588 impl_pollcb_poll, 589 "port" 590}; 591 592apr_pollcb_provider_t *apr_pollcb_provider_port = &impl_cb; 593 594#endif /* HAVE_PORT_CREATE */ 595