1275970Scy/*
2275970Scy * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
3275970Scy * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4275970Scy *
5275970Scy * Redistribution and use in source and binary forms, with or without
6275970Scy * modification, are permitted provided that the following conditions
7275970Scy * are met:
8275970Scy * 1. Redistributions of source code must retain the above copyright
9275970Scy *    notice, this list of conditions and the following disclaimer.
10275970Scy * 2. Redistributions in binary form must reproduce the above copyright
11275970Scy *    notice, this list of conditions and the following disclaimer in the
12275970Scy *    documentation and/or other materials provided with the distribution.
13275970Scy * 3. The name of the author may not be used to endorse or promote products
14275970Scy *    derived from this software without specific prior written permission.
15275970Scy *
16275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26275970Scy */
27275970Scy#include "event2/event-config.h"
28275970Scy#include "evconfig-private.h"
29275970Scy
30275970Scy#ifdef _WIN32
31275970Scy#define WIN32_LEAN_AND_MEAN
32275970Scy#include <winsock2.h>
33275970Scy#include <windows.h>
34275970Scy#undef WIN32_LEAN_AND_MEAN
35275970Scy#endif
36275970Scy
37275970Scy#include <sys/types.h>
38275970Scy#ifndef _WIN32
39275970Scy#include <sys/socket.h>
40275970Scy#endif
41275970Scy#ifdef EVENT__HAVE_SYS_TIME_H
42275970Scy#include <sys/time.h>
43275970Scy#endif
44275970Scy#include <sys/queue.h>
45275970Scy#include <stdio.h>
46275970Scy#include <stdlib.h>
47275970Scy#ifndef _WIN32
48275970Scy#include <unistd.h>
49275970Scy#endif
50275970Scy#include <errno.h>
51275970Scy#include <signal.h>
52275970Scy#include <string.h>
53275970Scy
54275970Scy#include <sys/queue.h>
55275970Scy
56275970Scy#include "event2/event.h"
57275970Scy#include "event2/event_struct.h"
58275970Scy#include "event2/rpc.h"
59275970Scy#include "event2/rpc_struct.h"
60275970Scy#include "evrpc-internal.h"
61275970Scy#include "event2/http.h"
62275970Scy#include "event2/buffer.h"
63275970Scy#include "event2/tag.h"
64275970Scy#include "event2/http_struct.h"
65275970Scy#include "event2/http_compat.h"
66275970Scy#include "event2/util.h"
67275970Scy#include "util-internal.h"
68275970Scy#include "log-internal.h"
69275970Scy#include "mm-internal.h"
70275970Scy
71275970Scystruct evrpc_base *
72275970Scyevrpc_init(struct evhttp *http_server)
73275970Scy{
74275970Scy	struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
75275970Scy	if (base == NULL)
76275970Scy		return (NULL);
77275970Scy
78275970Scy	/* we rely on the tagging sub system */
79275970Scy	evtag_init();
80275970Scy
81275970Scy	TAILQ_INIT(&base->registered_rpcs);
82275970Scy	TAILQ_INIT(&base->input_hooks);
83275970Scy	TAILQ_INIT(&base->output_hooks);
84275970Scy
85275970Scy	TAILQ_INIT(&base->paused_requests);
86275970Scy
87275970Scy	base->http_server = http_server;
88275970Scy
89275970Scy	return (base);
90275970Scy}
91275970Scy
92275970Scyvoid
93275970Scyevrpc_free(struct evrpc_base *base)
94275970Scy{
95275970Scy	struct evrpc *rpc;
96275970Scy	struct evrpc_hook *hook;
97275970Scy	struct evrpc_hook_ctx *pause;
98275970Scy	int r;
99275970Scy
100275970Scy	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
101275970Scy		r = evrpc_unregister_rpc(base, rpc->uri);
102275970Scy		EVUTIL_ASSERT(r == 0);
103275970Scy	}
104275970Scy	while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
105275970Scy		TAILQ_REMOVE(&base->paused_requests, pause, next);
106275970Scy		mm_free(pause);
107275970Scy	}
108275970Scy	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
109275970Scy		r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
110275970Scy		EVUTIL_ASSERT(r);
111275970Scy	}
112275970Scy	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
113275970Scy		r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
114275970Scy		EVUTIL_ASSERT(r);
115275970Scy	}
116275970Scy	mm_free(base);
117275970Scy}
118275970Scy
119275970Scyvoid *
120275970Scyevrpc_add_hook(void *vbase,
121275970Scy    enum EVRPC_HOOK_TYPE hook_type,
122275970Scy    int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
123275970Scy    void *cb_arg)
124275970Scy{
125275970Scy	struct evrpc_hooks_ *base = vbase;
126275970Scy	struct evrpc_hook_list *head = NULL;
127275970Scy	struct evrpc_hook *hook = NULL;
128275970Scy	switch (hook_type) {
129275970Scy	case EVRPC_INPUT:
130275970Scy		head = &base->in_hooks;
131275970Scy		break;
132275970Scy	case EVRPC_OUTPUT:
133275970Scy		head = &base->out_hooks;
134275970Scy		break;
135275970Scy	default:
136275970Scy		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
137275970Scy	}
138275970Scy
139275970Scy	hook = mm_calloc(1, sizeof(struct evrpc_hook));
140275970Scy	EVUTIL_ASSERT(hook != NULL);
141275970Scy
142275970Scy	hook->process = cb;
143275970Scy	hook->process_arg = cb_arg;
144275970Scy	TAILQ_INSERT_TAIL(head, hook, next);
145275970Scy
146275970Scy	return (hook);
147275970Scy}
148275970Scy
149275970Scystatic int
150275970Scyevrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
151275970Scy{
152275970Scy	struct evrpc_hook *hook = NULL;
153275970Scy	TAILQ_FOREACH(hook, head, next) {
154275970Scy		if (hook == handle) {
155275970Scy			TAILQ_REMOVE(head, hook, next);
156275970Scy			mm_free(hook);
157275970Scy			return (1);
158275970Scy		}
159275970Scy	}
160275970Scy
161275970Scy	return (0);
162275970Scy}
163275970Scy
164275970Scy/*
165275970Scy * remove the hook specified by the handle
166275970Scy */
167275970Scy
168275970Scyint
169275970Scyevrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
170275970Scy{
171275970Scy	struct evrpc_hooks_ *base = vbase;
172275970Scy	struct evrpc_hook_list *head = NULL;
173275970Scy	switch (hook_type) {
174275970Scy	case EVRPC_INPUT:
175275970Scy		head = &base->in_hooks;
176275970Scy		break;
177275970Scy	case EVRPC_OUTPUT:
178275970Scy		head = &base->out_hooks;
179275970Scy		break;
180275970Scy	default:
181275970Scy		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
182275970Scy	}
183275970Scy
184275970Scy	return (evrpc_remove_hook_internal(head, handle));
185275970Scy}
186275970Scy
187275970Scystatic int
188275970Scyevrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
189275970Scy    struct evhttp_request *req, struct evbuffer *evbuf)
190275970Scy{
191275970Scy	struct evrpc_hook *hook;
192275970Scy	TAILQ_FOREACH(hook, head, next) {
193275970Scy		int res = hook->process(ctx, req, evbuf, hook->process_arg);
194275970Scy		if (res != EVRPC_CONTINUE)
195275970Scy			return (res);
196275970Scy	}
197275970Scy
198275970Scy	return (EVRPC_CONTINUE);
199275970Scy}
200275970Scy
201275970Scystatic void evrpc_pool_schedule(struct evrpc_pool *pool);
202275970Scystatic void evrpc_request_cb(struct evhttp_request *, void *);
203275970Scy
/*
 * evrpc_register_rpc() (further below) registers a new RPC with the HTTP
 * server.  The evrpc object is expected to have been filled in via the
 * EVRPC_REGISTER_OBJECT macro, which in turn calls that function.
 */
209275970Scy
210275970Scystatic char *
211275970Scyevrpc_construct_uri(const char *uri)
212275970Scy{
213275970Scy	char *constructed_uri;
214275970Scy	size_t constructed_uri_len;
215275970Scy
216275970Scy	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
217275970Scy	if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
218275970Scy		event_err(1, "%s: failed to register rpc at %s",
219275970Scy		    __func__, uri);
220275970Scy	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
221275970Scy	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
222275970Scy	constructed_uri[constructed_uri_len - 1] = '\0';
223275970Scy
224275970Scy	return (constructed_uri);
225275970Scy}
226275970Scy
227275970Scyint
228275970Scyevrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
229275970Scy    void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
230275970Scy{
231275970Scy	char *constructed_uri = evrpc_construct_uri(rpc->uri);
232275970Scy
233275970Scy	rpc->base = base;
234275970Scy	rpc->cb = cb;
235275970Scy	rpc->cb_arg = cb_arg;
236275970Scy
237275970Scy	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
238275970Scy
239275970Scy	evhttp_set_cb(base->http_server,
240275970Scy	    constructed_uri,
241275970Scy	    evrpc_request_cb,
242275970Scy	    rpc);
243275970Scy
244275970Scy	mm_free(constructed_uri);
245275970Scy
246275970Scy	return (0);
247275970Scy}
248275970Scy
249275970Scyint
250275970Scyevrpc_unregister_rpc(struct evrpc_base *base, const char *name)
251275970Scy{
252275970Scy	char *registered_uri = NULL;
253275970Scy	struct evrpc *rpc;
254275970Scy	int r;
255275970Scy
256275970Scy	/* find the right rpc; linear search might be slow */
257275970Scy	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
258275970Scy		if (strcmp(rpc->uri, name) == 0)
259275970Scy			break;
260275970Scy	}
261275970Scy	if (rpc == NULL) {
262275970Scy		/* We did not find an RPC with this name */
263275970Scy		return (-1);
264275970Scy	}
265275970Scy	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
266275970Scy
267275970Scy	registered_uri = evrpc_construct_uri(name);
268275970Scy
269275970Scy	/* remove the http server callback */
270275970Scy	r = evhttp_del_cb(base->http_server, registered_uri);
271275970Scy	EVUTIL_ASSERT(r == 0);
272275970Scy
273275970Scy	mm_free(registered_uri);
274275970Scy
275275970Scy	mm_free((char *)rpc->uri);
276275970Scy	mm_free(rpc);
277275970Scy	return (0);
278275970Scy}
279275970Scy
280275970Scystatic int evrpc_pause_request(void *vbase, void *ctx,
281275970Scy    void (*cb)(void *, enum EVRPC_HOOK_RESULT));
282275970Scystatic void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
283275970Scy
284275970Scystatic void
285275970Scyevrpc_request_cb(struct evhttp_request *req, void *arg)
286275970Scy{
287275970Scy	struct evrpc *rpc = arg;
288275970Scy	struct evrpc_req_generic *rpc_state = NULL;
289275970Scy
290275970Scy	/* let's verify the outside parameters */
291275970Scy	if (req->type != EVHTTP_REQ_POST ||
292275970Scy	    evbuffer_get_length(req->input_buffer) <= 0)
293275970Scy		goto error;
294275970Scy
295275970Scy	rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
296275970Scy	if (rpc_state == NULL)
297275970Scy		goto error;
298275970Scy	rpc_state->rpc = rpc;
299275970Scy	rpc_state->http_req = req;
300275970Scy	rpc_state->rpc_data = NULL;
301275970Scy
302275970Scy	if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
303275970Scy		int hook_res;
304275970Scy
305275970Scy		evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
306275970Scy
307275970Scy		/*
308275970Scy		 * allow hooks to modify the outgoing request
309275970Scy		 */
310275970Scy		hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
311275970Scy		    rpc_state, req, req->input_buffer);
312275970Scy		switch (hook_res) {
313275970Scy		case EVRPC_TERMINATE:
314275970Scy			goto error;
315275970Scy		case EVRPC_PAUSE:
316275970Scy			evrpc_pause_request(rpc->base, rpc_state,
317275970Scy			    evrpc_request_cb_closure);
318275970Scy			return;
319275970Scy		case EVRPC_CONTINUE:
320275970Scy			break;
321275970Scy		default:
322275970Scy			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
323275970Scy			    hook_res == EVRPC_CONTINUE ||
324275970Scy			    hook_res == EVRPC_PAUSE);
325275970Scy		}
326275970Scy	}
327275970Scy
328275970Scy	evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
329275970Scy	return;
330275970Scy
331275970Scyerror:
332275970Scy	evrpc_reqstate_free_(rpc_state);
333275970Scy	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
334275970Scy	return;
335275970Scy}
336275970Scy
337275970Scystatic void
338275970Scyevrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
339275970Scy{
340275970Scy	struct evrpc_req_generic *rpc_state = arg;
341275970Scy	struct evrpc *rpc;
342275970Scy	struct evhttp_request *req;
343275970Scy
344275970Scy	EVUTIL_ASSERT(rpc_state);
345275970Scy	rpc = rpc_state->rpc;
346275970Scy	req = rpc_state->http_req;
347275970Scy
348275970Scy	if (hook_res == EVRPC_TERMINATE)
349275970Scy		goto error;
350275970Scy
351275970Scy	/* let's check that we can parse the request */
352275970Scy	rpc_state->request = rpc->request_new(rpc->request_new_arg);
353275970Scy	if (rpc_state->request == NULL)
354275970Scy		goto error;
355275970Scy
356275970Scy	if (rpc->request_unmarshal(
357275970Scy		    rpc_state->request, req->input_buffer) == -1) {
358275970Scy		/* we failed to parse the request; that's a bummer */
359275970Scy		goto error;
360275970Scy	}
361275970Scy
362275970Scy	/* at this point, we have a well formed request, prepare the reply */
363275970Scy
364275970Scy	rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
365275970Scy	if (rpc_state->reply == NULL)
366275970Scy		goto error;
367275970Scy
368275970Scy	/* give the rpc to the user; they can deal with it */
369275970Scy	rpc->cb(rpc_state, rpc->cb_arg);
370275970Scy
371275970Scy	return;
372275970Scy
373275970Scyerror:
374275970Scy	evrpc_reqstate_free_(rpc_state);
375275970Scy	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
376275970Scy	return;
377275970Scy}
378275970Scy
379275970Scy
380275970Scyvoid
381275970Scyevrpc_reqstate_free_(struct evrpc_req_generic* rpc_state)
382275970Scy{
383275970Scy	struct evrpc *rpc;
384275970Scy	EVUTIL_ASSERT(rpc_state != NULL);
385275970Scy	rpc = rpc_state->rpc;
386275970Scy
387275970Scy	/* clean up all memory */
388275970Scy	if (rpc_state->hook_meta != NULL)
389275970Scy		evrpc_hook_context_free_(rpc_state->hook_meta);
390275970Scy	if (rpc_state->request != NULL)
391275970Scy		rpc->request_free(rpc_state->request);
392275970Scy	if (rpc_state->reply != NULL)
393275970Scy		rpc->reply_free(rpc_state->reply);
394275970Scy	if (rpc_state->rpc_data != NULL)
395275970Scy		evbuffer_free(rpc_state->rpc_data);
396275970Scy	mm_free(rpc_state);
397275970Scy}
398275970Scy
399275970Scystatic void
400275970Scyevrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
401275970Scy
402275970Scyvoid
403275970Scyevrpc_request_done(struct evrpc_req_generic *rpc_state)
404275970Scy{
405275970Scy	struct evhttp_request *req;
406275970Scy	struct evrpc *rpc;
407275970Scy
408275970Scy	EVUTIL_ASSERT(rpc_state);
409275970Scy
410275970Scy	req = rpc_state->http_req;
411275970Scy	rpc = rpc_state->rpc;
412275970Scy
413275970Scy	if (rpc->reply_complete(rpc_state->reply) == -1) {
414275970Scy		/* the reply was not completely filled in.  error out */
415275970Scy		goto error;
416275970Scy	}
417275970Scy
418275970Scy	if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
419275970Scy		/* out of memory */
420275970Scy		goto error;
421275970Scy	}
422275970Scy
423275970Scy	/* serialize the reply */
424275970Scy	rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
425275970Scy
426275970Scy	if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
427275970Scy		int hook_res;
428275970Scy
429275970Scy		evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
430275970Scy
431275970Scy		/* do hook based tweaks to the request */
432275970Scy		hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
433275970Scy		    rpc_state, req, rpc_state->rpc_data);
434275970Scy		switch (hook_res) {
435275970Scy		case EVRPC_TERMINATE:
436275970Scy			goto error;
437275970Scy		case EVRPC_PAUSE:
438275970Scy			if (evrpc_pause_request(rpc->base, rpc_state,
439275970Scy				evrpc_request_done_closure) == -1)
440275970Scy				goto error;
441275970Scy			return;
442275970Scy		case EVRPC_CONTINUE:
443275970Scy			break;
444275970Scy		default:
445275970Scy			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
446275970Scy			    hook_res == EVRPC_CONTINUE ||
447275970Scy			    hook_res == EVRPC_PAUSE);
448275970Scy		}
449275970Scy	}
450275970Scy
451275970Scy	evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
452275970Scy	return;
453275970Scy
454275970Scyerror:
455275970Scy	evrpc_reqstate_free_(rpc_state);
456275970Scy	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
457275970Scy	return;
458275970Scy}
459275970Scy
460275970Scyvoid *
461275970Scyevrpc_get_request(struct evrpc_req_generic *req)
462275970Scy{
463275970Scy	return req->request;
464275970Scy}
465275970Scy
466275970Scyvoid *
467275970Scyevrpc_get_reply(struct evrpc_req_generic *req)
468275970Scy{
469275970Scy	return req->reply;
470275970Scy}
471275970Scy
472275970Scystatic void
473275970Scyevrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
474275970Scy{
475275970Scy	struct evrpc_req_generic *rpc_state = arg;
476275970Scy	struct evhttp_request *req;
477275970Scy	EVUTIL_ASSERT(rpc_state);
478275970Scy	req = rpc_state->http_req;
479275970Scy
480275970Scy	if (hook_res == EVRPC_TERMINATE)
481275970Scy		goto error;
482275970Scy
483275970Scy	/* on success, we are going to transmit marshaled binary data */
484275970Scy	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
485275970Scy		evhttp_add_header(req->output_headers,
486275970Scy		    "Content-Type", "application/octet-stream");
487275970Scy	}
488275970Scy	evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
489275970Scy
490275970Scy	evrpc_reqstate_free_(rpc_state);
491275970Scy
492275970Scy	return;
493275970Scy
494275970Scyerror:
495275970Scy	evrpc_reqstate_free_(rpc_state);
496275970Scy	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
497275970Scy	return;
498275970Scy}
499275970Scy
500275970Scy
/* Client-side implementation of RPC */
502275970Scy
503275970Scystatic int evrpc_schedule_request(struct evhttp_connection *connection,
504275970Scy    struct evrpc_request_wrapper *ctx);
505275970Scy
506275970Scystruct evrpc_pool *
507275970Scyevrpc_pool_new(struct event_base *base)
508275970Scy{
509275970Scy	struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
510275970Scy	if (pool == NULL)
511275970Scy		return (NULL);
512275970Scy
513275970Scy	TAILQ_INIT(&pool->connections);
514275970Scy	TAILQ_INIT(&pool->requests);
515275970Scy
516275970Scy	TAILQ_INIT(&pool->paused_requests);
517275970Scy
518275970Scy	TAILQ_INIT(&pool->input_hooks);
519275970Scy	TAILQ_INIT(&pool->output_hooks);
520275970Scy
521275970Scy	pool->base = base;
522275970Scy	pool->timeout = -1;
523275970Scy
524275970Scy	return (pool);
525275970Scy}
526275970Scy
527275970Scystatic void
528275970Scyevrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
529275970Scy{
530275970Scy	if (request->hook_meta != NULL)
531275970Scy		evrpc_hook_context_free_(request->hook_meta);
532275970Scy	mm_free(request->name);
533275970Scy	mm_free(request);
534275970Scy}
535275970Scy
536275970Scyvoid
537275970Scyevrpc_pool_free(struct evrpc_pool *pool)
538275970Scy{
539275970Scy	struct evhttp_connection *connection;
540275970Scy	struct evrpc_request_wrapper *request;
541275970Scy	struct evrpc_hook_ctx *pause;
542275970Scy	struct evrpc_hook *hook;
543275970Scy	int r;
544275970Scy
545275970Scy	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
546275970Scy		TAILQ_REMOVE(&pool->requests, request, next);
547275970Scy		evrpc_request_wrapper_free(request);
548275970Scy	}
549275970Scy
550275970Scy	while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
551275970Scy		TAILQ_REMOVE(&pool->paused_requests, pause, next);
552275970Scy		mm_free(pause);
553275970Scy	}
554275970Scy
555275970Scy	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
556275970Scy		TAILQ_REMOVE(&pool->connections, connection, next);
557275970Scy		evhttp_connection_free(connection);
558275970Scy	}
559275970Scy
560275970Scy	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
561275970Scy		r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
562275970Scy		EVUTIL_ASSERT(r);
563275970Scy	}
564275970Scy
565275970Scy	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
566275970Scy		r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
567275970Scy		EVUTIL_ASSERT(r);
568275970Scy	}
569275970Scy
570275970Scy	mm_free(pool);
571275970Scy}
572275970Scy
573275970Scy/*
574275970Scy * Add a connection to the RPC pool.   A request scheduled on the pool
575275970Scy * may use any available connection.
576275970Scy */
577275970Scy
578275970Scyvoid
579275970Scyevrpc_pool_add_connection(struct evrpc_pool *pool,
580275970Scy    struct evhttp_connection *connection)
581275970Scy{
582275970Scy	EVUTIL_ASSERT(connection->http_server == NULL);
583275970Scy	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
584275970Scy
585275970Scy	/*
586275970Scy	 * associate an event base with this connection
587275970Scy	 */
588275970Scy	if (pool->base != NULL)
589275970Scy		evhttp_connection_set_base(connection, pool->base);
590275970Scy
591275970Scy	/*
592275970Scy	 * unless a timeout was specifically set for a connection,
593275970Scy	 * the connection inherits the timeout from the pool.
594275970Scy	 */
595275970Scy	if (!evutil_timerisset(&connection->timeout))
596275970Scy		evhttp_connection_set_timeout(connection, pool->timeout);
597275970Scy
598275970Scy	/*
599275970Scy	 * if we have any requests pending, schedule them with the new
600275970Scy	 * connections.
601275970Scy	 */
602275970Scy
603275970Scy	if (TAILQ_FIRST(&pool->requests) != NULL) {
604275970Scy		struct evrpc_request_wrapper *request =
605275970Scy		    TAILQ_FIRST(&pool->requests);
606275970Scy		TAILQ_REMOVE(&pool->requests, request, next);
607275970Scy		evrpc_schedule_request(connection, request);
608275970Scy	}
609275970Scy}
610275970Scy
/*
 * Unlink a connection from the pool's connection list.
 *
 * The connection is only removed from the list; it is not freed or
 * otherwise modified here.
 */
void
evrpc_pool_remove_connection(struct evrpc_pool *pool,
    struct evhttp_connection *connection)
{
	TAILQ_REMOVE(&pool->connections, connection, next);
}
617275970Scy
618275970Scyvoid
619275970Scyevrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
620275970Scy{
621275970Scy	struct evhttp_connection *evcon;
622275970Scy	TAILQ_FOREACH(evcon, &pool->connections, next) {
623275970Scy		evhttp_connection_set_timeout(evcon, timeout_in_secs);
624275970Scy	}
625275970Scy	pool->timeout = timeout_in_secs;
626275970Scy}
627275970Scy
628275970Scy
629275970Scystatic void evrpc_reply_done(struct evhttp_request *, void *);
630275970Scystatic void evrpc_request_timeout(evutil_socket_t, short, void *);
631275970Scy
632275970Scy/*
633275970Scy * Finds a connection object associated with the pool that is currently
634275970Scy * idle and can be used to make a request.
635275970Scy */
636275970Scystatic struct evhttp_connection *
637275970Scyevrpc_pool_find_connection(struct evrpc_pool *pool)
638275970Scy{
639275970Scy	struct evhttp_connection *connection;
640275970Scy	TAILQ_FOREACH(connection, &pool->connections, next) {
641275970Scy		if (TAILQ_FIRST(&connection->requests) == NULL)
642275970Scy			return (connection);
643275970Scy	}
644275970Scy
645275970Scy	return (NULL);
646275970Scy}
647275970Scy
648275970Scy/*
649275970Scy * Prototypes responsible for evrpc scheduling and hooking
650275970Scy */
651275970Scy
652275970Scystatic void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
653275970Scy
654275970Scy/*
655275970Scy * We assume that the ctx is no longer queued on the pool.
656275970Scy */
657275970Scystatic int
658275970Scyevrpc_schedule_request(struct evhttp_connection *connection,
659275970Scy    struct evrpc_request_wrapper *ctx)
660275970Scy{
661275970Scy	struct evhttp_request *req = NULL;
662275970Scy	struct evrpc_pool *pool = ctx->pool;
663275970Scy	struct evrpc_status status;
664275970Scy
665275970Scy	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
666275970Scy		goto error;
667275970Scy
668275970Scy	/* serialize the request data into the output buffer */
669275970Scy	ctx->request_marshal(req->output_buffer, ctx->request);
670275970Scy
671275970Scy	/* we need to know the connection that we might have to abort */
672275970Scy	ctx->evcon = connection;
673275970Scy
674275970Scy	/* if we get paused we also need to know the request */
675275970Scy	ctx->req = req;
676275970Scy
677275970Scy	if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
678275970Scy		int hook_res;
679275970Scy
680275970Scy		evrpc_hook_associate_meta_(&ctx->hook_meta, connection);
681275970Scy
682275970Scy		/* apply hooks to the outgoing request */
683275970Scy		hook_res = evrpc_process_hooks(&pool->output_hooks,
684275970Scy		    ctx, req, req->output_buffer);
685275970Scy
686275970Scy		switch (hook_res) {
687275970Scy		case EVRPC_TERMINATE:
688275970Scy			goto error;
689275970Scy		case EVRPC_PAUSE:
690275970Scy			/* we need to be explicitly resumed */
691275970Scy			if (evrpc_pause_request(pool, ctx,
692275970Scy				evrpc_schedule_request_closure) == -1)
693275970Scy				goto error;
694275970Scy			return (0);
695275970Scy		case EVRPC_CONTINUE:
696275970Scy			/* we can just continue */
697275970Scy			break;
698275970Scy		default:
699275970Scy			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
700275970Scy			    hook_res == EVRPC_CONTINUE ||
701275970Scy			    hook_res == EVRPC_PAUSE);
702275970Scy		}
703275970Scy	}
704275970Scy
705275970Scy	evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
706275970Scy	return (0);
707275970Scy
708275970Scyerror:
709275970Scy	memset(&status, 0, sizeof(status));
710275970Scy	status.error = EVRPC_STATUS_ERR_UNSTARTED;
711275970Scy	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
712275970Scy	evrpc_request_wrapper_free(ctx);
713275970Scy	return (-1);
714275970Scy}
715275970Scy
716275970Scystatic void
717275970Scyevrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
718275970Scy{
719275970Scy	struct evrpc_request_wrapper *ctx = arg;
720275970Scy	struct evhttp_connection *connection = ctx->evcon;
721275970Scy	struct evhttp_request *req = ctx->req;
722275970Scy	struct evrpc_pool *pool = ctx->pool;
723275970Scy	struct evrpc_status status;
724275970Scy	char *uri = NULL;
725275970Scy	int res = 0;
726275970Scy
727275970Scy	if (hook_res == EVRPC_TERMINATE)
728275970Scy		goto error;
729275970Scy
730275970Scy	uri = evrpc_construct_uri(ctx->name);
731275970Scy	if (uri == NULL)
732275970Scy		goto error;
733275970Scy
734275970Scy	if (pool->timeout > 0) {
735275970Scy		/*
736275970Scy		 * a timeout after which the whole rpc is going to be aborted.
737275970Scy		 */
738275970Scy		struct timeval tv;
739275970Scy		evutil_timerclear(&tv);
740275970Scy		tv.tv_sec = pool->timeout;
741275970Scy		evtimer_add(&ctx->ev_timeout, &tv);
742275970Scy	}
743275970Scy
744275970Scy	/* start the request over the connection */
745275970Scy	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
746275970Scy	mm_free(uri);
747275970Scy
748275970Scy	if (res == -1)
749275970Scy		goto error;
750275970Scy
751275970Scy	return;
752275970Scy
753275970Scyerror:
754275970Scy	memset(&status, 0, sizeof(status));
755275970Scy	status.error = EVRPC_STATUS_ERR_UNSTARTED;
756275970Scy	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
757275970Scy	evrpc_request_wrapper_free(ctx);
758275970Scy}
759275970Scy
760275970Scy/* we just queue the paused request on the pool under the req object */
761275970Scystatic int
762275970Scyevrpc_pause_request(void *vbase, void *ctx,
763275970Scy    void (*cb)(void *, enum EVRPC_HOOK_RESULT))
764275970Scy{
765275970Scy	struct evrpc_hooks_ *base = vbase;
766275970Scy	struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
767275970Scy	if (pause == NULL)
768275970Scy		return (-1);
769275970Scy
770275970Scy	pause->ctx = ctx;
771275970Scy	pause->cb = cb;
772275970Scy
773275970Scy	TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
774275970Scy	return (0);
775275970Scy}
776275970Scy
777275970Scyint
778275970Scyevrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
779275970Scy{
780275970Scy	struct evrpc_hooks_ *base = vbase;
781275970Scy	struct evrpc_pause_list *head = &base->pause_requests;
782275970Scy	struct evrpc_hook_ctx *pause;
783275970Scy
784275970Scy	TAILQ_FOREACH(pause, head, next) {
785275970Scy		if (pause->ctx == ctx)
786275970Scy			break;
787275970Scy	}
788275970Scy
789275970Scy	if (pause == NULL)
790275970Scy		return (-1);
791275970Scy
792275970Scy	(*pause->cb)(pause->ctx, res);
793275970Scy	TAILQ_REMOVE(head, pause, next);
794275970Scy	mm_free(pause);
795275970Scy	return (0);
796275970Scy}
797275970Scy
798275970Scyint
799275970Scyevrpc_make_request(struct evrpc_request_wrapper *ctx)
800275970Scy{
801275970Scy	struct evrpc_pool *pool = ctx->pool;
802275970Scy
803275970Scy	/* initialize the event structure for this rpc */
804275970Scy	evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
805275970Scy
806275970Scy	/* we better have some available connections on the pool */
807275970Scy	EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
808275970Scy
809275970Scy	/*
810275970Scy	 * if no connection is available, we queue the request on the pool,
811275970Scy	 * the next time a connection is empty, the rpc will be send on that.
812275970Scy	 */
813275970Scy	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
814275970Scy
815275970Scy	evrpc_pool_schedule(pool);
816275970Scy
817275970Scy	return (0);
818275970Scy}
819275970Scy
820275970Scy
821275970Scystruct evrpc_request_wrapper *
822275970Scyevrpc_make_request_ctx(
823275970Scy	struct evrpc_pool *pool, void *request, void *reply,
824275970Scy	const char *rpcname,
825275970Scy	void (*req_marshal)(struct evbuffer*, void *),
826275970Scy	void (*rpl_clear)(void *),
827275970Scy	int (*rpl_unmarshal)(void *, struct evbuffer *),
828275970Scy	void (*cb)(struct evrpc_status *, void *, void *, void *),
829275970Scy	void *cbarg)
830275970Scy{
831275970Scy	struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
832275970Scy	    mm_malloc(sizeof(struct evrpc_request_wrapper));
833275970Scy	if (ctx == NULL)
834275970Scy		return (NULL);
835275970Scy
836275970Scy	ctx->pool = pool;
837275970Scy	ctx->hook_meta = NULL;
838275970Scy	ctx->evcon = NULL;
839275970Scy	ctx->name = mm_strdup(rpcname);
840275970Scy	if (ctx->name == NULL) {
841275970Scy		mm_free(ctx);
842275970Scy		return (NULL);
843275970Scy	}
844275970Scy	ctx->cb = cb;
845275970Scy	ctx->cb_arg = cbarg;
846275970Scy	ctx->request = request;
847275970Scy	ctx->reply = reply;
848275970Scy	ctx->request_marshal = req_marshal;
849275970Scy	ctx->reply_clear = rpl_clear;
850275970Scy	ctx->reply_unmarshal = rpl_unmarshal;
851275970Scy
852275970Scy	return (ctx);
853275970Scy}
854275970Scy
855275970Scystatic void
856275970Scyevrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
857275970Scy
858275970Scystatic void
859275970Scyevrpc_reply_done(struct evhttp_request *req, void *arg)
860275970Scy{
861275970Scy	struct evrpc_request_wrapper *ctx = arg;
862275970Scy	struct evrpc_pool *pool = ctx->pool;
863275970Scy	int hook_res = EVRPC_CONTINUE;
864275970Scy
865275970Scy	/* cancel any timeout we might have scheduled */
866275970Scy	event_del(&ctx->ev_timeout);
867275970Scy
868275970Scy	ctx->req = req;
869275970Scy
870275970Scy	/* we need to get the reply now */
871275970Scy	if (req == NULL) {
872275970Scy		evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
873275970Scy		return;
874275970Scy	}
875275970Scy
876275970Scy	if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
877275970Scy		evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon);
878275970Scy
879275970Scy		/* apply hooks to the incoming request */
880275970Scy		hook_res = evrpc_process_hooks(&pool->input_hooks,
881275970Scy		    ctx, req, req->input_buffer);
882275970Scy
883275970Scy		switch (hook_res) {
884275970Scy		case EVRPC_TERMINATE:
885275970Scy		case EVRPC_CONTINUE:
886275970Scy			break;
887275970Scy		case EVRPC_PAUSE:
888275970Scy			/*
889275970Scy			 * if we get paused we also need to know the
890275970Scy			 * request.  unfortunately, the underlying
891275970Scy			 * layer is going to free it.  we need to
892275970Scy			 * request ownership explicitly
893275970Scy			 */
894275970Scy			if (req != NULL)
895275970Scy				evhttp_request_own(req);
896275970Scy
897275970Scy			evrpc_pause_request(pool, ctx,
898275970Scy			    evrpc_reply_done_closure);
899275970Scy			return;
900275970Scy		default:
901275970Scy			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
902275970Scy			    hook_res == EVRPC_CONTINUE ||
903275970Scy			    hook_res == EVRPC_PAUSE);
904275970Scy		}
905275970Scy	}
906275970Scy
907275970Scy	evrpc_reply_done_closure(ctx, hook_res);
908275970Scy
909275970Scy	/* http request is being freed by underlying layer */
910275970Scy}
911275970Scy
912275970Scystatic void
913275970Scyevrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
914275970Scy{
915275970Scy	struct evrpc_request_wrapper *ctx = arg;
916275970Scy	struct evhttp_request *req = ctx->req;
917275970Scy	struct evrpc_pool *pool = ctx->pool;
918275970Scy	struct evrpc_status status;
919275970Scy	int res = -1;
920275970Scy
921275970Scy	memset(&status, 0, sizeof(status));
922275970Scy	status.http_req = req;
923275970Scy
924275970Scy	/* we need to get the reply now */
925275970Scy	if (req == NULL) {
926275970Scy		status.error = EVRPC_STATUS_ERR_TIMEOUT;
927275970Scy	} else if (hook_res == EVRPC_TERMINATE) {
928275970Scy		status.error = EVRPC_STATUS_ERR_HOOKABORTED;
929275970Scy	} else {
930275970Scy		res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
931275970Scy		if (res == -1)
932275970Scy			status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
933275970Scy	}
934275970Scy
935275970Scy	if (res == -1) {
936275970Scy		/* clear everything that we might have written previously */
937275970Scy		ctx->reply_clear(ctx->reply);
938275970Scy	}
939275970Scy
940275970Scy	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
941275970Scy
942275970Scy	evrpc_request_wrapper_free(ctx);
943275970Scy
944275970Scy	/* the http layer owned the original request structure, but if we
945275970Scy	 * got paused, we asked for ownership and need to free it here. */
946275970Scy	if (req != NULL && evhttp_request_is_owned(req))
947275970Scy		evhttp_request_free(req);
948275970Scy
949275970Scy	/* see if we can schedule another request */
950275970Scy	evrpc_pool_schedule(pool);
951275970Scy}
952275970Scy
953275970Scystatic void
954275970Scyevrpc_pool_schedule(struct evrpc_pool *pool)
955275970Scy{
956275970Scy	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
957275970Scy	struct evhttp_connection *evcon;
958275970Scy
959275970Scy	/* if no requests are pending, we have no work */
960275970Scy	if (ctx == NULL)
961275970Scy		return;
962275970Scy
963275970Scy	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
964275970Scy		TAILQ_REMOVE(&pool->requests, ctx, next);
965275970Scy		evrpc_schedule_request(evcon, ctx);
966275970Scy	}
967275970Scy}
968275970Scy
969275970Scystatic void
970275970Scyevrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
971275970Scy{
972275970Scy	struct evrpc_request_wrapper *ctx = arg;
973275970Scy	struct evhttp_connection *evcon = ctx->evcon;
974275970Scy	EVUTIL_ASSERT(evcon != NULL);
975275970Scy
976275970Scy	evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT);
977275970Scy}
978275970Scy
979275970Scy/*
980275970Scy * frees potential meta data associated with a request.
981275970Scy */
982275970Scy
983275970Scystatic void
984275970Scyevrpc_meta_data_free(struct evrpc_meta_list *meta_data)
985275970Scy{
986275970Scy	struct evrpc_meta *entry;
987275970Scy	EVUTIL_ASSERT(meta_data != NULL);
988275970Scy
989275970Scy	while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
990275970Scy		TAILQ_REMOVE(meta_data, entry, next);
991275970Scy		mm_free(entry->key);
992275970Scy		mm_free(entry->data);
993275970Scy		mm_free(entry);
994275970Scy	}
995275970Scy}
996275970Scy
997275970Scystatic struct evrpc_hook_meta *
998275970Scyevrpc_hook_meta_new_(void)
999275970Scy{
1000275970Scy	struct evrpc_hook_meta *ctx;
1001275970Scy	ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1002275970Scy	EVUTIL_ASSERT(ctx != NULL);
1003275970Scy
1004275970Scy	TAILQ_INIT(&ctx->meta_data);
1005275970Scy	ctx->evcon = NULL;
1006275970Scy
1007275970Scy	return (ctx);
1008275970Scy}
1009275970Scy
1010275970Scystatic void
1011275970Scyevrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx,
1012275970Scy    struct evhttp_connection *evcon)
1013275970Scy{
1014275970Scy	struct evrpc_hook_meta *ctx = *pctx;
1015275970Scy	if (ctx == NULL)
1016275970Scy		*pctx = ctx = evrpc_hook_meta_new_();
1017275970Scy	ctx->evcon = evcon;
1018275970Scy}
1019275970Scy
1020275970Scystatic void
1021275970Scyevrpc_hook_context_free_(struct evrpc_hook_meta *ctx)
1022275970Scy{
1023275970Scy	evrpc_meta_data_free(&ctx->meta_data);
1024275970Scy	mm_free(ctx);
1025275970Scy}
1026275970Scy
1027275970Scy/* Adds meta data */
1028275970Scyvoid
1029275970Scyevrpc_hook_add_meta(void *ctx, const char *key,
1030275970Scy    const void *data, size_t data_size)
1031275970Scy{
1032275970Scy	struct evrpc_request_wrapper *req = ctx;
1033275970Scy	struct evrpc_hook_meta *store = NULL;
1034275970Scy	struct evrpc_meta *meta = NULL;
1035275970Scy
1036275970Scy	if ((store = req->hook_meta) == NULL)
1037275970Scy		store = req->hook_meta = evrpc_hook_meta_new_();
1038275970Scy
1039275970Scy	meta = mm_malloc(sizeof(struct evrpc_meta));
1040275970Scy	EVUTIL_ASSERT(meta != NULL);
1041275970Scy	meta->key = mm_strdup(key);
1042275970Scy	EVUTIL_ASSERT(meta->key != NULL);
1043275970Scy	meta->data_size = data_size;
1044275970Scy	meta->data = mm_malloc(data_size);
1045275970Scy	EVUTIL_ASSERT(meta->data != NULL);
1046275970Scy	memcpy(meta->data, data, data_size);
1047275970Scy
1048275970Scy	TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1049275970Scy}
1050275970Scy
1051275970Scyint
1052275970Scyevrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1053275970Scy{
1054275970Scy	struct evrpc_request_wrapper *req = ctx;
1055275970Scy	struct evrpc_meta *meta = NULL;
1056275970Scy
1057275970Scy	if (req->hook_meta == NULL)
1058275970Scy		return (-1);
1059275970Scy
1060275970Scy	TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1061275970Scy		if (strcmp(meta->key, key) == 0) {
1062275970Scy			*data = meta->data;
1063275970Scy			*data_size = meta->data_size;
1064275970Scy			return (0);
1065275970Scy		}
1066275970Scy	}
1067275970Scy
1068275970Scy	return (-1);
1069275970Scy}
1070275970Scy
1071275970Scystruct evhttp_connection *
1072275970Scyevrpc_hook_get_connection(void *ctx)
1073275970Scy{
1074275970Scy	struct evrpc_request_wrapper *req = ctx;
1075275970Scy	return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1076275970Scy}
1077275970Scy
1078275970Scyint
1079275970Scyevrpc_send_request_generic(struct evrpc_pool *pool,
1080275970Scy    void *request, void *reply,
1081275970Scy    void (*cb)(struct evrpc_status *, void *, void *, void *),
1082275970Scy    void *cb_arg,
1083275970Scy    const char *rpcname,
1084275970Scy    void (*req_marshal)(struct evbuffer *, void *),
1085275970Scy    void (*rpl_clear)(void *),
1086275970Scy    int (*rpl_unmarshal)(void *, struct evbuffer *))
1087275970Scy{
1088275970Scy	struct evrpc_status status;
1089275970Scy	struct evrpc_request_wrapper *ctx;
1090275970Scy	ctx = evrpc_make_request_ctx(pool, request, reply,
1091275970Scy	    rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1092275970Scy	if (ctx == NULL)
1093275970Scy		goto error;
1094275970Scy	return (evrpc_make_request(ctx));
1095275970Scyerror:
1096275970Scy	memset(&status, 0, sizeof(status));
1097275970Scy	status.error = EVRPC_STATUS_ERR_UNSTARTED;
1098275970Scy	(*(cb))(&status, request, reply, cb_arg);
1099275970Scy	return (-1);
1100275970Scy}
1101275970Scy
1102275970Scy/** Takes a request object and fills it in with the right magic */
1103275970Scystatic struct evrpc *
1104275970Scyevrpc_register_object(const char *name,
1105275970Scy    void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1106275970Scy    int (*req_unmarshal)(void *, struct evbuffer *),
1107275970Scy    void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1108275970Scy    int (*rpl_complete)(void *),
1109275970Scy    void (*rpl_marshal)(struct evbuffer *, void *))
1110275970Scy{
1111275970Scy	struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1112275970Scy	if (rpc == NULL)
1113275970Scy		return (NULL);
1114275970Scy	rpc->uri = mm_strdup(name);
1115275970Scy	if (rpc->uri == NULL) {
1116275970Scy		mm_free(rpc);
1117275970Scy		return (NULL);
1118275970Scy	}
1119275970Scy	rpc->request_new = req_new;
1120275970Scy	rpc->request_new_arg = req_new_arg;
1121275970Scy	rpc->request_free = req_free;
1122275970Scy	rpc->request_unmarshal = req_unmarshal;
1123275970Scy	rpc->reply_new = rpl_new;
1124275970Scy	rpc->reply_new_arg = rpl_new_arg;
1125275970Scy	rpc->reply_free = rpl_free;
1126275970Scy	rpc->reply_complete = rpl_complete;
1127275970Scy	rpc->reply_marshal = rpl_marshal;
1128275970Scy	return (rpc);
1129275970Scy}
1130275970Scy
/*
 * Creates an RPC description from the given helpers and registers it on
 * base together with the callback that handles incoming calls.
 *
 * Returns -1 if the description cannot be allocated and 0 otherwise; the
 * return value of evrpc_register_rpc() is not inspected.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
	struct evrpc *rpc;

	rpc = evrpc_register_object(name,
	    req_new, req_new_arg, req_free, req_unmarshal,
	    rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
	if (rpc == NULL)
		return (-1);

	/* callback already has the exact type expected here */
	evrpc_register_rpc(base, rpc, callback, cbarg);

	return (0);
}
1149275970Scy
1150275970Scy/** accessors for obscure and undocumented functionality */
1151275970Scystruct evrpc_pool *
1152275970Scyevrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1153275970Scy{
1154275970Scy	return (ctx->pool);
1155275970Scy}
1156275970Scy
1157275970Scyvoid
1158275970Scyevrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1159275970Scy    struct evrpc_pool *pool)
1160275970Scy{
1161275970Scy	ctx->pool = pool;
1162275970Scy}
1163275970Scy
1164275970Scyvoid
1165275970Scyevrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1166275970Scy    void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1167275970Scy    void *cb_arg)
1168275970Scy{
1169275970Scy	ctx->cb = cb;
1170275970Scy	ctx->cb_arg = cb_arg;
1171275970Scy}
1172