bufferevent_filter.c revision 285612
1/* 2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29#include "evconfig-private.h" 30 31#include <sys/types.h> 32 33#include "event2/event-config.h" 34 35#ifdef EVENT__HAVE_SYS_TIME_H 36#include <sys/time.h> 37#endif 38 39#include <errno.h> 40#include <stdio.h> 41#include <stdlib.h> 42#include <string.h> 43#ifdef EVENT__HAVE_STDARG_H 44#include <stdarg.h> 45#endif 46 47#ifdef _WIN32 48#include <winsock2.h> 49#endif 50 51#include "event2/util.h" 52#include "event2/bufferevent.h" 53#include "event2/buffer.h" 54#include "event2/bufferevent_struct.h" 55#include "event2/event.h" 56#include "log-internal.h" 57#include "mm-internal.h" 58#include "bufferevent-internal.h" 59#include "util-internal.h" 60 61/* prototypes */ 62static int be_filter_enable(struct bufferevent *, short); 63static int be_filter_disable(struct bufferevent *, short); 64static void be_filter_unlink(struct bufferevent *); 65static void be_filter_destruct(struct bufferevent *); 66 67static void be_filter_readcb(struct bufferevent *, void *); 68static void be_filter_writecb(struct bufferevent *, void *); 69static void be_filter_eventcb(struct bufferevent *, short, void *); 70static int be_filter_flush(struct bufferevent *bufev, 71 short iotype, enum bufferevent_flush_mode mode); 72static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 73 74static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 75 const struct evbuffer_cb_info *info, void *arg); 76 77struct bufferevent_filtered { 78 struct bufferevent_private bev; 79 80 /** The bufferevent that we read/write filtered data from/to. */ 81 struct bufferevent *underlying; 82 /** A callback on our outbuf to notice when somebody adds data */ 83 struct evbuffer_cb_entry *outbuf_cb; 84 /** True iff we have received an EOF callback from the underlying 85 * bufferevent. */ 86 unsigned got_eof; 87 88 /** Function to free context when we're done. */ 89 void (*free_context)(void *); 90 /** Input filter */ 91 bufferevent_filter_cb process_in; 92 /** Output filter */ 93 bufferevent_filter_cb process_out; 94 /** User-supplied argument to the filters. */ 95 void *context; 96}; 97 98const struct bufferevent_ops bufferevent_ops_filter = { 99 "filter", 100 evutil_offsetof(struct bufferevent_filtered, bev.bev), 101 be_filter_enable, 102 be_filter_disable, 103 be_filter_unlink, 104 be_filter_destruct, 105 bufferevent_generic_adj_timeouts_, 106 be_filter_flush, 107 be_filter_ctrl, 108}; 109 110/* Given a bufferevent that's really the bev filter of a bufferevent_filtered, 111 * return that bufferevent_filtered. Returns NULL otherwise.*/ 112static inline struct bufferevent_filtered * 113upcast(struct bufferevent *bev) 114{ 115 struct bufferevent_filtered *bev_f; 116 if (bev->be_ops != &bufferevent_ops_filter) 117 return NULL; 118 bev_f = (void*)( ((char*)bev) - 119 evutil_offsetof(struct bufferevent_filtered, bev.bev)); 120 EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter); 121 return bev_f; 122} 123 124#define downcast(bev_f) (&(bev_f)->bev.bev) 125 126/** Return 1 iff bevf's underlying bufferevent's output buffer is at or 127 * over its high watermark such that we should not write to it in a given 128 * flush mode. */ 129static int 130be_underlying_writebuf_full(struct bufferevent_filtered *bevf, 131 enum bufferevent_flush_mode state) 132{ 133 struct bufferevent *u = bevf->underlying; 134 return state == BEV_NORMAL && 135 u->wm_write.high && 136 evbuffer_get_length(u->output) >= u->wm_write.high; 137} 138 139/** Return 1 if our input buffer is at or over its high watermark such that we 140 * should not write to it in a given flush mode. */ 141static int 142be_readbuf_full(struct bufferevent_filtered *bevf, 143 enum bufferevent_flush_mode state) 144{ 145 struct bufferevent *bufev = downcast(bevf); 146 return state == BEV_NORMAL && 147 bufev->wm_read.high && 148 evbuffer_get_length(bufev->input) >= bufev->wm_read.high; 149} 150 151 152/* Filter to use when we're created with a NULL filter. */ 153static enum bufferevent_filter_result 154be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim, 155 enum bufferevent_flush_mode state, void *ctx) 156{ 157 (void)state; 158 if (evbuffer_remove_buffer(src, dst, lim) == 0) 159 return BEV_OK; 160 else 161 return BEV_ERROR; 162} 163 164struct bufferevent * 165bufferevent_filter_new(struct bufferevent *underlying, 166 bufferevent_filter_cb input_filter, 167 bufferevent_filter_cb output_filter, 168 int options, 169 void (*free_context)(void *), 170 void *ctx) 171{ 172 struct bufferevent_filtered *bufev_f; 173 int tmp_options = options & ~BEV_OPT_THREADSAFE; 174 175 if (!underlying) 176 return NULL; 177 178 if (!input_filter) 179 input_filter = be_null_filter; 180 if (!output_filter) 181 output_filter = be_null_filter; 182 183 bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered)); 184 if (!bufev_f) 185 return NULL; 186 187 if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base, 188 &bufferevent_ops_filter, tmp_options) < 0) { 189 mm_free(bufev_f); 190 return NULL; 191 } 192 if (options & BEV_OPT_THREADSAFE) { 193 bufferevent_enable_locking_(downcast(bufev_f), NULL); 194 } 195 196 bufev_f->underlying = underlying; 197 198 bufev_f->process_in = input_filter; 199 bufev_f->process_out = output_filter; 200 bufev_f->free_context = free_context; 201 bufev_f->context = ctx; 202 203 bufferevent_setcb(bufev_f->underlying, 204 be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); 205 206 bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, 207 bufferevent_filtered_outbuf_cb, bufev_f); 208 209 bufferevent_init_generic_timeout_cbs_(downcast(bufev_f)); 210 bufferevent_incref_(underlying); 211 212 bufferevent_enable(underlying, EV_READ|EV_WRITE); 213 bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ); 214 215 return downcast(bufev_f); 216} 217 218static void 219be_filter_unlink(struct bufferevent *bev) 220{ 221 struct bufferevent_filtered *bevf = upcast(bev); 222 EVUTIL_ASSERT(bevf); 223 224 if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { 225 /* Yes, there is also a decref in bufferevent_decref_. 226 * That decref corresponds to the incref when we set 227 * underlying for the first time. This decref is an 228 * extra one to remove the last reference. 229 */ 230 if (BEV_UPCAST(bevf->underlying)->refcnt < 2) { 231 event_warnx("BEV_OPT_CLOSE_ON_FREE set on an " 232 "bufferevent with too few references"); 233 } else { 234 bufferevent_free(bevf->underlying); 235 } 236 } else { 237 if (bevf->underlying) { 238 if (bevf->underlying->errorcb == be_filter_eventcb) 239 bufferevent_setcb(bevf->underlying, 240 NULL, NULL, NULL, NULL); 241 bufferevent_unsuspend_read_(bevf->underlying, 242 BEV_SUSPEND_FILT_READ); 243 } 244 } 245} 246 247static void 248be_filter_destruct(struct bufferevent *bev) 249{ 250 struct bufferevent_filtered *bevf = upcast(bev); 251 EVUTIL_ASSERT(bevf); 252 if (bevf->free_context) 253 bevf->free_context(bevf->context); 254} 255 256static int 257be_filter_enable(struct bufferevent *bev, short event) 258{ 259 struct bufferevent_filtered *bevf = upcast(bev); 260 if (event & EV_WRITE) 261 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 262 263 if (event & EV_READ) { 264 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 265 bufferevent_unsuspend_read_(bevf->underlying, 266 BEV_SUSPEND_FILT_READ); 267 } 268 return 0; 269} 270 271static int 272be_filter_disable(struct bufferevent *bev, short event) 273{ 274 struct bufferevent_filtered *bevf = upcast(bev); 275 if (event & EV_WRITE) 276 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 277 if (event & EV_READ) { 278 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 279 bufferevent_suspend_read_(bevf->underlying, 280 BEV_SUSPEND_FILT_READ); 281 } 282 return 0; 283} 284 285static enum bufferevent_filter_result 286be_filter_process_input(struct bufferevent_filtered *bevf, 287 enum bufferevent_flush_mode state, 288 int *processed_out) 289{ 290 enum bufferevent_filter_result res; 291 struct bufferevent *bev = downcast(bevf); 292 293 if (state == BEV_NORMAL) { 294 /* If we're in 'normal' mode, don't urge data on the filter 295 * unless we're reading data and under our high-water mark.*/ 296 if (!(bev->enabled & EV_READ) || 297 be_readbuf_full(bevf, state)) 298 return BEV_OK; 299 } 300 301 do { 302 ev_ssize_t limit = -1; 303 if (state == BEV_NORMAL && bev->wm_read.high) 304 limit = bev->wm_read.high - 305 evbuffer_get_length(bev->input); 306 307 res = bevf->process_in(bevf->underlying->input, 308 bev->input, limit, state, bevf->context); 309 310 if (res == BEV_OK) 311 *processed_out = 1; 312 } while (res == BEV_OK && 313 (bev->enabled & EV_READ) && 314 evbuffer_get_length(bevf->underlying->input) && 315 !be_readbuf_full(bevf, state)); 316 317 if (*processed_out) 318 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 319 320 return res; 321} 322 323 324static enum bufferevent_filter_result 325be_filter_process_output(struct bufferevent_filtered *bevf, 326 enum bufferevent_flush_mode state, 327 int *processed_out) 328{ 329 /* Requires references and lock: might call writecb */ 330 enum bufferevent_filter_result res = BEV_OK; 331 struct bufferevent *bufev = downcast(bevf); 332 int again = 0; 333 334 if (state == BEV_NORMAL) { 335 /* If we're in 'normal' mode, don't urge data on the 336 * filter unless we're writing data, and the underlying 337 * bufferevent is accepting data, and we have data to 338 * give the filter. If we're in 'flush' or 'finish', 339 * call the filter no matter what. */ 340 if (!(bufev->enabled & EV_WRITE) || 341 be_underlying_writebuf_full(bevf, state) || 342 !evbuffer_get_length(bufev->output)) 343 return BEV_OK; 344 } 345 346 /* disable the callback that calls this function 347 when the user adds to the output buffer. */ 348 evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0); 349 350 do { 351 int processed = 0; 352 again = 0; 353 354 do { 355 ev_ssize_t limit = -1; 356 if (state == BEV_NORMAL && 357 bevf->underlying->wm_write.high) 358 limit = bevf->underlying->wm_write.high - 359 evbuffer_get_length(bevf->underlying->output); 360 361 res = bevf->process_out(downcast(bevf)->output, 362 bevf->underlying->output, 363 limit, 364 state, 365 bevf->context); 366 367 if (res == BEV_OK) 368 processed = *processed_out = 1; 369 } while (/* Stop if the filter wasn't successful...*/ 370 res == BEV_OK && 371 /* Or if we aren't writing any more. */ 372 (bufev->enabled & EV_WRITE) && 373 /* Of if we have nothing more to write and we are 374 * not flushing. */ 375 evbuffer_get_length(bufev->output) && 376 /* Or if we have filled the underlying output buffer. */ 377 !be_underlying_writebuf_full(bevf,state)); 378 379 if (processed) { 380 /* call the write callback.*/ 381 bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); 382 383 if (res == BEV_OK && 384 (bufev->enabled & EV_WRITE) && 385 evbuffer_get_length(bufev->output) && 386 !be_underlying_writebuf_full(bevf, state)) { 387 again = 1; 388 } 389 } 390 } while (again); 391 392 /* reenable the outbuf_cb */ 393 evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb, 394 EVBUFFER_CB_ENABLED); 395 396 if (*processed_out) 397 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 398 399 return res; 400} 401 402/* Called when the size of our outbuf changes. */ 403static void 404bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 405 const struct evbuffer_cb_info *cbinfo, void *arg) 406{ 407 struct bufferevent_filtered *bevf = arg; 408 struct bufferevent *bev = downcast(bevf); 409 410 if (cbinfo->n_added) { 411 int processed_any = 0; 412 /* Somebody added more data to the output buffer. Try to 413 * process it, if we should. */ 414 bufferevent_incref_and_lock_(bev); 415 be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 416 bufferevent_decref_and_unlock_(bev); 417 } 418} 419 420/* Called when the underlying socket has read. */ 421static void 422be_filter_readcb(struct bufferevent *underlying, void *me_) 423{ 424 struct bufferevent_filtered *bevf = me_; 425 enum bufferevent_filter_result res; 426 enum bufferevent_flush_mode state; 427 struct bufferevent *bufev = downcast(bevf); 428 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 429 int processed_any = 0; 430 431 BEV_LOCK(bufev); 432 433 // It's possible our refcount is 0 at this point if another thread free'd our filterevent 434 EVUTIL_ASSERT(bufev_private->refcnt >= 0); 435 436 // If our refcount is > 0 437 if (bufev_private->refcnt > 0) { 438 439 if (bevf->got_eof) 440 state = BEV_FINISHED; 441 else 442 state = BEV_NORMAL; 443 444 /* XXXX use return value */ 445 res = be_filter_process_input(bevf, state, &processed_any); 446 (void)res; 447 448 /* XXX This should be in process_input, not here. There are 449 * other places that can call process-input, and they should 450 * force readcb calls as needed. */ 451 if (processed_any) 452 bufferevent_trigger_nolock_(bufev, EV_READ, 0); 453 } 454 455 BEV_UNLOCK(bufev); 456} 457 458/* Called when the underlying socket has drained enough that we can write to 459 it. */ 460static void 461be_filter_writecb(struct bufferevent *underlying, void *me_) 462{ 463 struct bufferevent_filtered *bevf = me_; 464 struct bufferevent *bev = downcast(bevf); 465 struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 466 int processed_any = 0; 467 468 BEV_LOCK(bev); 469 470 // It's possible our refcount is 0 at this point if another thread free'd our filterevent 471 EVUTIL_ASSERT(bufev_private->refcnt >= 0); 472 473 // If our refcount is > 0 474 if (bufev_private->refcnt > 0) { 475 be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 476 } 477 478 BEV_UNLOCK(bev); 479} 480 481/* Called when the underlying socket has given us an error */ 482static void 483be_filter_eventcb(struct bufferevent *underlying, short what, void *me_) 484{ 485 struct bufferevent_filtered *bevf = me_; 486 struct bufferevent *bev = downcast(bevf); 487 struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 488 489 BEV_LOCK(bev); 490 491 // It's possible our refcount is 0 at this point if another thread free'd our filterevent 492 EVUTIL_ASSERT(bufev_private->refcnt >= 0); 493 494 // If our refcount is > 0 495 if (bufev_private->refcnt > 0) { 496 497 /* All we can really to is tell our own eventcb. */ 498 bufferevent_run_eventcb_(bev, what, 0); 499 } 500 501 BEV_UNLOCK(bev); 502} 503 504static int 505be_filter_flush(struct bufferevent *bufev, 506 short iotype, enum bufferevent_flush_mode mode) 507{ 508 struct bufferevent_filtered *bevf = upcast(bufev); 509 int processed_any = 0; 510 EVUTIL_ASSERT(bevf); 511 512 bufferevent_incref_and_lock_(bufev); 513 514 if (iotype & EV_READ) { 515 be_filter_process_input(bevf, mode, &processed_any); 516 } 517 if (iotype & EV_WRITE) { 518 be_filter_process_output(bevf, mode, &processed_any); 519 } 520 /* XXX check the return value? */ 521 /* XXX does this want to recursively call lower-level flushes? */ 522 bufferevent_flush(bevf->underlying, iotype, mode); 523 524 bufferevent_decref_and_unlock_(bufev); 525 526 return processed_any; 527} 528 529static int 530be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 531 union bufferevent_ctrl_data *data) 532{ 533 struct bufferevent_filtered *bevf; 534 switch (op) { 535 case BEV_CTRL_GET_UNDERLYING: 536 bevf = upcast(bev); 537 data->ptr = bevf->underlying; 538 return 0; 539 case BEV_CTRL_GET_FD: 540 case BEV_CTRL_SET_FD: 541 case BEV_CTRL_CANCEL_ALL: 542 default: 543 return -1; 544 } 545} 546