1171169Smlaier/* 2171169Smlaier * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> 3171169Smlaier * All rights reserved. 4171169Smlaier * 5171169Smlaier * Redistribution and use in source and binary forms, with or without 6171169Smlaier * modification, are permitted provided that the following conditions 7171169Smlaier * are met: 8171169Smlaier * 1. Redistributions of source code must retain the above copyright 9171169Smlaier * notice, this list of conditions and the following disclaimer. 10171169Smlaier * 2. Redistributions in binary form must reproduce the above copyright 11171169Smlaier * notice, this list of conditions and the following disclaimer in the 12171169Smlaier * documentation and/or other materials provided with the distribution. 13171169Smlaier * 3. The name of the author may not be used to endorse or promote products 14171169Smlaier * derived from this software without specific prior written permission. 15171169Smlaier * 16171169Smlaier * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17171169Smlaier * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18171169Smlaier * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19171169Smlaier * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20171169Smlaier * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21171169Smlaier * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22171169Smlaier * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23171169Smlaier * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24171169Smlaier * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25171169Smlaier * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26171169Smlaier */ 27171169Smlaier 28171169Smlaier#include <sys/types.h> 29171169Smlaier 30171169Smlaier#ifdef HAVE_CONFIG_H 31171169Smlaier#include "config.h" 32171169Smlaier#endif 33171169Smlaier 34171169Smlaier#ifdef HAVE_SYS_TIME_H 35171169Smlaier#include <sys/time.h> 36171169Smlaier#endif 37171169Smlaier 38171169Smlaier#include <errno.h> 39171169Smlaier#include <stdio.h> 40171169Smlaier#include <stdlib.h> 41171169Smlaier#include <string.h> 42171169Smlaier#ifdef HAVE_STDARG_H 43171169Smlaier#include <stdarg.h> 44171169Smlaier#endif 45171169Smlaier 46171169Smlaier#include "event.h" 47171169Smlaier 48171169Smlaier/* prototypes */ 49171169Smlaier 50171169Smlaiervoid bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t); 51171169Smlaiervoid bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 52171169Smlaier 53171169Smlaierstatic int 54171169Smlaierbufferevent_add(struct event *ev, int timeout) 55171169Smlaier{ 56171169Smlaier struct timeval tv, *ptv = NULL; 57171169Smlaier 58171169Smlaier if (timeout) { 59171169Smlaier timerclear(&tv); 60171169Smlaier tv.tv_sec = timeout; 61171169Smlaier ptv = &tv; 62171169Smlaier } 63171169Smlaier 64171169Smlaier return (event_add(ev, ptv)); 65171169Smlaier} 66171169Smlaier 67171169Smlaier/* 68171169Smlaier * This callback is executed when the size of the input buffer changes. 69171169Smlaier * We use it to apply back pressure on the reading side. 70171169Smlaier */ 71171169Smlaier 72171169Smlaiervoid 73171169Smlaierbufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 74171169Smlaier void *arg) { 75171169Smlaier struct bufferevent *bufev = arg; 76171169Smlaier /* 77171169Smlaier * If we are below the watermark then reschedule reading if it's 78171169Smlaier * still enabled. 79171169Smlaier */ 80171169Smlaier if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 81171169Smlaier evbuffer_setcb(buf, NULL, NULL); 82171169Smlaier 83171169Smlaier if (bufev->enabled & EV_READ) 84171169Smlaier bufferevent_add(&bufev->ev_read, bufev->timeout_read); 85171169Smlaier } 86171169Smlaier} 87171169Smlaier 88171169Smlaierstatic void 89171169Smlaierbufferevent_readcb(int fd, short event, void *arg) 90171169Smlaier{ 91171169Smlaier struct bufferevent *bufev = arg; 92171169Smlaier int res = 0; 93171169Smlaier short what = EVBUFFER_READ; 94171169Smlaier size_t len; 95171169Smlaier int howmuch = -1; 96171169Smlaier 97171169Smlaier if (event == EV_TIMEOUT) { 98171169Smlaier what |= EVBUFFER_TIMEOUT; 99171169Smlaier goto error; 100171169Smlaier } 101171169Smlaier 102171169Smlaier /* 103171169Smlaier * If we have a high watermark configured then we don't want to 104171169Smlaier * read more data than would make us reach the watermark. 105171169Smlaier */ 106171169Smlaier if (bufev->wm_read.high != 0) 107171169Smlaier howmuch = bufev->wm_read.high; 108171169Smlaier 109171169Smlaier res = evbuffer_read(bufev->input, fd, howmuch); 110171169Smlaier if (res == -1) { 111171169Smlaier if (errno == EAGAIN || errno == EINTR) 112171169Smlaier goto reschedule; 113171169Smlaier /* error case */ 114171169Smlaier what |= EVBUFFER_ERROR; 115171169Smlaier } else if (res == 0) { 116171169Smlaier /* eof case */ 117171169Smlaier what |= EVBUFFER_EOF; 118171169Smlaier } 119171169Smlaier 120171169Smlaier if (res <= 0) 121171169Smlaier goto error; 122171169Smlaier 123171169Smlaier bufferevent_add(&bufev->ev_read, bufev->timeout_read); 124171169Smlaier 125171169Smlaier /* See if this callbacks meets the water marks */ 126171169Smlaier len = EVBUFFER_LENGTH(bufev->input); 127171169Smlaier if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 128171169Smlaier return; 129171169Smlaier if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { 130171169Smlaier struct evbuffer *buf = bufev->input; 131171169Smlaier event_del(&bufev->ev_read); 132171169Smlaier 133171169Smlaier /* Now schedule a callback for us */ 134171169Smlaier evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 135171169Smlaier return; 136171169Smlaier } 137171169Smlaier 138171169Smlaier /* Invoke the user callback - must always be called last */ 139171169Smlaier if (bufev->readcb != NULL) 140171169Smlaier (*bufev->readcb)(bufev, bufev->cbarg); 141171169Smlaier return; 142171169Smlaier 143171169Smlaier reschedule: 144171169Smlaier bufferevent_add(&bufev->ev_read, bufev->timeout_read); 145171169Smlaier return; 146171169Smlaier 147171169Smlaier error: 148171169Smlaier (*bufev->errorcb)(bufev, what, bufev->cbarg); 149171169Smlaier} 150171169Smlaier 151171169Smlaierstatic void 152171169Smlaierbufferevent_writecb(int fd, short event, void *arg) 153171169Smlaier{ 154171169Smlaier struct bufferevent *bufev = arg; 155171169Smlaier int res = 0; 156171169Smlaier short what = EVBUFFER_WRITE; 157171169Smlaier 158171169Smlaier if (event == EV_TIMEOUT) { 159171169Smlaier what |= EVBUFFER_TIMEOUT; 160171169Smlaier goto error; 161171169Smlaier } 162171169Smlaier 163171169Smlaier if (EVBUFFER_LENGTH(bufev->output)) { 164171169Smlaier res = evbuffer_write(bufev->output, fd); 165171169Smlaier if (res == -1) { 166171169Smlaier#ifndef WIN32 167171169Smlaier/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not 168171169Smlaier *set errno. thus this error checking is not portable*/ 169171169Smlaier if (errno == EAGAIN || 170171169Smlaier errno == EINTR || 171171169Smlaier errno == EINPROGRESS) 172171169Smlaier goto reschedule; 173171169Smlaier /* error case */ 174171169Smlaier what |= EVBUFFER_ERROR; 175171169Smlaier 176171169Smlaier#else 177171169Smlaier goto reschedule; 178171169Smlaier#endif 179171169Smlaier 180171169Smlaier } else if (res == 0) { 181171169Smlaier /* eof case */ 182171169Smlaier what |= EVBUFFER_EOF; 183171169Smlaier } 184171169Smlaier if (res <= 0) 185171169Smlaier goto error; 186171169Smlaier } 187171169Smlaier 188171169Smlaier if (EVBUFFER_LENGTH(bufev->output) != 0) 189171169Smlaier bufferevent_add(&bufev->ev_write, bufev->timeout_write); 190171169Smlaier 191171169Smlaier /* 192171169Smlaier * Invoke the user callback if our buffer is drained or below the 193171169Smlaier * low watermark. 194171169Smlaier */ 195171169Smlaier if (bufev->writecb != NULL && 196171169Smlaier EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 197171169Smlaier (*bufev->writecb)(bufev, bufev->cbarg); 198171169Smlaier 199171169Smlaier return; 200171169Smlaier 201171169Smlaier reschedule: 202171169Smlaier if (EVBUFFER_LENGTH(bufev->output) != 0) 203171169Smlaier bufferevent_add(&bufev->ev_write, bufev->timeout_write); 204171169Smlaier return; 205171169Smlaier 206171169Smlaier error: 207171169Smlaier (*bufev->errorcb)(bufev, what, bufev->cbarg); 208171169Smlaier} 209171169Smlaier 210171169Smlaier/* 211171169Smlaier * Create a new buffered event object. 212171169Smlaier * 213171169Smlaier * The read callback is invoked whenever we read new data. 214171169Smlaier * The write callback is invoked whenever the output buffer is drained. 215171169Smlaier * The error callback is invoked on a write/read error or on EOF. 216171169Smlaier * 217171169Smlaier * Both read and write callbacks maybe NULL. The error callback is not 218171169Smlaier * allowed to be NULL and have to be provided always. 219171169Smlaier */ 220171169Smlaier 221171169Smlaierstruct bufferevent * 222171169Smlaierbufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 223171169Smlaier everrorcb errorcb, void *cbarg) 224171169Smlaier{ 225171169Smlaier struct bufferevent *bufev; 226171169Smlaier 227171169Smlaier if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 228171169Smlaier return (NULL); 229171169Smlaier 230171169Smlaier if ((bufev->input = evbuffer_new()) == NULL) { 231171169Smlaier free(bufev); 232171169Smlaier return (NULL); 233171169Smlaier } 234171169Smlaier 235171169Smlaier if ((bufev->output = evbuffer_new()) == NULL) { 236171169Smlaier evbuffer_free(bufev->input); 237171169Smlaier free(bufev); 238171169Smlaier return (NULL); 239171169Smlaier } 240171169Smlaier 241171169Smlaier event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 242171169Smlaier event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 243171169Smlaier 244171169Smlaier bufev->readcb = readcb; 245171169Smlaier bufev->writecb = writecb; 246171169Smlaier bufev->errorcb = errorcb; 247171169Smlaier 248171169Smlaier bufev->cbarg = cbarg; 249171169Smlaier 250171169Smlaier /* 251171169Smlaier * Set to EV_WRITE so that using bufferevent_write is going to 252171169Smlaier * trigger a callback. Reading needs to be explicitly enabled 253171169Smlaier * because otherwise no data will be available. 254171169Smlaier */ 255171169Smlaier bufev->enabled = EV_WRITE; 256171169Smlaier 257171169Smlaier return (bufev); 258171169Smlaier} 259171169Smlaier 260171169Smlaierint 261171169Smlaierbufferevent_priority_set(struct bufferevent *bufev, int priority) 262171169Smlaier{ 263171169Smlaier if (event_priority_set(&bufev->ev_read, priority) == -1) 264171169Smlaier return (-1); 265171169Smlaier if (event_priority_set(&bufev->ev_write, priority) == -1) 266171169Smlaier return (-1); 267171169Smlaier 268171169Smlaier return (0); 269171169Smlaier} 270171169Smlaier 271171169Smlaier/* Closing the file descriptor is the responsibility of the caller */ 272171169Smlaier 273171169Smlaiervoid 274171169Smlaierbufferevent_free(struct bufferevent *bufev) 275171169Smlaier{ 276171169Smlaier event_del(&bufev->ev_read); 277171169Smlaier event_del(&bufev->ev_write); 278171169Smlaier 279171169Smlaier evbuffer_free(bufev->input); 280171169Smlaier evbuffer_free(bufev->output); 281171169Smlaier 282171169Smlaier free(bufev); 283171169Smlaier} 284171169Smlaier 285171169Smlaier/* 286171169Smlaier * Returns 0 on success; 287171169Smlaier * -1 on failure. 288171169Smlaier */ 289171169Smlaier 290171169Smlaierint 291171169Smlaierbufferevent_write(struct bufferevent *bufev, void *data, size_t size) 292171169Smlaier{ 293171169Smlaier int res; 294171169Smlaier 295171169Smlaier res = evbuffer_add(bufev->output, data, size); 296171169Smlaier 297171169Smlaier if (res == -1) 298171169Smlaier return (res); 299171169Smlaier 300171169Smlaier /* If everything is okay, we need to schedule a write */ 301171169Smlaier if (size > 0 && (bufev->enabled & EV_WRITE)) 302171169Smlaier bufferevent_add(&bufev->ev_write, bufev->timeout_write); 303171169Smlaier 304171169Smlaier return (res); 305171169Smlaier} 306171169Smlaier 307171169Smlaierint 308171169Smlaierbufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 309171169Smlaier{ 310171169Smlaier int res; 311171169Smlaier 312171169Smlaier res = bufferevent_write(bufev, buf->buffer, buf->off); 313171169Smlaier if (res != -1) 314171169Smlaier evbuffer_drain(buf, buf->off); 315171169Smlaier 316171169Smlaier return (res); 317171169Smlaier} 318171169Smlaier 319171169Smlaiersize_t 320171169Smlaierbufferevent_read(struct bufferevent *bufev, void *data, size_t size) 321171169Smlaier{ 322171169Smlaier struct evbuffer *buf = bufev->input; 323171169Smlaier 324171169Smlaier if (buf->off < size) 325171169Smlaier size = buf->off; 326171169Smlaier 327171169Smlaier /* Copy the available data to the user buffer */ 328171169Smlaier memcpy(data, buf->buffer, size); 329171169Smlaier 330171169Smlaier if (size) 331171169Smlaier evbuffer_drain(buf, size); 332171169Smlaier 333171169Smlaier return (size); 334171169Smlaier} 335171169Smlaier 336171169Smlaierint 337171169Smlaierbufferevent_enable(struct bufferevent *bufev, short event) 338171169Smlaier{ 339171169Smlaier if (event & EV_READ) { 340171169Smlaier if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 341171169Smlaier return (-1); 342171169Smlaier } 343171169Smlaier if (event & EV_WRITE) { 344171169Smlaier if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 345171169Smlaier return (-1); 346171169Smlaier } 347171169Smlaier 348171169Smlaier bufev->enabled |= event; 349171169Smlaier return (0); 350171169Smlaier} 351171169Smlaier 352171169Smlaierint 353171169Smlaierbufferevent_disable(struct bufferevent *bufev, short event) 354171169Smlaier{ 355171169Smlaier if (event & EV_READ) { 356171169Smlaier if (event_del(&bufev->ev_read) == -1) 357171169Smlaier return (-1); 358171169Smlaier } 359171169Smlaier if (event & EV_WRITE) { 360171169Smlaier if (event_del(&bufev->ev_write) == -1) 361171169Smlaier return (-1); 362171169Smlaier } 363171169Smlaier 364171169Smlaier bufev->enabled &= ~event; 365171169Smlaier return (0); 366171169Smlaier} 367171169Smlaier 368171169Smlaier/* 369171169Smlaier * Sets the read and write timeout for a buffered event. 370171169Smlaier */ 371171169Smlaier 372171169Smlaiervoid 373171169Smlaierbufferevent_settimeout(struct bufferevent *bufev, 374171169Smlaier int timeout_read, int timeout_write) { 375171169Smlaier bufev->timeout_read = timeout_read; 376171169Smlaier bufev->timeout_write = timeout_write; 377171169Smlaier} 378171169Smlaier 379171169Smlaier/* 380171169Smlaier * Sets the water marks 381171169Smlaier */ 382171169Smlaier 383171169Smlaiervoid 384171169Smlaierbufferevent_setwatermark(struct bufferevent *bufev, short events, 385171169Smlaier size_t lowmark, size_t highmark) 386171169Smlaier{ 387171169Smlaier if (events & EV_READ) { 388171169Smlaier bufev->wm_read.low = lowmark; 389171169Smlaier bufev->wm_read.high = highmark; 390171169Smlaier } 391171169Smlaier 392171169Smlaier if (events & EV_WRITE) { 393171169Smlaier bufev->wm_write.low = lowmark; 394171169Smlaier bufev->wm_write.high = highmark; 395171169Smlaier } 396171169Smlaier 397171169Smlaier /* If the watermarks changed then see if we should call read again */ 398171169Smlaier bufferevent_read_pressure_cb(bufev->input, 399171169Smlaier 0, EVBUFFER_LENGTH(bufev->input), bufev); 400171169Smlaier} 401171169Smlaier 402171169Smlaierint 403171169Smlaierbufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 404171169Smlaier{ 405171169Smlaier int res; 406171169Smlaier 407171169Smlaier res = event_base_set(base, &bufev->ev_read); 408171169Smlaier if (res == -1) 409171169Smlaier return (res); 410171169Smlaier 411171169Smlaier res = event_base_set(base, &bufev->ev_write); 412171169Smlaier return (res); 413171169Smlaier} 414