1/*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28#include <sys/types.h>
29#include <sys/event.h>
30#include <sys/socket.h>
31#include <sys/time.h>
32
33#include <assert.h>
34#include <errno.h>
35#include <stdio.h>
36#include <stdlib.h>
37#include <string.h>
38
39#include "cachelib.h"
40#include "config.h"
41#include "debug.h"
42#include "log.h"
43#include "query.h"
44#include "mp_ws_query.h"
45#include "singletons.h"
46
/*
 * Forward declarations of the file-local handlers. The functions below
 * install them as a query_state's process_func (or destroy_func) to move a
 * query from one processing step to the next.
 *
 * on_mp_write_session_request_read1 has no prototype here: it is defined
 * below without "static" (presumably it is declared in mp_ws_query.h --
 * TODO confirm).
 */
static int on_mp_write_session_abandon_notification(struct query_state *);
static int on_mp_write_session_close_notification(struct query_state *);
static void on_mp_write_session_destroy(struct query_state *);
static int on_mp_write_session_mapper(struct query_state *);
static int on_mp_write_session_request_read2(struct query_state *);
static int on_mp_write_session_request_process(struct query_state *);
static int on_mp_write_session_response_write1(struct query_state *);
static int on_mp_write_session_write_request_read1(struct query_state *);
static int on_mp_write_session_write_request_read2(struct query_state *);
static int on_mp_write_session_write_request_process(struct query_state *);
static int on_mp_write_session_write_response_write1(struct query_state *);
59
60/*
61 * This function is used as the query_state's destroy_func to make the
62 * proper cleanup in case of errors.
63 */
64static void
65on_mp_write_session_destroy(struct query_state *qstate)
66{
67
68	TRACE_IN(on_mp_write_session_destroy);
69	finalize_comm_element(&qstate->request);
70	finalize_comm_element(&qstate->response);
71
72	if (qstate->mdata != NULL) {
73		configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
74		abandon_cache_mp_write_session(
75	    		(cache_mp_write_session)qstate->mdata);
76		configuration_unlock_entry(qstate->config_entry,
77			CELT_MULTIPART);
78	}
79	TRACE_OUT(on_mp_write_session_destroy);
80}
81
82/*
83 * The functions below are used to process multipart write session initiation
84 * requests.
85 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2
86 *   read the request itself
87 * - on_mp_write_session_request_process processes it
88 * - on_mp_write_session_response_write1 sends the response
89 */
90int
91on_mp_write_session_request_read1(struct query_state *qstate)
92{
93	struct cache_mp_write_session_request	*c_mp_ws_request;
94	ssize_t	result;
95
96	TRACE_IN(on_mp_write_session_request_read1);
97	if (qstate->kevent_watermark == 0)
98		qstate->kevent_watermark = sizeof(size_t);
99	else {
100		init_comm_element(&qstate->request,
101	    		CET_MP_WRITE_SESSION_REQUEST);
102		c_mp_ws_request = get_cache_mp_write_session_request(
103	    		&qstate->request);
104
105		result = qstate->read_func(qstate,
106	    		&c_mp_ws_request->entry_length, sizeof(size_t));
107
108		if (result != sizeof(size_t)) {
109			LOG_ERR_3("on_mp_write_session_request_read1",
110				"read failed");
111			TRACE_OUT(on_mp_write_session_request_read1);
112			return (-1);
113		}
114
115		if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) {
116			LOG_ERR_3("on_mp_write_session_request_read1",
117				"invalid entry_length value");
118			TRACE_OUT(on_mp_write_session_request_read1);
119			return (-1);
120		}
121
122		c_mp_ws_request->entry = calloc(1,
123			c_mp_ws_request->entry_length + 1);
124		assert(c_mp_ws_request->entry != NULL);
125
126		qstate->kevent_watermark = c_mp_ws_request->entry_length;
127		qstate->process_func = on_mp_write_session_request_read2;
128	}
129	TRACE_OUT(on_mp_write_session_request_read1);
130	return (0);
131}
132
133static int
134on_mp_write_session_request_read2(struct query_state *qstate)
135{
136	struct cache_mp_write_session_request	*c_mp_ws_request;
137	ssize_t	result;
138
139	TRACE_IN(on_mp_write_session_request_read2);
140	c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
141
142	result = qstate->read_func(qstate, c_mp_ws_request->entry,
143		c_mp_ws_request->entry_length);
144
145	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
146		LOG_ERR_3("on_mp_write_session_request_read2",
147			"read failed");
148		TRACE_OUT(on_mp_write_session_request_read2);
149		return (-1);
150	}
151
152	qstate->kevent_watermark = 0;
153	qstate->process_func = on_mp_write_session_request_process;
154
155	TRACE_OUT(on_mp_write_session_request_read2);
156	return (0);
157}
158
159static int
160on_mp_write_session_request_process(struct query_state *qstate)
161{
162	struct cache_mp_write_session_request	*c_mp_ws_request;
163	struct cache_mp_write_session_response	*c_mp_ws_response;
164	cache_mp_write_session	ws;
165	cache_entry	c_entry;
166	char	*dec_cache_entry_name;
167
168	TRACE_IN(on_mp_write_session_request_process);
169	init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE);
170	c_mp_ws_response = get_cache_mp_write_session_response(
171		&qstate->response);
172	c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
173
174	qstate->config_entry = configuration_find_entry(
175		s_configuration, c_mp_ws_request->entry);
176	if (qstate->config_entry == NULL) {
177		c_mp_ws_response->error_code = ENOENT;
178
179		LOG_ERR_2("write_session_request",
180			"can't find configuration entry '%s'. "
181	    		"aborting request", c_mp_ws_request->entry);
182	    	goto fin;
183	}
184
185	if (qstate->config_entry->enabled == 0) {
186		c_mp_ws_response->error_code = EACCES;
187
188		LOG_ERR_2("write_session_request",
189			"configuration entry '%s' is disabled",
190			c_mp_ws_request->entry);
191		goto fin;
192	}
193
194	if (qstate->config_entry->perform_actual_lookups != 0) {
195		c_mp_ws_response->error_code = EOPNOTSUPP;
196
197		LOG_ERR_2("write_session_request",
198			"entry '%s' performs lookups by itself: "
199			"can't write to it", c_mp_ws_request->entry);
200		goto fin;
201	} else {
202#ifdef NS_NSCD_EID_CHECKING
203		if (check_query_eids(qstate) != 0) {
204			c_mp_ws_response->error_code = EPERM;
205			goto fin;
206		}
207#endif
208	}
209
210	/*
211	 * All multipart entries are separated by their name decorations.
212	 * For one configuration entry there will be a lot of multipart
213	 * cache entries - each with its own decorated name.
214	 */
215	asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
216		qstate->config_entry->mp_cache_params.cep.entry_name);
217	assert(dec_cache_entry_name != NULL);
218
219	configuration_lock_rdlock(s_configuration);
220	c_entry = find_cache_entry(s_cache,
221		dec_cache_entry_name);
222	configuration_unlock(s_configuration);
223
224	if (c_entry == INVALID_CACHE_ENTRY)
225		c_entry = register_new_mp_cache_entry(qstate,
226			dec_cache_entry_name);
227
228	free(dec_cache_entry_name);
229
230	assert(c_entry != NULL);
231	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
232	ws = open_cache_mp_write_session(c_entry);
233	if (ws == INVALID_CACHE_MP_WRITE_SESSION)
234		c_mp_ws_response->error_code = -1;
235	else {
236		qstate->mdata = ws;
237		qstate->destroy_func = on_mp_write_session_destroy;
238
239		if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
240		    (qstate->config_entry->mp_query_timeout.tv_usec != 0))
241			memcpy(&qstate->timeout,
242				&qstate->config_entry->mp_query_timeout,
243				sizeof(struct timeval));
244	}
245	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
246
247fin:
248	qstate->process_func = on_mp_write_session_response_write1;
249	qstate->kevent_watermark = sizeof(int);
250	qstate->kevent_filter = EVFILT_WRITE;
251
252	TRACE_OUT(on_mp_write_session_request_process);
253	return (0);
254}
255
256static int
257on_mp_write_session_response_write1(struct query_state *qstate)
258{
259	struct cache_mp_write_session_response	*c_mp_ws_response;
260	ssize_t	result;
261
262	TRACE_IN(on_mp_write_session_response_write1);
263	c_mp_ws_response = get_cache_mp_write_session_response(
264		&qstate->response);
265	result = qstate->write_func(qstate, &c_mp_ws_response->error_code,
266		sizeof(int));
267	if (result != sizeof(int)) {
268		LOG_ERR_3("on_mp_write_session_response_write1",
269			"write failed");
270		TRACE_OUT(on_mp_write_session_response_write1);
271		return (-1);
272	}
273
274	if (c_mp_ws_response->error_code == 0) {
275		qstate->kevent_watermark = sizeof(int);
276		qstate->process_func = on_mp_write_session_mapper;
277		qstate->kevent_filter = EVFILT_READ;
278	} else {
279		qstate->kevent_watermark = 0;
280		qstate->process_func = NULL;
281	}
282	TRACE_OUT(on_mp_write_session_response_write1);
283	return (0);
284}
285
286/*
287 * Mapper function is used to avoid multiple connections for each session
288 * write or read requests. After processing the request, it does not close
289 * the connection, but waits for the next request.
290 */
291static int
292on_mp_write_session_mapper(struct query_state *qstate)
293{
294	ssize_t	result;
295	int		elem_type;
296
297	TRACE_IN(on_mp_write_session_mapper);
298	if (qstate->kevent_watermark == 0) {
299		qstate->kevent_watermark = sizeof(int);
300	} else {
301		result = qstate->read_func(qstate, &elem_type, sizeof(int));
302		if (result != sizeof(int)) {
303			LOG_ERR_3("on_mp_write_session_mapper",
304				"read failed");
305			TRACE_OUT(on_mp_write_session_mapper);
306			return (-1);
307		}
308
309		switch (elem_type) {
310		case CET_MP_WRITE_SESSION_WRITE_REQUEST:
311			qstate->kevent_watermark = sizeof(size_t);
312			qstate->process_func =
313				on_mp_write_session_write_request_read1;
314			break;
315		case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION:
316			qstate->kevent_watermark = 0;
317			qstate->process_func =
318				on_mp_write_session_abandon_notification;
319			break;
320		case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION:
321			qstate->kevent_watermark = 0;
322			qstate->process_func =
323				on_mp_write_session_close_notification;
324			break;
325		default:
326			qstate->kevent_watermark = 0;
327			qstate->process_func = NULL;
328			LOG_ERR_2("on_mp_write_session_mapper",
329				"unknown element type");
330			TRACE_OUT(on_mp_write_session_mapper);
331			return (-1);
332		}
333	}
334	TRACE_OUT(on_mp_write_session_mapper);
335	return (0);
336}
337
338/*
339 * The functions below are used to process multipart write sessions write
340 * requests.
341 * - on_mp_write_session_write_request_read1 and
342 *   on_mp_write_session_write_request_read2 read the request itself
343 * - on_mp_write_session_write_request_process processes it
344 * - on_mp_write_session_write_response_write1 sends the response
345 */
346static int
347on_mp_write_session_write_request_read1(struct query_state *qstate)
348{
349	struct cache_mp_write_session_write_request	*write_request;
350	ssize_t	result;
351
352	TRACE_IN(on_mp_write_session_write_request_read1);
353	init_comm_element(&qstate->request,
354		CET_MP_WRITE_SESSION_WRITE_REQUEST);
355	write_request = get_cache_mp_write_session_write_request(
356		&qstate->request);
357
358	result = qstate->read_func(qstate, &write_request->data_size,
359		sizeof(size_t));
360
361	if (result != sizeof(size_t)) {
362		LOG_ERR_3("on_mp_write_session_write_request_read1",
363			"read failed");
364		TRACE_OUT(on_mp_write_session_write_request_read1);
365		return (-1);
366	}
367
368	if (BUFSIZE_INVALID(write_request->data_size)) {
369		LOG_ERR_3("on_mp_write_session_write_request_read1",
370			"invalid data_size value");
371		TRACE_OUT(on_mp_write_session_write_request_read1);
372		return (-1);
373	}
374
375	write_request->data = calloc(1, write_request->data_size);
376	assert(write_request->data != NULL);
377
378	qstate->kevent_watermark = write_request->data_size;
379	qstate->process_func = on_mp_write_session_write_request_read2;
380	TRACE_OUT(on_mp_write_session_write_request_read1);
381	return (0);
382}
383
384static int
385on_mp_write_session_write_request_read2(struct query_state *qstate)
386{
387	struct cache_mp_write_session_write_request	*write_request;
388	ssize_t	result;
389
390	TRACE_IN(on_mp_write_session_write_request_read2);
391	write_request = get_cache_mp_write_session_write_request(
392		&qstate->request);
393
394	result = qstate->read_func(qstate, write_request->data,
395		write_request->data_size);
396
397	if (result < 0 || (size_t)result != qstate->kevent_watermark) {
398		LOG_ERR_3("on_mp_write_session_write_request_read2",
399			"read failed");
400		TRACE_OUT(on_mp_write_session_write_request_read2);
401		return (-1);
402	}
403
404	qstate->kevent_watermark = 0;
405	qstate->process_func = on_mp_write_session_write_request_process;
406	TRACE_OUT(on_mp_write_session_write_request_read2);
407	return (0);
408}
409
410static int
411on_mp_write_session_write_request_process(struct query_state *qstate)
412{
413	struct cache_mp_write_session_write_request	*write_request;
414	struct cache_mp_write_session_write_response	*write_response;
415
416	TRACE_IN(on_mp_write_session_write_request_process);
417	init_comm_element(&qstate->response,
418		CET_MP_WRITE_SESSION_WRITE_RESPONSE);
419	write_response = get_cache_mp_write_session_write_response(
420		&qstate->response);
421	write_request = get_cache_mp_write_session_write_request(
422		&qstate->request);
423
424	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
425	write_response->error_code = cache_mp_write(
426		(cache_mp_write_session)qstate->mdata,
427		write_request->data,
428		write_request->data_size);
429	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
430
431	qstate->kevent_watermark = sizeof(int);
432	qstate->process_func = on_mp_write_session_write_response_write1;
433	qstate->kevent_filter = EVFILT_WRITE;
434
435	TRACE_OUT(on_mp_write_session_write_request_process);
436	return (0);
437}
438
439static int
440on_mp_write_session_write_response_write1(struct query_state *qstate)
441{
442	struct cache_mp_write_session_write_response	*write_response;
443	ssize_t	result;
444
445	TRACE_IN(on_mp_write_session_write_response_write1);
446	write_response = get_cache_mp_write_session_write_response(
447		&qstate->response);
448	result = qstate->write_func(qstate, &write_response->error_code,
449		sizeof(int));
450	if (result != sizeof(int)) {
451		LOG_ERR_3("on_mp_write_session_write_response_write1",
452			"write failed");
453		TRACE_OUT(on_mp_write_session_write_response_write1);
454		return (-1);
455	}
456
457	if (write_response->error_code == 0) {
458		finalize_comm_element(&qstate->request);
459		finalize_comm_element(&qstate->response);
460
461		qstate->kevent_watermark = sizeof(int);
462		qstate->process_func = on_mp_write_session_mapper;
463		qstate->kevent_filter = EVFILT_READ;
464	} else {
465		qstate->kevent_watermark = 0;
466		qstate->process_func = 0;
467	}
468
469	TRACE_OUT(on_mp_write_session_write_response_write1);
470	return (0);
471}
472
473/*
474 * Handles abandon notifications. Destroys the session by calling the
475 * abandon_cache_mp_write_session.
476 */
477static int
478on_mp_write_session_abandon_notification(struct query_state *qstate)
479{
480	TRACE_IN(on_mp_write_session_abandon_notification);
481	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
482	abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
483	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
484	qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
485
486	qstate->kevent_watermark = 0;
487	qstate->process_func = NULL;
488	TRACE_OUT(on_mp_write_session_abandon_notification);
489	return (0);
490}
491
492/*
493 * Handles close notifications. Commits the session by calling
494 * the close_cache_mp_write_session.
495 */
496static int
497on_mp_write_session_close_notification(struct query_state *qstate)
498{
499	TRACE_IN(on_mp_write_session_close_notification);
500	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
501	close_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
502	configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
503	qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
504
505	qstate->kevent_watermark = 0;
506	qstate->process_func = NULL;
507	TRACE_OUT(on_mp_write_session_close_notification);
508	return (0);
509}
510
511cache_entry register_new_mp_cache_entry(struct query_state *qstate,
512	const char *dec_cache_entry_name)
513{
514	cache_entry c_entry;
515	char *en_bkp;
516
517	TRACE_IN(register_new_mp_cache_entry);
518	c_entry = INVALID_CACHE_ENTRY;
519	configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
520
521	configuration_lock_wrlock(s_configuration);
522	en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name;
523	qstate->config_entry->mp_cache_params.cep.entry_name =
524		(char *)dec_cache_entry_name;
525	register_cache_entry(s_cache, (struct cache_entry_params *)
526		&qstate->config_entry->mp_cache_params);
527	qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp;
528	configuration_unlock(s_configuration);
529
530	configuration_lock_rdlock(s_configuration);
531	c_entry = find_cache_entry(s_cache,
532		dec_cache_entry_name);
533	configuration_unlock(s_configuration);
534
535	configuration_entry_add_mp_cache_entry(qstate->config_entry,
536		c_entry);
537
538	configuration_unlock_entry(qstate->config_entry,
539		CELT_MULTIPART);
540
541	TRACE_OUT(register_new_mp_cache_entry);
542	return (c_entry);
543}
544