/*
 * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26275970Scy */ 27275970Scy#include "event2/event-config.h" 28275970Scy#include "evconfig-private.h" 29275970Scy 30275970Scy#ifdef _WIN32 31275970Scy#define WIN32_LEAN_AND_MEAN 32275970Scy#include <winsock2.h> 33275970Scy#include <windows.h> 34275970Scy#undef WIN32_LEAN_AND_MEAN 35275970Scy#endif 36275970Scy 37275970Scy#include <sys/types.h> 38275970Scy#ifndef _WIN32 39275970Scy#include <sys/socket.h> 40275970Scy#endif 41275970Scy#ifdef EVENT__HAVE_SYS_TIME_H 42275970Scy#include <sys/time.h> 43275970Scy#endif 44275970Scy#include <sys/queue.h> 45275970Scy#include <stdio.h> 46275970Scy#include <stdlib.h> 47275970Scy#ifndef _WIN32 48275970Scy#include <unistd.h> 49275970Scy#endif 50275970Scy#include <errno.h> 51275970Scy#include <signal.h> 52275970Scy#include <string.h> 53275970Scy 54275970Scy#include <sys/queue.h> 55275970Scy 56275970Scy#include "event2/event.h" 57275970Scy#include "event2/event_struct.h" 58275970Scy#include "event2/rpc.h" 59275970Scy#include "event2/rpc_struct.h" 60275970Scy#include "evrpc-internal.h" 61275970Scy#include "event2/http.h" 62275970Scy#include "event2/buffer.h" 63275970Scy#include "event2/tag.h" 64275970Scy#include "event2/http_struct.h" 65275970Scy#include "event2/http_compat.h" 66275970Scy#include "event2/util.h" 67275970Scy#include "util-internal.h" 68275970Scy#include "log-internal.h" 69275970Scy#include "mm-internal.h" 70275970Scy 71275970Scystruct evrpc_base * 72275970Scyevrpc_init(struct evhttp *http_server) 73275970Scy{ 74275970Scy struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 75275970Scy if (base == NULL) 76275970Scy return (NULL); 77275970Scy 78275970Scy /* we rely on the tagging sub system */ 79275970Scy evtag_init(); 80275970Scy 81275970Scy TAILQ_INIT(&base->registered_rpcs); 82275970Scy TAILQ_INIT(&base->input_hooks); 83275970Scy TAILQ_INIT(&base->output_hooks); 84275970Scy 85275970Scy TAILQ_INIT(&base->paused_requests); 86275970Scy 87275970Scy base->http_server = http_server; 88275970Scy 89275970Scy 
return (base); 90275970Scy} 91275970Scy 92275970Scyvoid 93275970Scyevrpc_free(struct evrpc_base *base) 94275970Scy{ 95275970Scy struct evrpc *rpc; 96275970Scy struct evrpc_hook *hook; 97275970Scy struct evrpc_hook_ctx *pause; 98275970Scy int r; 99275970Scy 100275970Scy while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 101275970Scy r = evrpc_unregister_rpc(base, rpc->uri); 102275970Scy EVUTIL_ASSERT(r == 0); 103275970Scy } 104275970Scy while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 105275970Scy TAILQ_REMOVE(&base->paused_requests, pause, next); 106275970Scy mm_free(pause); 107275970Scy } 108275970Scy while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 109275970Scy r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 110275970Scy EVUTIL_ASSERT(r); 111275970Scy } 112275970Scy while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 113275970Scy r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 114275970Scy EVUTIL_ASSERT(r); 115275970Scy } 116275970Scy mm_free(base); 117275970Scy} 118275970Scy 119275970Scyvoid * 120275970Scyevrpc_add_hook(void *vbase, 121275970Scy enum EVRPC_HOOK_TYPE hook_type, 122275970Scy int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 123275970Scy void *cb_arg) 124275970Scy{ 125275970Scy struct evrpc_hooks_ *base = vbase; 126275970Scy struct evrpc_hook_list *head = NULL; 127275970Scy struct evrpc_hook *hook = NULL; 128275970Scy switch (hook_type) { 129275970Scy case EVRPC_INPUT: 130275970Scy head = &base->in_hooks; 131275970Scy break; 132275970Scy case EVRPC_OUTPUT: 133275970Scy head = &base->out_hooks; 134275970Scy break; 135275970Scy default: 136275970Scy EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 137275970Scy } 138275970Scy 139275970Scy hook = mm_calloc(1, sizeof(struct evrpc_hook)); 140275970Scy EVUTIL_ASSERT(hook != NULL); 141275970Scy 142275970Scy hook->process = cb; 143275970Scy hook->process_arg = cb_arg; 144275970Scy TAILQ_INSERT_TAIL(head, hook, next); 
145275970Scy 146275970Scy return (hook); 147275970Scy} 148275970Scy 149275970Scystatic int 150275970Scyevrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 151275970Scy{ 152275970Scy struct evrpc_hook *hook = NULL; 153275970Scy TAILQ_FOREACH(hook, head, next) { 154275970Scy if (hook == handle) { 155275970Scy TAILQ_REMOVE(head, hook, next); 156275970Scy mm_free(hook); 157275970Scy return (1); 158275970Scy } 159275970Scy } 160275970Scy 161275970Scy return (0); 162275970Scy} 163275970Scy 164275970Scy/* 165275970Scy * remove the hook specified by the handle 166275970Scy */ 167275970Scy 168275970Scyint 169275970Scyevrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 170275970Scy{ 171275970Scy struct evrpc_hooks_ *base = vbase; 172275970Scy struct evrpc_hook_list *head = NULL; 173275970Scy switch (hook_type) { 174275970Scy case EVRPC_INPUT: 175275970Scy head = &base->in_hooks; 176275970Scy break; 177275970Scy case EVRPC_OUTPUT: 178275970Scy head = &base->out_hooks; 179275970Scy break; 180275970Scy default: 181275970Scy EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 182275970Scy } 183275970Scy 184275970Scy return (evrpc_remove_hook_internal(head, handle)); 185275970Scy} 186275970Scy 187275970Scystatic int 188275970Scyevrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 189275970Scy struct evhttp_request *req, struct evbuffer *evbuf) 190275970Scy{ 191275970Scy struct evrpc_hook *hook; 192275970Scy TAILQ_FOREACH(hook, head, next) { 193275970Scy int res = hook->process(ctx, req, evbuf, hook->process_arg); 194275970Scy if (res != EVRPC_CONTINUE) 195275970Scy return (res); 196275970Scy } 197275970Scy 198275970Scy return (EVRPC_CONTINUE); 199275970Scy} 200275970Scy 201275970Scystatic void evrpc_pool_schedule(struct evrpc_pool *pool); 202275970Scystatic void evrpc_request_cb(struct evhttp_request *, void *); 203275970Scy 204275970Scy/* 205275970Scy * Registers a new RPC with the HTTP server. 
The evrpc object is expected 206275970Scy * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 207275970Scy * calls this function. 208275970Scy */ 209275970Scy 210275970Scystatic char * 211275970Scyevrpc_construct_uri(const char *uri) 212275970Scy{ 213275970Scy char *constructed_uri; 214275970Scy size_t constructed_uri_len; 215275970Scy 216275970Scy constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 217275970Scy if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 218275970Scy event_err(1, "%s: failed to register rpc at %s", 219275970Scy __func__, uri); 220275970Scy memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 221275970Scy memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 222275970Scy constructed_uri[constructed_uri_len - 1] = '\0'; 223275970Scy 224275970Scy return (constructed_uri); 225275970Scy} 226275970Scy 227275970Scyint 228275970Scyevrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 229275970Scy void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 230275970Scy{ 231275970Scy char *constructed_uri = evrpc_construct_uri(rpc->uri); 232275970Scy 233275970Scy rpc->base = base; 234275970Scy rpc->cb = cb; 235275970Scy rpc->cb_arg = cb_arg; 236275970Scy 237275970Scy TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 238275970Scy 239275970Scy evhttp_set_cb(base->http_server, 240275970Scy constructed_uri, 241275970Scy evrpc_request_cb, 242275970Scy rpc); 243275970Scy 244275970Scy mm_free(constructed_uri); 245275970Scy 246275970Scy return (0); 247275970Scy} 248275970Scy 249275970Scyint 250275970Scyevrpc_unregister_rpc(struct evrpc_base *base, const char *name) 251275970Scy{ 252275970Scy char *registered_uri = NULL; 253275970Scy struct evrpc *rpc; 254275970Scy int r; 255275970Scy 256275970Scy /* find the right rpc; linear search might be slow */ 257275970Scy TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 258275970Scy if (strcmp(rpc->uri, name) == 0) 
259275970Scy break; 260275970Scy } 261275970Scy if (rpc == NULL) { 262275970Scy /* We did not find an RPC with this name */ 263275970Scy return (-1); 264275970Scy } 265275970Scy TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 266275970Scy 267275970Scy registered_uri = evrpc_construct_uri(name); 268275970Scy 269275970Scy /* remove the http server callback */ 270275970Scy r = evhttp_del_cb(base->http_server, registered_uri); 271275970Scy EVUTIL_ASSERT(r == 0); 272275970Scy 273275970Scy mm_free(registered_uri); 274275970Scy 275275970Scy mm_free((char *)rpc->uri); 276275970Scy mm_free(rpc); 277275970Scy return (0); 278275970Scy} 279275970Scy 280275970Scystatic int evrpc_pause_request(void *vbase, void *ctx, 281275970Scy void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 282275970Scystatic void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 283275970Scy 284275970Scystatic void 285275970Scyevrpc_request_cb(struct evhttp_request *req, void *arg) 286275970Scy{ 287275970Scy struct evrpc *rpc = arg; 288275970Scy struct evrpc_req_generic *rpc_state = NULL; 289275970Scy 290275970Scy /* let's verify the outside parameters */ 291275970Scy if (req->type != EVHTTP_REQ_POST || 292275970Scy evbuffer_get_length(req->input_buffer) <= 0) 293275970Scy goto error; 294275970Scy 295275970Scy rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 296275970Scy if (rpc_state == NULL) 297275970Scy goto error; 298275970Scy rpc_state->rpc = rpc; 299275970Scy rpc_state->http_req = req; 300275970Scy rpc_state->rpc_data = NULL; 301275970Scy 302275970Scy if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 303275970Scy int hook_res; 304275970Scy 305275970Scy evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 306275970Scy 307275970Scy /* 308275970Scy * allow hooks to modify the outgoing request 309275970Scy */ 310275970Scy hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 311275970Scy rpc_state, req, req->input_buffer); 312275970Scy switch (hook_res) { 313275970Scy case 
EVRPC_TERMINATE: 314275970Scy goto error; 315275970Scy case EVRPC_PAUSE: 316275970Scy evrpc_pause_request(rpc->base, rpc_state, 317275970Scy evrpc_request_cb_closure); 318275970Scy return; 319275970Scy case EVRPC_CONTINUE: 320275970Scy break; 321275970Scy default: 322275970Scy EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 323275970Scy hook_res == EVRPC_CONTINUE || 324275970Scy hook_res == EVRPC_PAUSE); 325275970Scy } 326275970Scy } 327275970Scy 328275970Scy evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 329275970Scy return; 330275970Scy 331275970Scyerror: 332275970Scy evrpc_reqstate_free_(rpc_state); 333275970Scy evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 334275970Scy return; 335275970Scy} 336275970Scy 337275970Scystatic void 338275970Scyevrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 339275970Scy{ 340275970Scy struct evrpc_req_generic *rpc_state = arg; 341275970Scy struct evrpc *rpc; 342275970Scy struct evhttp_request *req; 343275970Scy 344275970Scy EVUTIL_ASSERT(rpc_state); 345275970Scy rpc = rpc_state->rpc; 346275970Scy req = rpc_state->http_req; 347275970Scy 348275970Scy if (hook_res == EVRPC_TERMINATE) 349275970Scy goto error; 350275970Scy 351275970Scy /* let's check that we can parse the request */ 352275970Scy rpc_state->request = rpc->request_new(rpc->request_new_arg); 353275970Scy if (rpc_state->request == NULL) 354275970Scy goto error; 355275970Scy 356275970Scy if (rpc->request_unmarshal( 357275970Scy rpc_state->request, req->input_buffer) == -1) { 358275970Scy /* we failed to parse the request; that's a bummer */ 359275970Scy goto error; 360275970Scy } 361275970Scy 362275970Scy /* at this point, we have a well formed request, prepare the reply */ 363275970Scy 364275970Scy rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 365275970Scy if (rpc_state->reply == NULL) 366275970Scy goto error; 367275970Scy 368275970Scy /* give the rpc to the user; they can deal with it */ 369275970Scy rpc->cb(rpc_state, rpc->cb_arg); 
370275970Scy 371275970Scy return; 372275970Scy 373275970Scyerror: 374275970Scy evrpc_reqstate_free_(rpc_state); 375275970Scy evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 376275970Scy return; 377275970Scy} 378275970Scy 379275970Scy 380275970Scyvoid 381275970Scyevrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 382275970Scy{ 383275970Scy struct evrpc *rpc; 384275970Scy EVUTIL_ASSERT(rpc_state != NULL); 385275970Scy rpc = rpc_state->rpc; 386275970Scy 387275970Scy /* clean up all memory */ 388275970Scy if (rpc_state->hook_meta != NULL) 389275970Scy evrpc_hook_context_free_(rpc_state->hook_meta); 390275970Scy if (rpc_state->request != NULL) 391275970Scy rpc->request_free(rpc_state->request); 392275970Scy if (rpc_state->reply != NULL) 393275970Scy rpc->reply_free(rpc_state->reply); 394275970Scy if (rpc_state->rpc_data != NULL) 395275970Scy evbuffer_free(rpc_state->rpc_data); 396275970Scy mm_free(rpc_state); 397275970Scy} 398275970Scy 399275970Scystatic void 400275970Scyevrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 401275970Scy 402275970Scyvoid 403275970Scyevrpc_request_done(struct evrpc_req_generic *rpc_state) 404275970Scy{ 405275970Scy struct evhttp_request *req; 406275970Scy struct evrpc *rpc; 407275970Scy 408275970Scy EVUTIL_ASSERT(rpc_state); 409275970Scy 410275970Scy req = rpc_state->http_req; 411275970Scy rpc = rpc_state->rpc; 412275970Scy 413275970Scy if (rpc->reply_complete(rpc_state->reply) == -1) { 414275970Scy /* the reply was not completely filled in. 
error out */ 415275970Scy goto error; 416275970Scy } 417275970Scy 418275970Scy if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 419275970Scy /* out of memory */ 420275970Scy goto error; 421275970Scy } 422275970Scy 423275970Scy /* serialize the reply */ 424275970Scy rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 425275970Scy 426275970Scy if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 427275970Scy int hook_res; 428275970Scy 429275970Scy evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 430275970Scy 431275970Scy /* do hook based tweaks to the request */ 432275970Scy hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 433275970Scy rpc_state, req, rpc_state->rpc_data); 434275970Scy switch (hook_res) { 435275970Scy case EVRPC_TERMINATE: 436275970Scy goto error; 437275970Scy case EVRPC_PAUSE: 438275970Scy if (evrpc_pause_request(rpc->base, rpc_state, 439275970Scy evrpc_request_done_closure) == -1) 440275970Scy goto error; 441275970Scy return; 442275970Scy case EVRPC_CONTINUE: 443275970Scy break; 444275970Scy default: 445275970Scy EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 446275970Scy hook_res == EVRPC_CONTINUE || 447275970Scy hook_res == EVRPC_PAUSE); 448275970Scy } 449275970Scy } 450275970Scy 451275970Scy evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 452275970Scy return; 453275970Scy 454275970Scyerror: 455275970Scy evrpc_reqstate_free_(rpc_state); 456275970Scy evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 457275970Scy return; 458275970Scy} 459275970Scy 460275970Scyvoid * 461275970Scyevrpc_get_request(struct evrpc_req_generic *req) 462275970Scy{ 463275970Scy return req->request; 464275970Scy} 465275970Scy 466275970Scyvoid * 467275970Scyevrpc_get_reply(struct evrpc_req_generic *req) 468275970Scy{ 469275970Scy return req->reply; 470275970Scy} 471275970Scy 472275970Scystatic void 473275970Scyevrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 474275970Scy{ 475275970Scy struct evrpc_req_generic 
*rpc_state = arg; 476275970Scy struct evhttp_request *req; 477275970Scy EVUTIL_ASSERT(rpc_state); 478275970Scy req = rpc_state->http_req; 479275970Scy 480275970Scy if (hook_res == EVRPC_TERMINATE) 481275970Scy goto error; 482275970Scy 483275970Scy /* on success, we are going to transmit marshaled binary data */ 484275970Scy if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 485275970Scy evhttp_add_header(req->output_headers, 486275970Scy "Content-Type", "application/octet-stream"); 487275970Scy } 488275970Scy evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 489275970Scy 490275970Scy evrpc_reqstate_free_(rpc_state); 491275970Scy 492275970Scy return; 493275970Scy 494275970Scyerror: 495275970Scy evrpc_reqstate_free_(rpc_state); 496275970Scy evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 497275970Scy return; 498275970Scy} 499275970Scy 500275970Scy 501275970Scy/* Client implementation of RPC site */ 502275970Scy 503275970Scystatic int evrpc_schedule_request(struct evhttp_connection *connection, 504275970Scy struct evrpc_request_wrapper *ctx); 505275970Scy 506275970Scystruct evrpc_pool * 507275970Scyevrpc_pool_new(struct event_base *base) 508275970Scy{ 509275970Scy struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 510275970Scy if (pool == NULL) 511275970Scy return (NULL); 512275970Scy 513275970Scy TAILQ_INIT(&pool->connections); 514275970Scy TAILQ_INIT(&pool->requests); 515275970Scy 516275970Scy TAILQ_INIT(&pool->paused_requests); 517275970Scy 518275970Scy TAILQ_INIT(&pool->input_hooks); 519275970Scy TAILQ_INIT(&pool->output_hooks); 520275970Scy 521275970Scy pool->base = base; 522275970Scy pool->timeout = -1; 523275970Scy 524275970Scy return (pool); 525275970Scy} 526275970Scy 527275970Scystatic void 528275970Scyevrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 529275970Scy{ 530275970Scy if (request->hook_meta != NULL) 531275970Scy evrpc_hook_context_free_(request->hook_meta); 532275970Scy 
mm_free(request->name); 533275970Scy mm_free(request); 534275970Scy} 535275970Scy 536275970Scyvoid 537275970Scyevrpc_pool_free(struct evrpc_pool *pool) 538275970Scy{ 539275970Scy struct evhttp_connection *connection; 540275970Scy struct evrpc_request_wrapper *request; 541275970Scy struct evrpc_hook_ctx *pause; 542275970Scy struct evrpc_hook *hook; 543275970Scy int r; 544275970Scy 545275970Scy while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 546275970Scy TAILQ_REMOVE(&pool->requests, request, next); 547275970Scy evrpc_request_wrapper_free(request); 548275970Scy } 549275970Scy 550275970Scy while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 551275970Scy TAILQ_REMOVE(&pool->paused_requests, pause, next); 552275970Scy mm_free(pause); 553275970Scy } 554275970Scy 555275970Scy while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 556275970Scy TAILQ_REMOVE(&pool->connections, connection, next); 557275970Scy evhttp_connection_free(connection); 558275970Scy } 559275970Scy 560275970Scy while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 561275970Scy r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 562275970Scy EVUTIL_ASSERT(r); 563275970Scy } 564275970Scy 565275970Scy while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 566275970Scy r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 567275970Scy EVUTIL_ASSERT(r); 568275970Scy } 569275970Scy 570275970Scy mm_free(pool); 571275970Scy} 572275970Scy 573275970Scy/* 574275970Scy * Add a connection to the RPC pool. A request scheduled on the pool 575275970Scy * may use any available connection. 
576275970Scy */ 577275970Scy 578275970Scyvoid 579275970Scyevrpc_pool_add_connection(struct evrpc_pool *pool, 580275970Scy struct evhttp_connection *connection) 581275970Scy{ 582275970Scy EVUTIL_ASSERT(connection->http_server == NULL); 583275970Scy TAILQ_INSERT_TAIL(&pool->connections, connection, next); 584275970Scy 585275970Scy /* 586275970Scy * associate an event base with this connection 587275970Scy */ 588275970Scy if (pool->base != NULL) 589275970Scy evhttp_connection_set_base(connection, pool->base); 590275970Scy 591275970Scy /* 592275970Scy * unless a timeout was specifically set for a connection, 593275970Scy * the connection inherits the timeout from the pool. 594275970Scy */ 595275970Scy if (!evutil_timerisset(&connection->timeout)) 596275970Scy evhttp_connection_set_timeout(connection, pool->timeout); 597275970Scy 598275970Scy /* 599275970Scy * if we have any requests pending, schedule them with the new 600275970Scy * connections. 601275970Scy */ 602275970Scy 603275970Scy if (TAILQ_FIRST(&pool->requests) != NULL) { 604275970Scy struct evrpc_request_wrapper *request = 605275970Scy TAILQ_FIRST(&pool->requests); 606275970Scy TAILQ_REMOVE(&pool->requests, request, next); 607275970Scy evrpc_schedule_request(connection, request); 608275970Scy } 609275970Scy} 610275970Scy 611275970Scyvoid 612275970Scyevrpc_pool_remove_connection(struct evrpc_pool *pool, 613275970Scy struct evhttp_connection *connection) 614275970Scy{ 615275970Scy TAILQ_REMOVE(&pool->connections, connection, next); 616275970Scy} 617275970Scy 618275970Scyvoid 619275970Scyevrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 620275970Scy{ 621275970Scy struct evhttp_connection *evcon; 622275970Scy TAILQ_FOREACH(evcon, &pool->connections, next) { 623275970Scy evhttp_connection_set_timeout(evcon, timeout_in_secs); 624275970Scy } 625275970Scy pool->timeout = timeout_in_secs; 626275970Scy} 627275970Scy 628275970Scy 629275970Scystatic void evrpc_reply_done(struct evhttp_request *, void 
*); 630275970Scystatic void evrpc_request_timeout(evutil_socket_t, short, void *); 631275970Scy 632275970Scy/* 633275970Scy * Finds a connection object associated with the pool that is currently 634275970Scy * idle and can be used to make a request. 635275970Scy */ 636275970Scystatic struct evhttp_connection * 637275970Scyevrpc_pool_find_connection(struct evrpc_pool *pool) 638275970Scy{ 639275970Scy struct evhttp_connection *connection; 640275970Scy TAILQ_FOREACH(connection, &pool->connections, next) { 641275970Scy if (TAILQ_FIRST(&connection->requests) == NULL) 642275970Scy return (connection); 643275970Scy } 644275970Scy 645275970Scy return (NULL); 646275970Scy} 647275970Scy 648275970Scy/* 649275970Scy * Prototypes responsible for evrpc scheduling and hooking 650275970Scy */ 651275970Scy 652275970Scystatic void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 653275970Scy 654275970Scy/* 655275970Scy * We assume that the ctx is no longer queued on the pool. 656275970Scy */ 657275970Scystatic int 658275970Scyevrpc_schedule_request(struct evhttp_connection *connection, 659275970Scy struct evrpc_request_wrapper *ctx) 660275970Scy{ 661275970Scy struct evhttp_request *req = NULL; 662275970Scy struct evrpc_pool *pool = ctx->pool; 663275970Scy struct evrpc_status status; 664275970Scy 665275970Scy if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 666275970Scy goto error; 667275970Scy 668275970Scy /* serialize the request data into the output buffer */ 669275970Scy ctx->request_marshal(req->output_buffer, ctx->request); 670275970Scy 671275970Scy /* we need to know the connection that we might have to abort */ 672275970Scy ctx->evcon = connection; 673275970Scy 674275970Scy /* if we get paused we also need to know the request */ 675275970Scy ctx->req = req; 676275970Scy 677275970Scy if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 678275970Scy int hook_res; 679275970Scy 680275970Scy evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 
681275970Scy 682275970Scy /* apply hooks to the outgoing request */ 683275970Scy hook_res = evrpc_process_hooks(&pool->output_hooks, 684275970Scy ctx, req, req->output_buffer); 685275970Scy 686275970Scy switch (hook_res) { 687275970Scy case EVRPC_TERMINATE: 688275970Scy goto error; 689275970Scy case EVRPC_PAUSE: 690275970Scy /* we need to be explicitly resumed */ 691275970Scy if (evrpc_pause_request(pool, ctx, 692275970Scy evrpc_schedule_request_closure) == -1) 693275970Scy goto error; 694275970Scy return (0); 695275970Scy case EVRPC_CONTINUE: 696275970Scy /* we can just continue */ 697275970Scy break; 698275970Scy default: 699275970Scy EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 700275970Scy hook_res == EVRPC_CONTINUE || 701275970Scy hook_res == EVRPC_PAUSE); 702275970Scy } 703275970Scy } 704275970Scy 705275970Scy evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 706275970Scy return (0); 707275970Scy 708275970Scyerror: 709275970Scy memset(&status, 0, sizeof(status)); 710275970Scy status.error = EVRPC_STATUS_ERR_UNSTARTED; 711275970Scy (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 712275970Scy evrpc_request_wrapper_free(ctx); 713275970Scy return (-1); 714275970Scy} 715275970Scy 716275970Scystatic void 717275970Scyevrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 718275970Scy{ 719275970Scy struct evrpc_request_wrapper *ctx = arg; 720275970Scy struct evhttp_connection *connection = ctx->evcon; 721275970Scy struct evhttp_request *req = ctx->req; 722275970Scy struct evrpc_pool *pool = ctx->pool; 723275970Scy struct evrpc_status status; 724275970Scy char *uri = NULL; 725275970Scy int res = 0; 726275970Scy 727275970Scy if (hook_res == EVRPC_TERMINATE) 728275970Scy goto error; 729275970Scy 730275970Scy uri = evrpc_construct_uri(ctx->name); 731275970Scy if (uri == NULL) 732275970Scy goto error; 733275970Scy 734275970Scy if (pool->timeout > 0) { 735275970Scy /* 736275970Scy * a timeout after which the whole rpc is going to be 
aborted. 737275970Scy */ 738275970Scy struct timeval tv; 739275970Scy evutil_timerclear(&tv); 740275970Scy tv.tv_sec = pool->timeout; 741275970Scy evtimer_add(&ctx->ev_timeout, &tv); 742275970Scy } 743275970Scy 744275970Scy /* start the request over the connection */ 745275970Scy res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 746275970Scy mm_free(uri); 747275970Scy 748275970Scy if (res == -1) 749275970Scy goto error; 750275970Scy 751275970Scy return; 752275970Scy 753275970Scyerror: 754275970Scy memset(&status, 0, sizeof(status)); 755275970Scy status.error = EVRPC_STATUS_ERR_UNSTARTED; 756275970Scy (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 757275970Scy evrpc_request_wrapper_free(ctx); 758275970Scy} 759275970Scy 760275970Scy/* we just queue the paused request on the pool under the req object */ 761275970Scystatic int 762275970Scyevrpc_pause_request(void *vbase, void *ctx, 763275970Scy void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 764275970Scy{ 765275970Scy struct evrpc_hooks_ *base = vbase; 766275970Scy struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 767275970Scy if (pause == NULL) 768275970Scy return (-1); 769275970Scy 770275970Scy pause->ctx = ctx; 771275970Scy pause->cb = cb; 772275970Scy 773275970Scy TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 774275970Scy return (0); 775275970Scy} 776275970Scy 777275970Scyint 778275970Scyevrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 779275970Scy{ 780275970Scy struct evrpc_hooks_ *base = vbase; 781275970Scy struct evrpc_pause_list *head = &base->pause_requests; 782275970Scy struct evrpc_hook_ctx *pause; 783275970Scy 784275970Scy TAILQ_FOREACH(pause, head, next) { 785275970Scy if (pause->ctx == ctx) 786275970Scy break; 787275970Scy } 788275970Scy 789275970Scy if (pause == NULL) 790275970Scy return (-1); 791275970Scy 792275970Scy (*pause->cb)(pause->ctx, res); 793275970Scy TAILQ_REMOVE(head, pause, next); 794275970Scy mm_free(pause); 795275970Scy 
return (0); 796275970Scy} 797275970Scy 798275970Scyint 799275970Scyevrpc_make_request(struct evrpc_request_wrapper *ctx) 800275970Scy{ 801275970Scy struct evrpc_pool *pool = ctx->pool; 802275970Scy 803275970Scy /* initialize the event structure for this rpc */ 804275970Scy evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 805275970Scy 806275970Scy /* we better have some available connections on the pool */ 807275970Scy EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 808275970Scy 809275970Scy /* 810275970Scy * if no connection is available, we queue the request on the pool, 811275970Scy * the next time a connection is empty, the rpc will be send on that. 812275970Scy */ 813275970Scy TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 814275970Scy 815275970Scy evrpc_pool_schedule(pool); 816275970Scy 817275970Scy return (0); 818275970Scy} 819275970Scy 820275970Scy 821275970Scystruct evrpc_request_wrapper * 822275970Scyevrpc_make_request_ctx( 823275970Scy struct evrpc_pool *pool, void *request, void *reply, 824275970Scy const char *rpcname, 825275970Scy void (*req_marshal)(struct evbuffer*, void *), 826275970Scy void (*rpl_clear)(void *), 827275970Scy int (*rpl_unmarshal)(void *, struct evbuffer *), 828275970Scy void (*cb)(struct evrpc_status *, void *, void *, void *), 829275970Scy void *cbarg) 830275970Scy{ 831275970Scy struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 832275970Scy mm_malloc(sizeof(struct evrpc_request_wrapper)); 833275970Scy if (ctx == NULL) 834275970Scy return (NULL); 835275970Scy 836275970Scy ctx->pool = pool; 837275970Scy ctx->hook_meta = NULL; 838275970Scy ctx->evcon = NULL; 839275970Scy ctx->name = mm_strdup(rpcname); 840275970Scy if (ctx->name == NULL) { 841275970Scy mm_free(ctx); 842275970Scy return (NULL); 843275970Scy } 844275970Scy ctx->cb = cb; 845275970Scy ctx->cb_arg = cbarg; 846275970Scy ctx->request = request; 847275970Scy ctx->reply = reply; 848275970Scy ctx->request_marshal = 
req_marshal; 849275970Scy ctx->reply_clear = rpl_clear; 850275970Scy ctx->reply_unmarshal = rpl_unmarshal; 851275970Scy 852275970Scy return (ctx); 853275970Scy} 854275970Scy 855275970Scystatic void 856275970Scyevrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 857275970Scy 858275970Scystatic void 859275970Scyevrpc_reply_done(struct evhttp_request *req, void *arg) 860275970Scy{ 861275970Scy struct evrpc_request_wrapper *ctx = arg; 862275970Scy struct evrpc_pool *pool = ctx->pool; 863275970Scy int hook_res = EVRPC_CONTINUE; 864275970Scy 865275970Scy /* cancel any timeout we might have scheduled */ 866275970Scy event_del(&ctx->ev_timeout); 867275970Scy 868275970Scy ctx->req = req; 869275970Scy 870275970Scy /* we need to get the reply now */ 871275970Scy if (req == NULL) { 872275970Scy evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 873275970Scy return; 874275970Scy } 875275970Scy 876275970Scy if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 877275970Scy evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 878275970Scy 879275970Scy /* apply hooks to the incoming request */ 880275970Scy hook_res = evrpc_process_hooks(&pool->input_hooks, 881275970Scy ctx, req, req->input_buffer); 882275970Scy 883275970Scy switch (hook_res) { 884275970Scy case EVRPC_TERMINATE: 885275970Scy case EVRPC_CONTINUE: 886275970Scy break; 887275970Scy case EVRPC_PAUSE: 888275970Scy /* 889275970Scy * if we get paused we also need to know the 890275970Scy * request. unfortunately, the underlying 891275970Scy * layer is going to free it. 
we need to 892275970Scy * request ownership explicitly 893275970Scy */ 894275970Scy if (req != NULL) 895275970Scy evhttp_request_own(req); 896275970Scy 897275970Scy evrpc_pause_request(pool, ctx, 898275970Scy evrpc_reply_done_closure); 899275970Scy return; 900275970Scy default: 901275970Scy EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 902275970Scy hook_res == EVRPC_CONTINUE || 903275970Scy hook_res == EVRPC_PAUSE); 904275970Scy } 905275970Scy } 906275970Scy 907275970Scy evrpc_reply_done_closure(ctx, hook_res); 908275970Scy 909275970Scy /* http request is being freed by underlying layer */ 910275970Scy} 911275970Scy 912275970Scystatic void 913275970Scyevrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 914275970Scy{ 915275970Scy struct evrpc_request_wrapper *ctx = arg; 916275970Scy struct evhttp_request *req = ctx->req; 917275970Scy struct evrpc_pool *pool = ctx->pool; 918275970Scy struct evrpc_status status; 919275970Scy int res = -1; 920275970Scy 921275970Scy memset(&status, 0, sizeof(status)); 922275970Scy status.http_req = req; 923275970Scy 924275970Scy /* we need to get the reply now */ 925275970Scy if (req == NULL) { 926275970Scy status.error = EVRPC_STATUS_ERR_TIMEOUT; 927275970Scy } else if (hook_res == EVRPC_TERMINATE) { 928275970Scy status.error = EVRPC_STATUS_ERR_HOOKABORTED; 929275970Scy } else { 930275970Scy res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 931275970Scy if (res == -1) 932275970Scy status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 933275970Scy } 934275970Scy 935275970Scy if (res == -1) { 936275970Scy /* clear everything that we might have written previously */ 937275970Scy ctx->reply_clear(ctx->reply); 938275970Scy } 939275970Scy 940275970Scy (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 941275970Scy 942275970Scy evrpc_request_wrapper_free(ctx); 943275970Scy 944275970Scy /* the http layer owned the original request structure, but if we 945275970Scy * got paused, we asked for ownership and need to free it 
here. */ 946275970Scy if (req != NULL && evhttp_request_is_owned(req)) 947275970Scy evhttp_request_free(req); 948275970Scy 949275970Scy /* see if we can schedule another request */ 950275970Scy evrpc_pool_schedule(pool); 951275970Scy} 952275970Scy 953275970Scystatic void 954275970Scyevrpc_pool_schedule(struct evrpc_pool *pool) 955275970Scy{ 956275970Scy struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 957275970Scy struct evhttp_connection *evcon; 958275970Scy 959275970Scy /* if no requests are pending, we have no work */ 960275970Scy if (ctx == NULL) 961275970Scy return; 962275970Scy 963275970Scy if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 964275970Scy TAILQ_REMOVE(&pool->requests, ctx, next); 965275970Scy evrpc_schedule_request(evcon, ctx); 966275970Scy } 967275970Scy} 968275970Scy 969275970Scystatic void 970275970Scyevrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 971275970Scy{ 972275970Scy struct evrpc_request_wrapper *ctx = arg; 973275970Scy struct evhttp_connection *evcon = ctx->evcon; 974275970Scy EVUTIL_ASSERT(evcon != NULL); 975275970Scy 976275970Scy evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 977275970Scy} 978275970Scy 979275970Scy/* 980275970Scy * frees potential meta data associated with a request. 
981275970Scy */ 982275970Scy 983275970Scystatic void 984275970Scyevrpc_meta_data_free(struct evrpc_meta_list *meta_data) 985275970Scy{ 986275970Scy struct evrpc_meta *entry; 987275970Scy EVUTIL_ASSERT(meta_data != NULL); 988275970Scy 989275970Scy while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 990275970Scy TAILQ_REMOVE(meta_data, entry, next); 991275970Scy mm_free(entry->key); 992275970Scy mm_free(entry->data); 993275970Scy mm_free(entry); 994275970Scy } 995275970Scy} 996275970Scy 997275970Scystatic struct evrpc_hook_meta * 998275970Scyevrpc_hook_meta_new_(void) 999275970Scy{ 1000275970Scy struct evrpc_hook_meta *ctx; 1001275970Scy ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1002275970Scy EVUTIL_ASSERT(ctx != NULL); 1003275970Scy 1004275970Scy TAILQ_INIT(&ctx->meta_data); 1005275970Scy ctx->evcon = NULL; 1006275970Scy 1007275970Scy return (ctx); 1008275970Scy} 1009275970Scy 1010275970Scystatic void 1011275970Scyevrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1012275970Scy struct evhttp_connection *evcon) 1013275970Scy{ 1014275970Scy struct evrpc_hook_meta *ctx = *pctx; 1015275970Scy if (ctx == NULL) 1016275970Scy *pctx = ctx = evrpc_hook_meta_new_(); 1017275970Scy ctx->evcon = evcon; 1018275970Scy} 1019275970Scy 1020275970Scystatic void 1021275970Scyevrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1022275970Scy{ 1023275970Scy evrpc_meta_data_free(&ctx->meta_data); 1024275970Scy mm_free(ctx); 1025275970Scy} 1026275970Scy 1027275970Scy/* Adds meta data */ 1028275970Scyvoid 1029275970Scyevrpc_hook_add_meta(void *ctx, const char *key, 1030275970Scy const void *data, size_t data_size) 1031275970Scy{ 1032275970Scy struct evrpc_request_wrapper *req = ctx; 1033275970Scy struct evrpc_hook_meta *store = NULL; 1034275970Scy struct evrpc_meta *meta = NULL; 1035275970Scy 1036275970Scy if ((store = req->hook_meta) == NULL) 1037275970Scy store = req->hook_meta = evrpc_hook_meta_new_(); 1038275970Scy 1039275970Scy meta = mm_malloc(sizeof(struct evrpc_meta)); 
1040275970Scy EVUTIL_ASSERT(meta != NULL); 1041275970Scy meta->key = mm_strdup(key); 1042275970Scy EVUTIL_ASSERT(meta->key != NULL); 1043275970Scy meta->data_size = data_size; 1044275970Scy meta->data = mm_malloc(data_size); 1045275970Scy EVUTIL_ASSERT(meta->data != NULL); 1046275970Scy memcpy(meta->data, data, data_size); 1047275970Scy 1048275970Scy TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1049275970Scy} 1050275970Scy 1051275970Scyint 1052275970Scyevrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1053275970Scy{ 1054275970Scy struct evrpc_request_wrapper *req = ctx; 1055275970Scy struct evrpc_meta *meta = NULL; 1056275970Scy 1057275970Scy if (req->hook_meta == NULL) 1058275970Scy return (-1); 1059275970Scy 1060275970Scy TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1061275970Scy if (strcmp(meta->key, key) == 0) { 1062275970Scy *data = meta->data; 1063275970Scy *data_size = meta->data_size; 1064275970Scy return (0); 1065275970Scy } 1066275970Scy } 1067275970Scy 1068275970Scy return (-1); 1069275970Scy} 1070275970Scy 1071275970Scystruct evhttp_connection * 1072275970Scyevrpc_hook_get_connection(void *ctx) 1073275970Scy{ 1074275970Scy struct evrpc_request_wrapper *req = ctx; 1075275970Scy return (req->hook_meta != NULL ? 
req->hook_meta->evcon : NULL); 1076275970Scy} 1077275970Scy 1078275970Scyint 1079275970Scyevrpc_send_request_generic(struct evrpc_pool *pool, 1080275970Scy void *request, void *reply, 1081275970Scy void (*cb)(struct evrpc_status *, void *, void *, void *), 1082275970Scy void *cb_arg, 1083275970Scy const char *rpcname, 1084275970Scy void (*req_marshal)(struct evbuffer *, void *), 1085275970Scy void (*rpl_clear)(void *), 1086275970Scy int (*rpl_unmarshal)(void *, struct evbuffer *)) 1087275970Scy{ 1088275970Scy struct evrpc_status status; 1089275970Scy struct evrpc_request_wrapper *ctx; 1090275970Scy ctx = evrpc_make_request_ctx(pool, request, reply, 1091275970Scy rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1092275970Scy if (ctx == NULL) 1093275970Scy goto error; 1094275970Scy return (evrpc_make_request(ctx)); 1095275970Scyerror: 1096275970Scy memset(&status, 0, sizeof(status)); 1097275970Scy status.error = EVRPC_STATUS_ERR_UNSTARTED; 1098275970Scy (*(cb))(&status, request, reply, cb_arg); 1099275970Scy return (-1); 1100275970Scy} 1101275970Scy 1102275970Scy/** Takes a request object and fills it in with the right magic */ 1103275970Scystatic struct evrpc * 1104275970Scyevrpc_register_object(const char *name, 1105275970Scy void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1106275970Scy int (*req_unmarshal)(void *, struct evbuffer *), 1107275970Scy void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1108275970Scy int (*rpl_complete)(void *), 1109275970Scy void (*rpl_marshal)(struct evbuffer *, void *)) 1110275970Scy{ 1111275970Scy struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1112275970Scy if (rpc == NULL) 1113275970Scy return (NULL); 1114275970Scy rpc->uri = mm_strdup(name); 1115275970Scy if (rpc->uri == NULL) { 1116275970Scy mm_free(rpc); 1117275970Scy return (NULL); 1118275970Scy } 1119275970Scy rpc->request_new = req_new; 1120275970Scy rpc->request_new_arg = req_new_arg; 1121275970Scy 
rpc->request_free = req_free; 1122275970Scy rpc->request_unmarshal = req_unmarshal; 1123275970Scy rpc->reply_new = rpl_new; 1124275970Scy rpc->reply_new_arg = rpl_new_arg; 1125275970Scy rpc->reply_free = rpl_free; 1126275970Scy rpc->reply_complete = rpl_complete; 1127275970Scy rpc->reply_marshal = rpl_marshal; 1128275970Scy return (rpc); 1129275970Scy} 1130275970Scy 1131275970Scyint 1132275970Scyevrpc_register_generic(struct evrpc_base *base, const char *name, 1133275970Scy void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1134275970Scy void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1135275970Scy int (*req_unmarshal)(void *, struct evbuffer *), 1136275970Scy void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1137275970Scy int (*rpl_complete)(void *), 1138275970Scy void (*rpl_marshal)(struct evbuffer *, void *)) 1139275970Scy{ 1140275970Scy struct evrpc* rpc = 1141275970Scy evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1142275970Scy rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1143275970Scy if (rpc == NULL) 1144275970Scy return (-1); 1145275970Scy evrpc_register_rpc(base, rpc, 1146275970Scy (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1147275970Scy return (0); 1148275970Scy} 1149275970Scy 1150275970Scy/** accessors for obscure and undocumented functionality */ 1151275970Scystruct evrpc_pool * 1152275970Scyevrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1153275970Scy{ 1154275970Scy return (ctx->pool); 1155275970Scy} 1156275970Scy 1157275970Scyvoid 1158275970Scyevrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1159275970Scy struct evrpc_pool *pool) 1160275970Scy{ 1161275970Scy ctx->pool = pool; 1162275970Scy} 1163275970Scy 1164275970Scyvoid 1165275970Scyevrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1166275970Scy void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1167275970Scy void *cb_arg) 
1168275970Scy{ 1169275970Scy ctx->cb = cb; 1170275970Scy ctx->cb_arg = cb_arg; 1171275970Scy} 1172