z_asio.c revision 289166
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 * 17 ****************************************************************************** 18 * 19 * This implementation is based on a design by John Brooks (IBM Pok) which uses 20 * the z/OS sockets async i/o facility. When a 21 * socket is added to the pollset, an async poll is issued for that individual 22 * socket. It specifies that the kernel should send an IPC message when the 23 * socket becomes ready. The IPC messages are sent to a single message queue 24 * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC 25 * messages or the specified timeout. 26 * 27 * Since z/OS does not support async i/o for pipes or files at present, this 28 * implementation falls back to using ordinary poll() when 29 * APR_POLLSET_THREADSAFE is unset. 30 * 31 * Greg Ames 32 * April 2012 33 */ 34 35#include "apr.h" 36#include "apr_hash.h" 37#include "apr_poll.h" 38#include "apr_time.h" 39#include "apr_portable.h" 40#include "apr_arch_inherit.h" 41#include "apr_arch_file_io.h" 42#include "apr_arch_networkio.h" 43#include "apr_arch_poll_private.h" 44 45#ifdef HAVE_AIO_MSGQ 46 47#include <sys/msg.h> /* msgget etc */ 48#include <time.h> /* timestruct */ 49#include <poll.h> /* pollfd */ 50#include <limits.h> /* MAX_INT */ 51 52struct apr_pollset_private_t 53{ 54 int msg_q; /* IPC message queue. The z/OS kernel sends messages 55 * to this queue when our async polls on individual 56 * file descriptors complete 57 */ 58 apr_pollfd_t *result_set; 59 apr_uint32_t size; 60 61#if APR_HAS_THREADS 62 /* A thread mutex to protect operations on the rings and the hash */ 63 apr_thread_mutex_t *ring_lock; 64#endif 65 66 /* A hash of all active elements used for O(1) _remove operations */ 67 apr_hash_t *elems; 68 69 APR_RING_HEAD(ready_ring_t, asio_elem_t) ready_ring; 70 APR_RING_HEAD(prior_ready_ring_t, asio_elem_t) prior_ready_ring; 71 APR_RING_HEAD(free_ring_t, asio_elem_t) free_ring; 72 73 /* for pipes etc with no asio */ 74 struct pollfd *pollset; 75 apr_pollfd_t *query_set; 76}; 77 78typedef enum { 79 ASIO_INIT = 0, 80 ASIO_REMOVED, 81 ASIO_COMPLETE 82} asio_state_e; 83 84typedef struct asio_elem_t asio_elem_t; 85 86struct asio_msgbuf_t { 87 long msg_type; /* must be > 0 */ 88 asio_elem_t *msg_elem; 89}; 90 91struct asio_elem_t 92{ 93 APR_RING_ENTRY(asio_elem_t) link; 94 apr_pollfd_t pfd; 95 struct pollfd os_pfd; 96 struct aiocb a; 97 asio_state_e state; 98 struct asio_msgbuf_t msg; 99}; 100 101#define DEBUG 0 102 103/* DEBUG settings: 0 - no debug messages at all, 104 * 1 - should not occur messages, 105 * 2 - apr_pollset_* entry and exit messages, 106 * 3 - state changes, memory usage, 107 * 4 - z/OS, APR, and internal calls, 108 * 5 - everything else except the timer pop path, 109 * 6 - everything, including the Event 1 sec timer pop path 110 * 111 * each DEBUG level includes all messages produced by lower numbered levels 112 */ 113 114#if DEBUG 115 116#include <assert.h> 117#include <unistd.h> /* getpid */ 118 119#define DBG_BUFF char dbg_msg_buff[256]; 120 121#define DBG_TEST(lvl) if (lvl <= DEBUG) { 122 123#define DBG_CORE(msg) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 124 " " msg, getpid()), \ 125 fprintf(stderr, "%s", dbg_msg_buff); 126#define DBG_CORE1(msg, var1) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 127 " " msg, getpid(), var1), \ 128 fprintf(stderr, "%s", dbg_msg_buff); 129#define DBG_CORE2(msg, var1, var2) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 130 " " msg, getpid(), var1, var2), \ 131 fprintf(stderr, "%s", dbg_msg_buff); 132#define DBG_CORE3(msg, var1, var2, var3) \ 133 sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 134 " " msg, getpid(), var1, var2, var3), \ 135 fprintf(stderr, "%s", dbg_msg_buff); 136#define DBG_CORE4(msg, var1, var2, var3, var4) \ 137 sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 138 " " msg, getpid(), var1, var2, var3, var4),\ 139 fprintf(stderr, "%s", dbg_msg_buff); 140 141#define DBG_END } 142 143#define DBG(lvl, msg) DBG_TEST(lvl) \ 144 DBG_CORE(msg) \ 145 DBG_END 146 147#define DBG1(lvl, msg, var1) DBG_TEST(lvl) \ 148 DBG_CORE1(msg, var1) \ 149 DBG_END 150 151#define DBG2(lvl, msg, var1, var2) DBG_TEST(lvl) \ 152 DBG_CORE2(msg, var1, var2) \ 153 DBG_END 154 155#define DBG3(lvl, msg, var1, var2, var3) \ 156 DBG_TEST(lvl) \ 157 DBG_CORE3(msg, var1, var2, var3) \ 158 DBG_END 159 160#define DBG4(lvl, msg, var1, var2, var3, var4) \ 161 DBG_TEST(lvl) \ 162 DBG_CORE4(msg, var1, var2, var3, var4) \ 163 DBG_END 164 165#else /* DEBUG is 0 */ 166#define DBG_BUFF 167#define DBG(lvl, msg) ((void)0) 168#define DBG1(lvl, msg, var1) ((void)0) 169#define DBG2(lvl, msg, var1, var2) ((void)0) 170#define DBG3(lvl, msg, var1, var2, var3) ((void)0) 171#define DBG4(lvl, msg, var1, var2, var3, var4) ((void)0) 172 173#endif /* DEBUG */ 174 175static int asyncio(struct aiocb *a) 176{ 177 DBG_BUFF 178 int rv; 179 180#ifdef _LP64 181#define AIO BPX4AIO 182#else 183#define AIO BPX1AIO 184#endif 185 186 AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad()); 187 DBG2(4, "BPX4AIO aiocb %p rv %d\n", 188 a, rv); 189#ifdef DEBUG 190 if (rv < 0) { 191 DBG2(4, "errno %d errnojr %08x\n", 192 errno, *__err2ad()); 193 } 194#endif 195 return rv; 196} 197 198static apr_int16_t get_event(apr_int16_t event) 199{ 200 DBG_BUFF 201 apr_int16_t rv = 0; 202 DBG(4, "entered\n"); 203 204 if (event & APR_POLLIN) 205 rv |= POLLIN; 206 if (event & APR_POLLPRI) 207 rv |= POLLPRI; 208 if (event & APR_POLLOUT) 209 rv |= POLLOUT; 210 if (event & APR_POLLERR) 211 rv |= POLLERR; 212 if (event & APR_POLLHUP) 213 rv |= POLLHUP; 214 if (event & APR_POLLNVAL) 215 rv |= POLLNVAL; 216 217 DBG(4, "exiting\n"); 218 return rv; 219} 220 221static apr_int16_t get_revent(apr_int16_t event) 222{ 223 DBG_BUFF 224 apr_int16_t rv = 0; 225 DBG(4, "entered\n"); 226 227 if (event & POLLIN) 228 rv |= APR_POLLIN; 229 if (event & POLLPRI) 230 rv |= APR_POLLPRI; 231 if (event & POLLOUT) 232 rv |= APR_POLLOUT; 233 if (event & POLLERR) 234 rv |= APR_POLLERR; 235 if (event & POLLHUP) 236 rv |= APR_POLLHUP; 237 if (event & POLLNVAL) 238 rv |= APR_POLLNVAL; 239 240 DBG(4, "exiting\n"); 241 return rv; 242} 243 244static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset) 245{ 246 DBG_BUFF 247 int rv; 248 249 DBG(4, "entered\n"); 250 rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL); 251 252 DBG1(4, "exiting, msgctl(IPC_RMID) returned %d\n", rv); 253 return rv; 254} 255 256static apr_status_t asio_pollset_create(apr_pollset_t *pollset, 257 apr_uint32_t size, 258 apr_pool_t *p, 259 apr_uint32_t flags) 260{ 261 DBG_BUFF 262 apr_status_t rv; 263 apr_pollset_private_t *priv; 264 265 DBG1(2, "entered, flags: %x\n", flags); 266 267 priv = pollset->p = apr_palloc(p, sizeof(*priv)); 268 269 if (flags & APR_POLLSET_THREADSAFE) { 270#if APR_HAS_THREADS 271 if (rv = apr_thread_mutex_create(&(priv->ring_lock), 272 APR_THREAD_MUTEX_DEFAULT, 273 p) != APR_SUCCESS) { 274 DBG1(1, "apr_thread_mutex_create returned %d\n", rv); 275 pollset->p = NULL; 276 return rv; 277 } 278 rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */ 279 if (rv < 0) { 280#if DEBUG 281 perror(__FUNCTION__ " msgget returned < 0 "); 282#endif 283 pollset->p = NULL; 284 return rv; 285 } 286 287 DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv); 288 priv->msg_q = rv; 289 priv->elems = apr_hash_make(p); 290 291 APR_RING_INIT(&priv->free_ring, asio_elem_t, link); 292 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 293 294#else /* APR doesn't have threads but caller wants a threadsafe pollset */ 295 pollset->p = NULL; 296 return APR_ENOTIMPL; 297#endif 298 299 } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o, 300 * init fields only needed in old style pollset 301 */ 302 303 priv->pollset = apr_palloc(p, size * sizeof(struct pollfd)); 304 priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 305 306 if ((!priv->pollset) || (!priv->query_set)) { 307 pollset->p = NULL; 308 return APR_ENOMEM; 309 } 310 } 311 312 pollset->nelts = 0; 313 pollset->flags = flags; 314 pollset->pool = p; 315 priv->size = size; 316 priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 317 if (!priv->result_set) { 318 if (flags & APR_POLLSET_THREADSAFE) { 319 msgctl(priv->msg_q, IPC_RMID, NULL); 320 } 321 pollset->p = NULL; 322 return APR_ENOMEM; 323 } 324 325 DBG2(2, "exiting, pollset: %p, type: %s\n", 326 pollset, 327 flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX"); 328 329 330 return APR_SUCCESS; 331 332} /* end of asio_pollset_create */ 333 334static apr_status_t posix_add(apr_pollset_t *pollset, 335 const apr_pollfd_t *descriptor) 336{ 337 DBG_BUFF 338 int fd; 339 apr_pool_t *p = pollset->pool; 340 apr_pollset_private_t *priv = pollset->p; 341 342 DBG(4, "entered\n"); 343 344 if (pollset->nelts == priv->size) { 345 return APR_ENOMEM; 346 } 347 348 priv->query_set[pollset->nelts] = *descriptor; 349 if (descriptor->desc_type == APR_POLL_SOCKET) { 350 fd = descriptor->desc.s->socketdes; 351 } 352 else { 353 fd = descriptor->desc.f->filedes; 354 } 355 356 priv->pollset[pollset->nelts].fd = fd; 357 358 priv->pollset[pollset->nelts].events = 359 get_event(descriptor->reqevents); 360 361 pollset->nelts++; 362 363 DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset); 364 365 return APR_SUCCESS; 366} /* end of posix_add */ 367 368 369static apr_status_t asio_pollset_add(apr_pollset_t *pollset, 370 const apr_pollfd_t *descriptor) 371{ 372 DBG_BUFF 373 asio_elem_t *elem; 374 apr_status_t rv = APR_SUCCESS; 375 apr_pollset_private_t *priv = pollset->p; 376 377 pollset_lock_rings(); 378 DBG(2, "entered\n"); 379 380 if (pollset->flags & APR_POLLSET_THREADSAFE) { 381 382 if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) { 383 elem = APR_RING_FIRST(&(priv->free_ring)); 384 APR_RING_REMOVE(elem, link); 385 DBG1(3, "used recycled memory at %08p\n", elem); 386 elem->state = ASIO_INIT; 387 elem->a.aio_cflags = 0; 388 } 389 else { 390 elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t)); 391 DBG1(3, "alloced new memory at %08p\n", elem); 392 393 elem->a.aio_notifytype = AIO_MSGQ; 394 elem->a.aio_msgev_qid = priv->msg_q; 395 DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid); 396 elem->a.aio_msgev_size = sizeof(asio_elem_t *); 397 elem->a.aio_msgev_flag = 0; /* wait if queue is full */ 398 elem->a.aio_msgev_addr = &(elem->msg); 399 elem->a.aio_buf = &(elem->os_pfd); 400 elem->a.aio_nbytes = 1; /* number of pfds to poll */ 401 elem->msg.msg_type = 1; 402 elem->msg.msg_elem = elem; 403 } 404 405 /* z/OS only supports async I/O for sockets for now */ 406 elem->os_pfd.fd = descriptor->desc.s->socketdes; 407 408 APR_RING_ELEM_INIT(elem, link); 409 elem->a.aio_cmd = AIO_SELPOLL; 410 elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/ 411 elem->pfd = *descriptor; 412 elem->os_pfd.events = get_event(descriptor->reqevents); 413 414 if (0 != asyncio(&elem->a)) { 415 rv = errno; 416 DBG3(4, "pollset %p asio failed fd %d, errno %p\n", 417 pollset, elem->os_pfd.fd, rv); 418#if DEBUG 419 perror(__FUNCTION__ " asio failure"); 420#endif 421 } 422 else { 423 DBG2(4, "good asio call, adding fd %d to pollset %p\n", 424 elem->os_pfd.fd, pollset); 425 426 pollset->nelts++; 427 apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem); 428 } 429 } 430 else { 431 /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case 432 * pipes or files are used with this pollset 433 */ 434 435 rv = posix_add(pollset, descriptor); 436 } 437 438 DBG1(2, "exiting, rv = %d\n", rv); 439 440 pollset_unlock_rings(); 441 return rv; 442} /* end of asio_pollset_add */ 443 444static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor) 445{ 446 DBG_BUFF 447 apr_uint32_t i; 448 apr_pollset_private_t *priv = pollset->p; 449 450 DBG(4, "entered\n"); 451 for (i = 0; i < pollset->nelts; i++) { 452 if (descriptor->desc.s == priv->query_set[i].desc.s) { 453 /* Found an instance of the fd: remove this and any other copies */ 454 apr_uint32_t dst = i; 455 apr_uint32_t old_nelts = pollset->nelts; 456 pollset->nelts--; 457 for (i++; i < old_nelts; i++) { 458 if (descriptor->desc.s == priv->query_set[i].desc.s) { 459 pollset->nelts--; 460 } 461 else { 462 priv->pollset[dst] = priv->pollset[i]; 463 priv->query_set[dst] = priv->query_set[i]; 464 dst++; 465 } 466 } 467 DBG(4, "returning OK\n"); 468 return APR_SUCCESS; 469 } 470 } 471 472 DBG(1, "returning APR_NOTFOUND\n"); 473 return APR_NOTFOUND; 474 475} /* end of posix_remove */ 476 477static apr_status_t asio_pollset_remove(apr_pollset_t *pollset, 478 const apr_pollfd_t *descriptor) 479{ 480 DBG_BUFF 481 asio_elem_t *elem; 482 apr_status_t rv = APR_SUCCESS; 483 apr_pollset_private_t *priv = pollset->p; 484 struct aiocb cancel_a; /* AIO_CANCEL is synchronous, so autodata works fine */ 485 486 int fd; 487 488 DBG(2, "entered\n"); 489 490 if (!(pollset->flags & APR_POLLSET_THREADSAFE)) { 491 return posix_remove(pollset, descriptor); 492 } 493 494 pollset_lock_rings(); 495 496#if DEBUG 497 assert(descriptor->desc_type == APR_POLL_SOCKET); 498#endif 499 /* zOS 1.12 doesn't support files for async i/o */ 500 fd = descriptor->desc.s->socketdes; 501 502 elem = apr_hash_get(priv->elems, &(fd), sizeof(int)); 503 if (elem == NULL) { 504 DBG1(1, "couldn't find fd %d\n", fd); 505 rv = APR_NOTFOUND; 506 } else { 507 DBG1(5, "hash found fd %d\n", fd); 508 /* delete this fd from the hash */ 509 apr_hash_set(priv->elems, &(fd), sizeof(int), NULL); 510 511 if (elem->state == ASIO_INIT) { 512 /* asyncio call to cancel */ 513 cancel_a.aio_cmd = AIO_CANCEL; 514 cancel_a.aio_buf = &elem->a; /* point to original aiocb */ 515 516 cancel_a.aio_cflags = 0; 517 cancel_a.aio_cflags2 = 0; 518 519 /* we want the original aiocb to show up on the pollset message queue 520 * before recycling its memory to eliminate race conditions 521 */ 522 523 rv = asyncio(&cancel_a); 524 DBG1(4, "asyncio returned %d\n", rv); 525 526#if DEBUG 527 assert(rv == 1); 528#endif 529 } 530 elem->state = ASIO_REMOVED; 531 rv = APR_SUCCESS; 532 } 533 534 DBG1(2, "exiting, rv: %d\n", rv); 535 536 pollset_unlock_rings(); 537 538 return rv; 539} /* end of asio_pollset_remove */ 540 541static posix_poll(apr_pollset_t *pollset, 542 apr_interval_time_t timeout, 543 apr_int32_t *num, 544 const apr_pollfd_t **descriptors) 545{ 546 DBG_BUFF 547 int rv; 548 apr_uint32_t i, j; 549 apr_pollset_private_t *priv = pollset->p; 550 551 DBG(4, "entered\n"); 552 553 if (timeout > 0) { 554 timeout /= 1000; 555 } 556 rv = poll(priv->pollset, pollset->nelts, timeout); 557 (*num) = rv; 558 if (rv < 0) { 559 return apr_get_netos_error(); 560 } 561 if (rv == 0) { 562 return APR_TIMEUP; 563 } 564 j = 0; 565 for (i = 0; i < pollset->nelts; i++) { 566 if (priv->pollset[i].revents != 0) { 567 priv->result_set[j] = priv->query_set[i]; 568 priv->result_set[j].rtnevents = 569 get_revent(priv->pollset[i].revents); 570 j++; 571 } 572 } 573 if (descriptors) 574 *descriptors = priv->result_set; 575 576 DBG(4, "exiting ok\n"); 577 return APR_SUCCESS; 578 579} /* end of posix_poll */ 580 581static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg) 582{ 583 DBG_BUFF 584 asio_elem_t *elem = msg->msg_elem; 585 586 switch(elem->state) { 587 case ASIO_REMOVED: 588 DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n", 589 elem, elem->os_pfd.fd); 590 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem, 591 asio_elem_t, link); 592 break; 593 case ASIO_INIT: 594 DBG2(4, "adding to ready ring: elem %08p, fd %d\n", 595 elem, elem->os_pfd.fd); 596 elem->state = ASIO_COMPLETE; 597 APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem, 598 asio_elem_t, link); 599 break; 600 default: 601 DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n", 602 elem, elem->os_pfd.fd, elem->state); 603#if DEBUG 604 assert(0); 605#endif 606 } 607} 608 609static apr_status_t asio_pollset_poll(apr_pollset_t *pollset, 610 apr_interval_time_t timeout, 611 apr_int32_t *num, 612 const apr_pollfd_t **descriptors) 613{ 614 DBG_BUFF 615 int i, ret; 616 asio_elem_t *elem, *next_elem; 617 struct asio_msgbuf_t msg_buff; 618 struct timespec tv; 619 apr_status_t rv = APR_SUCCESS; 620 apr_pollset_private_t *priv = pollset->p; 621 622 DBG(6, "entered\n"); /* chatty - traces every second w/Event */ 623 624 if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) { 625 return posix_poll(pollset, timeout, num, descriptors); 626 } 627 628 pollset_lock_rings(); 629 APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link); 630 631 while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) { 632 elem = APR_RING_FIRST(&(priv->prior_ready_ring)); 633 DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n", 634 pollset, 635 elem, 636 elem->os_pfd.fd); 637 638 APR_RING_REMOVE(elem, link); 639 640 /* 641 * since USS does not remember what's in our pollset, we have 642 * to re-add fds which have not been apr_pollset_remove'd 643 * 644 * there may have been too many ready fd's to return in the 645 * result set last time. re-poll inline for both cases 646 */ 647 648 if (elem->state == ASIO_REMOVED) { 649 650 /* 651 * async i/o is done since it was found on prior_ready 652 * the state says the caller is done with it too 653 * so recycle the elem 654 */ 655 656 APR_RING_INSERT_TAIL(&(priv->free_ring), elem, 657 asio_elem_t, link); 658 continue; /* do not re-add if it has been _removed */ 659 } 660 661 elem->state = ASIO_INIT; 662 elem->a.aio_cflags = AIO_OK2COMPIMD; 663 664 if (0 != (ret = asyncio(&elem->a))) { 665 if (ret == 1) { 666 DBG(4, "asyncio() completed inline\n"); 667 /* it's ready now */ 668 elem->state = ASIO_COMPLETE; 669 APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t, 670 link); 671 } 672 else { 673 DBG2(1, "asyncio() failed, ret: %d, errno: %d\n", 674 ret, errno); 675 pollset_unlock_rings(); 676 return errno; 677 } 678 } 679 DBG1(4, "asyncio() completed rc %d\n", ret); 680 } 681 682 DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */ 683 684 /* Gather async poll completions that have occurred since the last call */ 685 while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0, 686 IPC_NOWAIT)) { 687 process_msg(pollset, &msg_buff); 688 } 689 690 /* Suspend if nothing is ready yet. */ 691 if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) { 692 693 if (timeout >= 0) { 694 tv.tv_sec = apr_time_sec(timeout); 695 tv.tv_nsec = apr_time_usec(timeout) * 1000; 696 } else { 697 tv.tv_sec = INT_MAX; /* block until something is ready */ 698 } 699 700 DBG2(6, "nothing on the ready ring " 701 "- blocking for %d seconds %d ns\n", 702 tv.tv_sec, tv.tv_nsec); 703 704 pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */ 705 706 if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff, 707 sizeof(asio_elem_t *), 0, NULL, &tv))) { 708#if DEBUG 709 if (errno == EAGAIN) { 710 DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */ 711 } 712 else { 713 DBG(1, "__msgrcv_timed failed!\n"); 714 } 715#endif 716 return (errno == EAGAIN) ? APR_TIMEUP : errno; 717 } 718 719 pollset_lock_rings(); 720 721 process_msg(pollset, &msg_buff); 722 } 723 724 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 725 726 (*num) = 0; 727 elem = APR_RING_FIRST(&(priv->ready_ring)); 728 729 for (i = 0; 730 731 i < priv->size 732 && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link); 733 i++) { 734 DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd); 735 736 priv->result_set[i] = elem->pfd; 737 priv->result_set[i].rtnevents 738 = get_revent(elem->os_pfd.revents); 739 (*num)++; 740 741 elem = APR_RING_NEXT(elem, link); 742 743#if DEBUG 744 if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) { 745 DBG(5, "end of ready ring reached\n"); 746 } 747#endif 748 } 749 750 if (descriptors) { 751 *descriptors = priv->result_set; 752 } 753 754 /* if the result size is too small, remember which descriptors 755 * haven't had results reported yet. we will look 756 * at these descriptors on the next apr_pollset_poll call 757 */ 758 759 APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link); 760 761 DBG1(2, "exiting, rv = %d\n", rv); 762 763 pollset_unlock_rings(); 764 765 return rv; 766} /* end of asio_pollset_poll */ 767 768static apr_pollset_provider_t impl = { 769 asio_pollset_create, 770 asio_pollset_add, 771 asio_pollset_remove, 772 asio_pollset_poll, 773 asio_pollset_cleanup, 774 "asio" 775}; 776 777apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl; 778 779#endif /* HAVE_AIO_MSGQ */ 780