/* event_iocp.c revision 290001 */
155714Skris/* 255714Skris * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 355714Skris * 455714Skris * Redistribution and use in source and binary forms, with or without 555714Skris * modification, are permitted provided that the following conditions 655714Skris * are met: 755714Skris * 1. Redistributions of source code must retain the above copyright 8280304Sjkim * notice, this list of conditions and the following disclaimer. 955714Skris * 2. Redistributions in binary form must reproduce the above copyright 1055714Skris * notice, this list of conditions and the following disclaimer in the 1155714Skris * documentation and/or other materials provided with the distribution. 1255714Skris * 3. The name of the author may not be used to endorse or promote products 1355714Skris * derived from this software without specific prior written permission. 1455714Skris * 15280304Sjkim * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 1655714Skris * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 1755714Skris * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 1855714Skris * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 1955714Skris * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 2055714Skris * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 2155714Skris * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22280304Sjkim * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 2355714Skris * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 2455714Skris * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
2555714Skris */ 2655714Skris#include "evconfig-private.h" 2755714Skris 2855714Skris#ifndef _WIN32_WINNT 2955714Skris/* Minimum required for InitializeCriticalSectionAndSpinCount */ 3055714Skris#define _WIN32_WINNT 0x0403 3155714Skris#endif 3255714Skris#include <winsock2.h> 3355714Skris#include <windows.h> 3455714Skris#include <process.h> 3555714Skris#include <stdio.h> 3655714Skris#include <mswsock.h> 37280304Sjkim 3855714Skris#include "event2/util.h" 3955714Skris#include "util-internal.h" 40280304Sjkim#include "iocp-internal.h" 4155714Skris#include "log-internal.h" 4255714Skris#include "mm-internal.h" 4355714Skris#include "event-internal.h" 4455714Skris#include "evthread-internal.h" 4555714Skris 4655714Skris#define NOTIFICATION_KEY ((ULONG_PTR)-1) 4755714Skris 4855714Skrisvoid 4955714Skrisevent_overlapped_init_(struct event_overlapped *o, iocp_callback cb) 5055714Skris{ 5155714Skris memset(o, 0, sizeof(struct event_overlapped)); 52280304Sjkim o->cb = cb; 5355714Skris} 5455714Skris 5555714Skrisstatic void 5655714Skrishandle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok) 5755714Skris{ 5855714Skris struct event_overlapped *eo = 5955714Skris EVUTIL_UPCAST(o, struct event_overlapped, overlapped); 6055714Skris eo->cb(eo, completion_key, nBytes, ok); 6155714Skris} 6255714Skris 6355714Skrisstatic void 6455714Skrisloop(void *port_) 6555714Skris{ 6659191Skris struct event_iocp_port *port = port_; 6755714Skris long ms = port->ms; 6855714Skris HANDLE p = port->port; 69280304Sjkim 70280304Sjkim if (ms <= 0) 71280304Sjkim ms = INFINITE; 7255714Skris 7355714Skris while (1) { 74280304Sjkim OVERLAPPED *overlapped=NULL; 75280304Sjkim ULONG_PTR key=0; 76280304Sjkim DWORD bytes=0; 7755714Skris int ok = GetQueuedCompletionStatus(p, &bytes, &key, 7855714Skris &overlapped, ms); 79280304Sjkim EnterCriticalSection(&port->lock); 80280304Sjkim if (port->shutdown) { 81280304Sjkim if (--port->n_live_threads == 0) 8255714Skris ReleaseSemaphore(port->shutdownSemaphore, 1, 
8355714Skris NULL); 84280304Sjkim LeaveCriticalSection(&port->lock); 85280304Sjkim return; 86280304Sjkim } 8755714Skris LeaveCriticalSection(&port->lock); 8855714Skris 89280304Sjkim if (key != NOTIFICATION_KEY && overlapped) 90280304Sjkim handle_entry(overlapped, key, bytes, ok); 91280304Sjkim else if (!overlapped) 9255714Skris break; 9355714Skris } 94280304Sjkim event_warnx("GetQueuedCompletionStatus exited with no event."); 95280304Sjkim EnterCriticalSection(&port->lock); 96280304Sjkim if (--port->n_live_threads == 0) 9755714Skris ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); 9859191Skris LeaveCriticalSection(&port->lock); 9959191Skris} 100280304Sjkim 10159191Skrisint 10259191Skrisevent_iocp_port_associate_(struct event_iocp_port *port, evutil_socket_t fd, 103109998Smarkm ev_uintptr_t key) 104280304Sjkim{ 105109998Smarkm HANDLE h; 106280304Sjkim h = CreateIoCompletionPort((HANDLE)fd, port->port, key, port->n_threads); 107109998Smarkm if (!h) 108109998Smarkm return -1; 10955714Skris return 0; 110280304Sjkim} 111280304Sjkim 112280304Sjkimstatic void * 11355714Skrisget_extension_function(SOCKET s, const GUID *which_fn) 11455714Skris{ 115280304Sjkim void *ptr = NULL; 116280304Sjkim DWORD bytes=0; 117280304Sjkim WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, 11855714Skris (GUID*)which_fn, sizeof(*which_fn), 11955714Skris &ptr, sizeof(ptr), 120280304Sjkim &bytes, NULL, NULL); 121280304Sjkim 122280304Sjkim /* No need to detect errors here: if ptr is set, then we have a good 12355714Skris function pointer. Otherwise, we should behave as if we had no 12455714Skris function pointer. 125280304Sjkim */ 126280304Sjkim return ptr; 127280304Sjkim} 12855714Skris 12955714Skris/* Mingw doesn't have these in its mswsock.h. The values are copied from 130280304Sjkim wine.h. Perhaps if we copy them exactly, the cargo will come again. 
131280304Sjkim*/ 132280304Sjkim#ifndef WSAID_ACCEPTEX 133280304Sjkim#define WSAID_ACCEPTEX \ 13455714Skris {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} 13555714Skris#endif 136280304Sjkim#ifndef WSAID_CONNECTEX 137280304Sjkim#define WSAID_CONNECTEX \ 138280304Sjkim {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}} 13955714Skris#endif 14055714Skris#ifndef WSAID_GETACCEPTEXSOCKADDRS 141280304Sjkim#define WSAID_GETACCEPTEXSOCKADDRS \ 142280304Sjkim {0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} 143280304Sjkim#endif 14455714Skris 14555714Skrisstatic int extension_fns_initialized = 0; 146280304Sjkim 147280304Sjkimstatic void 148280304Sjkiminit_extension_functions(struct win32_extension_fns *ext) 14955714Skris{ 15059191Skris const GUID acceptex = WSAID_ACCEPTEX; 15159191Skris const GUID connectex = WSAID_CONNECTEX; 152280304Sjkim const GUID getacceptexsockaddrs = WSAID_GETACCEPTEXSOCKADDRS; 15359191Skris SOCKET s = socket(AF_INET, SOCK_STREAM, 0); 15459191Skris if (s == INVALID_SOCKET) 155109998Smarkm return; 156280304Sjkim ext->AcceptEx = get_extension_function(s, &acceptex); 157109998Smarkm ext->ConnectEx = get_extension_function(s, &connectex); 158280304Sjkim ext->GetAcceptExSockaddrs = get_extension_function(s, 159280304Sjkim &getacceptexsockaddrs); 160109998Smarkm closesocket(s); 161109998Smarkm 16255714Skris extension_fns_initialized = 1; 163280304Sjkim} 164280304Sjkim 165280304Sjkimstatic struct win32_extension_fns the_extension_fns; 16655714Skris 16755714Skrisconst struct win32_extension_fns * 168280304Sjkimevent_get_win32_extension_fns_(void) 169280304Sjkim{ 170280304Sjkim return &the_extension_fns; 17155714Skris} 17255714Skris 173280304Sjkim#define N_CPUS_DEFAULT 2 174280304Sjkim 175280304Sjkimstruct event_iocp_port * 176280304Sjkimevent_iocp_port_launch_(int n_cpus) 17755714Skris{ 17855714Skris struct event_iocp_port *port; 179280304Sjkim int i; 180280304Sjkim 181280304Sjkim if 
(!extension_fns_initialized) 18255714Skris init_extension_functions(&the_extension_fns); 18355714Skris 184280304Sjkim if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) 185280304Sjkim return NULL; 186280304Sjkim 18755714Skris if (n_cpus <= 0) 18855714Skris n_cpus = N_CPUS_DEFAULT; 189280304Sjkim port->n_threads = n_cpus * 2; 190280304Sjkim port->threads = mm_calloc(port->n_threads, sizeof(HANDLE)); 191280304Sjkim if (!port->threads) 19255714Skris goto err; 19355714Skris 194280304Sjkim port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 195280304Sjkim n_cpus); 196280304Sjkim port->ms = -1; 19755714Skris if (!port->port) 19859191Skris goto err; 19959191Skris 200280304Sjkim port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL); 20159191Skris if (!port->shutdownSemaphore) 20259191Skris goto err; 203109998Smarkm 204280304Sjkim for (i=0; i<port->n_threads; ++i) { 205109998Smarkm ev_uintptr_t th = _beginthread(loop, 0, port); 206280304Sjkim if (th == (ev_uintptr_t)-1) 207109998Smarkm goto err; 208109998Smarkm port->threads[i] = (HANDLE)th; 20955714Skris ++port->n_live_threads; 210280304Sjkim } 21155714Skris 212 InitializeCriticalSectionAndSpinCount(&port->lock, 1000); 213 214 return port; 215err: 216 if (port->port) 217 CloseHandle(port->port); 218 if (port->threads) 219 mm_free(port->threads); 220 if (port->shutdownSemaphore) 221 CloseHandle(port->shutdownSemaphore); 222 mm_free(port); 223 return NULL; 224} 225 226static void 227event_iocp_port_unlock_and_free_(struct event_iocp_port *port) 228{ 229 DeleteCriticalSection(&port->lock); 230 CloseHandle(port->port); 231 CloseHandle(port->shutdownSemaphore); 232 mm_free(port->threads); 233 mm_free(port); 234} 235 236static int 237event_iocp_notify_all(struct event_iocp_port *port) 238{ 239 int i, r, ok=1; 240 for (i=0; i<port->n_threads; ++i) { 241 r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY, 242 NULL); 243 if (!r) 244 ok = 0; 245 } 246 return ok ? 
0 : -1; 247} 248 249int 250event_iocp_shutdown_(struct event_iocp_port *port, long waitMsec) 251{ 252 DWORD ms = INFINITE; 253 int n; 254 255 EnterCriticalSection(&port->lock); 256 port->shutdown = 1; 257 LeaveCriticalSection(&port->lock); 258 event_iocp_notify_all(port); 259 260 if (waitMsec >= 0) 261 ms = waitMsec; 262 263 WaitForSingleObject(port->shutdownSemaphore, ms); 264 EnterCriticalSection(&port->lock); 265 n = port->n_live_threads; 266 LeaveCriticalSection(&port->lock); 267 if (n == 0) { 268 event_iocp_port_unlock_and_free_(port); 269 return 0; 270 } else { 271 return -1; 272 } 273} 274 275int 276event_iocp_activate_overlapped_( 277 struct event_iocp_port *port, struct event_overlapped *o, 278 ev_uintptr_t key, ev_uint32_t n) 279{ 280 BOOL r; 281 282 r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped); 283 return (r==0) ? -1 : 0; 284} 285 286struct event_iocp_port * 287event_base_get_iocp_(struct event_base *base) 288{ 289#ifdef _WIN32 290 return base->iocp; 291#else 292 return NULL; 293#endif 294} 295