1/*
2 * services/mesh.c - deal with mesh of query states and handle events for that.
3 *
4 * Copyright (c) 2007, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/**
37 * \file
38 *
39 * This file contains functions to assist in dealing with a mesh of
40 * query states. This mesh is supposed to be thread-specific.
41 * It consists of query states (per qname, qtype, qclass) and connections
42 * between query states and the super and subquery states, and replies to
43 * send back to clients.
44 */
45#include "config.h"
46#include <ldns/wire2host.h>
47#include "services/mesh.h"
48#include "services/outbound_list.h"
49#include "services/cache/dns.h"
50#include "util/log.h"
51#include "util/net_help.h"
52#include "util/module.h"
53#include "util/regional.h"
54#include "util/data/msgencode.h"
55#include "util/timehist.h"
56#include "util/fptr_wlist.h"
57#include "util/alloc.h"
58#include "util/config_file.h"
59
/**
 * Subtract two timers: d = end - start.
 * When the microsecond part of end is smaller than that of start, one
 * second is borrowed so that d->tv_usec stays non-negative (assuming both
 * inputs are normalized, 0 <= tv_usec < 1000000 — TODO confirm callers).
 * d->tv_sec is negative if end lies before start; callers presumably pass
 * end >= start.
 * @param d: result.
 * @param end: later time value.
 * @param start: earlier time value.
 */
static void
timeval_subtract(struct timeval* d, const struct timeval* end, const struct timeval* start)
{
#ifndef S_SPLINT_S
	/* borrow a second when the microseconds would go negative */
	const int borrow = (end->tv_usec < start->tv_usec);
	const time_t end_usec = (time_t)end->tv_usec + (borrow ? 1000000 : 0);
	d->tv_sec = end->tv_sec - start->tv_sec - borrow;
	d->tv_usec = end_usec - start->tv_usec;
#endif
}
74
/**
 * Add a timer to an accumulator: d += add.
 * Both values are assumed normalized (0 <= tv_usec < 1000000 — TODO
 * confirm callers); the result is normalized again by carrying into the
 * seconds field.
 * @param d: accumulator, updated in place.
 * @param add: value to add.
 */
static void
timeval_add(struct timeval* d, const struct timeval* add)
{
#ifndef S_SPLINT_S
	d->tv_sec += add->tv_sec;
	d->tv_usec += add->tv_usec;
	/* a normalized tv_usec is at most 999999, so exactly 1000000
	 * must carry as well (the old '>' test left it un-normalized) */
	if(d->tv_usec >= 1000000) {
		d->tv_usec -= 1000000;
		d->tv_sec++;
	}
#endif
}
88
/**
 * Divide a sum of timers to get the average: avg = sum / d.
 * The remainder of the seconds division is carried into the microseconds.
 * @param avg: result; set to 0.0 when d is 0.
 * @param sum: accumulated time (assumed non-negative and normalized).
 * @param d: number of samples in the sum.
 */
static void
timeval_divide(struct timeval* avg, const struct timeval* sum, size_t d)
{
#ifndef S_SPLINT_S
	size_t leftover;
	if(d == 0) {
		avg->tv_sec = 0;
		avg->tv_usec = 0;
		return;
	}
	avg->tv_sec = sum->tv_sec / d;
	avg->tv_usec = sum->tv_usec / d;
	/* handle fraction from seconds divide */
	leftover = sum->tv_sec - avg->tv_sec*d;
	/* widen before multiplying: with a 32-bit size_t, leftover*1000000
	 * overflows as soon as leftover exceeds 4294, and leftover can be
	 * as large as d-1 (d is a reply count) */
	avg->tv_usec += (unsigned long long)leftover * 1000000ULL
		/ (unsigned long long)d;
#endif
}
107
/**
 * Histogram compare of time values.
 * @param x: first time value.
 * @param y: second time value.
 * @return 1 if x is smaller than or equal to y, 0 otherwise.
 *	Equal values count as "smaller".
 */
static int
timeval_smaller(const struct timeval* x, const struct timeval* y)
{
#ifndef S_SPLINT_S
	/* seconds decide unless they tie; then microseconds, inclusive */
	if(x->tv_sec != y->tv_sec)
		return x->tv_sec < y->tv_sec;
	return x->tv_usec <= y->tv_usec;
#endif
}
123
124int
125mesh_state_compare(const void* ap, const void* bp)
126{
127	struct mesh_state* a = (struct mesh_state*)ap;
128	struct mesh_state* b = (struct mesh_state*)bp;
129
130	if(a->s.is_priming && !b->s.is_priming)
131		return -1;
132	if(!a->s.is_priming && b->s.is_priming)
133		return 1;
134
135	if((a->s.query_flags&BIT_RD) && !(b->s.query_flags&BIT_RD))
136		return -1;
137	if(!(a->s.query_flags&BIT_RD) && (b->s.query_flags&BIT_RD))
138		return 1;
139
140	if((a->s.query_flags&BIT_CD) && !(b->s.query_flags&BIT_CD))
141		return -1;
142	if(!(a->s.query_flags&BIT_CD) && (b->s.query_flags&BIT_CD))
143		return 1;
144
145	return query_info_compare(&a->s.qinfo, &b->s.qinfo);
146}
147
148int
149mesh_state_ref_compare(const void* ap, const void* bp)
150{
151	struct mesh_state_ref* a = (struct mesh_state_ref*)ap;
152	struct mesh_state_ref* b = (struct mesh_state_ref*)bp;
153	return mesh_state_compare(a->s, b->s);
154}
155
156struct mesh_area*
157mesh_create(struct module_stack* stack, struct module_env* env)
158{
159	struct mesh_area* mesh = calloc(1, sizeof(struct mesh_area));
160	if(!mesh) {
161		log_err("mesh area alloc: out of memory");
162		return NULL;
163	}
164	mesh->histogram = timehist_setup();
165	mesh->qbuf_bak = ldns_buffer_new(env->cfg->msg_buffer_size);
166	if(!mesh->histogram || !mesh->qbuf_bak) {
167		free(mesh);
168		log_err("mesh area alloc: out of memory");
169		return NULL;
170	}
171	mesh->mods = *stack;
172	mesh->env = env;
173	rbtree_init(&mesh->run, &mesh_state_compare);
174	rbtree_init(&mesh->all, &mesh_state_compare);
175	mesh->num_reply_addrs = 0;
176	mesh->num_reply_states = 0;
177	mesh->num_detached_states = 0;
178	mesh->num_forever_states = 0;
179	mesh->stats_jostled = 0;
180	mesh->stats_dropped = 0;
181	mesh->max_reply_states = env->cfg->num_queries_per_thread;
182	mesh->max_forever_states = (mesh->max_reply_states+1)/2;
183#ifndef S_SPLINT_S
184	mesh->jostle_max.tv_sec = (time_t)(env->cfg->jostle_time / 1000);
185	mesh->jostle_max.tv_usec = (time_t)((env->cfg->jostle_time % 1000)
186		*1000);
187#endif
188	return mesh;
189}
190
191/** help mesh delete delete mesh states */
192static void
193mesh_delete_helper(rbnode_t* n)
194{
195	struct mesh_state* mstate = (struct mesh_state*)n->key;
196	/* perform a full delete, not only 'cleanup' routine,
197	 * because other callbacks expect a clean state in the mesh.
198	 * For 're-entrant' calls */
199	mesh_state_delete(&mstate->s);
200	/* but because these delete the items from the tree, postorder
201	 * traversal and rbtree rebalancing do not work together */
202}
203
204void
205mesh_delete(struct mesh_area* mesh)
206{
207	if(!mesh)
208		return;
209	/* free all query states */
210	while(mesh->all.count)
211		mesh_delete_helper(mesh->all.root);
212	timehist_delete(mesh->histogram);
213	ldns_buffer_free(mesh->qbuf_bak);
214	free(mesh);
215}
216
217void
218mesh_delete_all(struct mesh_area* mesh)
219{
220	/* free all query states */
221	while(mesh->all.count)
222		mesh_delete_helper(mesh->all.root);
223	mesh->stats_dropped += mesh->num_reply_addrs;
224	/* clear mesh area references */
225	rbtree_init(&mesh->run, &mesh_state_compare);
226	rbtree_init(&mesh->all, &mesh_state_compare);
227	mesh->num_reply_addrs = 0;
228	mesh->num_reply_states = 0;
229	mesh->num_detached_states = 0;
230	mesh->num_forever_states = 0;
231	mesh->forever_first = NULL;
232	mesh->forever_last = NULL;
233	mesh->jostle_first = NULL;
234	mesh->jostle_last = NULL;
235}
236
237int mesh_make_new_space(struct mesh_area* mesh, ldns_buffer* qbuf)
238{
239	struct mesh_state* m = mesh->jostle_first;
240	/* free space is available */
241	if(mesh->num_reply_states < mesh->max_reply_states)
242		return 1;
243	/* try to kick out a jostle-list item */
244	if(m && m->reply_list && m->list_select == mesh_jostle_list) {
245		/* how old is it? */
246		struct timeval age;
247		timeval_subtract(&age, mesh->env->now_tv,
248			&m->reply_list->start_time);
249		if(timeval_smaller(&mesh->jostle_max, &age)) {
250			/* its a goner */
251			log_nametypeclass(VERB_ALGO, "query jostled out to "
252				"make space for a new one",
253				m->s.qinfo.qname, m->s.qinfo.qtype,
254				m->s.qinfo.qclass);
255			/* backup the query */
256			if(qbuf) ldns_buffer_copy(mesh->qbuf_bak, qbuf);
257			/* notify supers */
258			if(m->super_set.count > 0) {
259				verbose(VERB_ALGO, "notify supers of failure");
260				m->s.return_msg = NULL;
261				m->s.return_rcode = LDNS_RCODE_SERVFAIL;
262				mesh_walk_supers(mesh, m);
263			}
264			mesh->stats_jostled ++;
265			mesh_state_delete(&m->s);
266			/* restore the query - note that the qinfo ptr to
267			 * the querybuffer is then correct again. */
268			if(qbuf) ldns_buffer_copy(qbuf, mesh->qbuf_bak);
269			return 1;
270		}
271	}
272	/* no space for new item */
273	return 0;
274}
275
/**
 * New query from a client: attach the client's reply to the mesh state
 * for this question, creating (and starting) that state if needed.
 * The lookup key uses only the RD bit of qflags and is never priming.
 * The query is dropped (and counted in stats_dropped) when no reply state
 * can be made (mesh_make_new_space fails) or when an existing reply state
 * already has too many reply addresses queued (more than
 * max_reply_states*16 in total). On out-of-memory the client is sent
 * SERVFAIL. Updates num_detached_states, num_reply_states and
 * num_reply_addrs, and puts a state without list membership on the
 * forever list (while it has room) or else on the jostle list.
 * @param mesh: the mesh area.
 * @param qinfo: query question; qinfo->qname is the client's spelling.
 * @param qflags: flags from the client query.
 * @param edns: edns data from the client query.
 * @param rep: where to send the reply.
 * @param qid: query id to put in the reply.
 */
void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo,
        uint16_t qflags, struct edns_data* edns, struct comm_reply* rep,
        uint16_t qid)
{
	/* do not use CD flag from user for mesh state, we want the CD-query
	 * to receive validation anyway, to protect our cache contents and
	 * avoid bad-data in this cache that a downstream validator cannot
	 * remove from this cache */
	struct mesh_state* s = mesh_area_find(mesh, qinfo, qflags&BIT_RD, 0);
	int was_detached = 0;
	int was_noreply = 0;
	int added = 0;
	/* does this create a new reply state? */
	if(!s || s->list_select == mesh_no_list) {
		/* may jostle out an older query; rep->c->buffer is backed
		 * up and restored around that */
		if(!mesh_make_new_space(mesh, rep->c->buffer)) {
			verbose(VERB_ALGO, "Too many queries. dropping "
				"incoming query.");
			comm_point_drop_reply(rep);
			mesh->stats_dropped ++;
			return;
		}
		/* for this new reply state, the reply address is free,
		 * so the limit of reply addresses does not stop reply states*/
	} else {
		/* protect our memory usage from storing reply addresses */
		if(mesh->num_reply_addrs > mesh->max_reply_states*16) {
			verbose(VERB_ALGO, "Too many requests queued. "
				"dropping incoming query.");
			mesh->stats_dropped++;
			comm_point_drop_reply(rep);
			return;
		}
	}
	/* see if it already exists, if not, create one */
	if(!s) {
#ifdef UNBOUND_DEBUG
		struct rbnode_t* n;
#endif
		s = mesh_state_create(mesh->env, qinfo, qflags&BIT_RD, 0);
		if(!s) {
			log_err("mesh_state_create: out of memory; SERVFAIL");
			error_encode(rep->c->buffer, LDNS_RCODE_SERVFAIL,
				qinfo, qid, qflags, edns);
			comm_point_send_reply(rep);
			return;
		}
#ifdef UNBOUND_DEBUG
		n =
#endif
		rbtree_insert(&mesh->all, &s->node);
		log_assert(n != NULL);
		/* set detached (it is now) */
		mesh->num_detached_states++;
		added = 1;
	}
	/* record the state's status before the reply is added, so the
	 * counters can be adjusted once the add succeeded */
	if(!s->reply_list && !s->cb_list && s->super_set.count == 0)
		was_detached = 1;
	if(!s->reply_list && !s->cb_list)
		was_noreply = 1;
	/* add reply to s */
	if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo->qname)) {
			log_err("mesh_new_client: out of memory; SERVFAIL");
			error_encode(rep->c->buffer, LDNS_RCODE_SERVFAIL,
				qinfo, qid, qflags, edns);
			comm_point_send_reply(rep);
			/* a state created just for this client is removed */
			if(added)
				mesh_state_delete(&s->s);
			return;
	}
	/* update statistics */
	if(was_detached) {
		log_assert(mesh->num_detached_states > 0);
		mesh->num_detached_states--;
	}
	if(was_noreply) {
		mesh->num_reply_states ++;
	}
	mesh->num_reply_addrs++;
	if(s->list_select == mesh_no_list) {
		/* move to either the forever or the jostle_list */
		if(mesh->num_forever_states < mesh->max_forever_states) {
			mesh->num_forever_states ++;
			mesh_list_insert(s, &mesh->forever_first,
				&mesh->forever_last);
			s->list_select = mesh_forever_list;
		} else {
			mesh_list_insert(s, &mesh->jostle_first,
				&mesh->jostle_last);
			s->list_select = mesh_jostle_list;
		}
	}
	/* only a newly created state needs to be started; an existing
	 * one is already being processed */
	if(added)
		mesh_run(mesh, s, module_event_new, NULL);
}
370
/**
 * New query with a callback: attach a callback entry to the mesh state
 * for this question, creating (and starting) that state if needed.
 * Unlike mesh_new_client, no query limits are applied and the state is
 * not put on the forever or jostle list here.
 * Updates num_detached_states, num_reply_states and num_reply_addrs.
 * @param mesh: the mesh area.
 * @param qinfo: query question.
 * @param qflags: query flags; only BIT_RD is used for the lookup.
 * @param edns: edns data stored with the callback entry.
 * @param buf: buffer stored with the callback entry for the answer.
 * @param qid: query id stored with the callback entry.
 * @param cb: callback function.
 * @param cb_arg: argument passed to cb.
 * @return 1 on success, 0 on allocation failure (a state created for
 *	this call is deleted again in that case).
 */
int
mesh_new_callback(struct mesh_area* mesh, struct query_info* qinfo,
	uint16_t qflags, struct edns_data* edns, ldns_buffer* buf,
	uint16_t qid, mesh_cb_func_t cb, void* cb_arg)
{
	struct mesh_state* s = mesh_area_find(mesh, qinfo, qflags&BIT_RD, 0);
	int was_detached = 0;
	int was_noreply = 0;
	int added = 0;
	/* there are no limits on the number of callbacks */

	/* see if it already exists, if not, create one */
	if(!s) {
#ifdef UNBOUND_DEBUG
		struct rbnode_t* n;
#endif
		s = mesh_state_create(mesh->env, qinfo, qflags&BIT_RD, 0);
		if(!s) {
			return 0;
		}
#ifdef UNBOUND_DEBUG
		n =
#endif
		rbtree_insert(&mesh->all, &s->node);
		log_assert(n != NULL);
		/* set detached (it is now) */
		mesh->num_detached_states++;
		added = 1;
	}
	/* record the state's status before the callback is added */
	if(!s->reply_list && !s->cb_list && s->super_set.count == 0)
		was_detached = 1;
	if(!s->reply_list && !s->cb_list)
		was_noreply = 1;
	/* add reply to s */
	if(!mesh_state_add_cb(s, edns, buf, cb, cb_arg, qid, qflags)) {
			if(added)
				mesh_state_delete(&s->s);
			return 0;
	}
	/* update statistics */
	if(was_detached) {
		log_assert(mesh->num_detached_states > 0);
		mesh->num_detached_states--;
	}
	if(was_noreply) {
		mesh->num_reply_states ++;
	}
	mesh->num_reply_addrs++;
	/* only a newly created state needs to be started */
	if(added)
		mesh_run(mesh, s, module_event_new, NULL);
	return 1;
}
423
424void mesh_new_prefetch(struct mesh_area* mesh, struct query_info* qinfo,
425        uint16_t qflags, uint32_t leeway)
426{
427	struct mesh_state* s = mesh_area_find(mesh, qinfo, qflags&BIT_RD, 0);
428#ifdef UNBOUND_DEBUG
429	struct rbnode_t* n;
430#endif
431	/* already exists, and for a different purpose perhaps.
432	 * if mesh_no_list, keep it that way. */
433	if(s) {
434		/* make it ignore the cache from now on */
435		if(!s->s.blacklist)
436			sock_list_insert(&s->s.blacklist, NULL, 0, s->s.region);
437		if(s->s.prefetch_leeway < leeway)
438			s->s.prefetch_leeway = leeway;
439		return;
440	}
441	if(!mesh_make_new_space(mesh, NULL)) {
442		verbose(VERB_ALGO, "Too many queries. dropped prefetch.");
443		mesh->stats_dropped ++;
444		return;
445	}
446	s = mesh_state_create(mesh->env, qinfo, qflags&BIT_RD, 0);
447	if(!s) {
448		log_err("prefetch mesh_state_create: out of memory");
449		return;
450	}
451#ifdef UNBOUND_DEBUG
452	n =
453#endif
454	rbtree_insert(&mesh->all, &s->node);
455	log_assert(n != NULL);
456	/* set detached (it is now) */
457	mesh->num_detached_states++;
458	/* make it ignore the cache */
459	sock_list_insert(&s->s.blacklist, NULL, 0, s->s.region);
460	s->s.prefetch_leeway = leeway;
461
462	if(s->list_select == mesh_no_list) {
463		/* move to either the forever or the jostle_list */
464		if(mesh->num_forever_states < mesh->max_forever_states) {
465			mesh->num_forever_states ++;
466			mesh_list_insert(s, &mesh->forever_first,
467				&mesh->forever_last);
468			s->list_select = mesh_forever_list;
469		} else {
470			mesh_list_insert(s, &mesh->jostle_first,
471				&mesh->jostle_last);
472			s->list_select = mesh_jostle_list;
473		}
474	}
475	mesh_run(mesh, s, module_event_new, NULL);
476}
477
478void mesh_report_reply(struct mesh_area* mesh, struct outbound_entry* e,
479        struct comm_reply* reply, int what)
480{
481	enum module_ev event = module_event_reply;
482	e->qstate->reply = reply;
483	if(what != NETEVENT_NOERROR) {
484		event = module_event_noreply;
485		if(what == NETEVENT_CAPSFAIL)
486			event = module_event_capsfail;
487	}
488	mesh_run(mesh, e->qstate->mesh_info, event, e);
489}
490
491struct mesh_state*
492mesh_state_create(struct module_env* env, struct query_info* qinfo,
493	uint16_t qflags, int prime)
494{
495	struct regional* region = alloc_reg_obtain(env->alloc);
496	struct mesh_state* mstate;
497	int i;
498	if(!region)
499		return NULL;
500	mstate = (struct mesh_state*)regional_alloc(region,
501		sizeof(struct mesh_state));
502	if(!mstate) {
503		alloc_reg_release(env->alloc, region);
504		return NULL;
505	}
506	memset(mstate, 0, sizeof(*mstate));
507	mstate->node = *RBTREE_NULL;
508	mstate->run_node = *RBTREE_NULL;
509	mstate->node.key = mstate;
510	mstate->run_node.key = mstate;
511	mstate->reply_list = NULL;
512	mstate->list_select = mesh_no_list;
513	mstate->replies_sent = 0;
514	rbtree_init(&mstate->super_set, &mesh_state_ref_compare);
515	rbtree_init(&mstate->sub_set, &mesh_state_ref_compare);
516	mstate->num_activated = 0;
517	/* init module qstate */
518	mstate->s.qinfo.qtype = qinfo->qtype;
519	mstate->s.qinfo.qclass = qinfo->qclass;
520	mstate->s.qinfo.qname_len = qinfo->qname_len;
521	mstate->s.qinfo.qname = regional_alloc_init(region, qinfo->qname,
522		qinfo->qname_len);
523	if(!mstate->s.qinfo.qname) {
524		alloc_reg_release(env->alloc, region);
525		return NULL;
526	}
527	/* remove all weird bits from qflags */
528	mstate->s.query_flags = (qflags & (BIT_RD|BIT_CD));
529	mstate->s.is_priming = prime;
530	mstate->s.reply = NULL;
531	mstate->s.region = region;
532	mstate->s.curmod = 0;
533	mstate->s.return_msg = 0;
534	mstate->s.return_rcode = LDNS_RCODE_NOERROR;
535	mstate->s.env = env;
536	mstate->s.mesh_info = mstate;
537	mstate->s.prefetch_leeway = 0;
538	/* init modules */
539	for(i=0; i<env->mesh->mods.num; i++) {
540		mstate->s.minfo[i] = NULL;
541		mstate->s.ext_state[i] = module_state_initial;
542	}
543	return mstate;
544}
545
546void
547mesh_state_cleanup(struct mesh_state* mstate)
548{
549	struct mesh_area* mesh;
550	int i;
551	if(!mstate)
552		return;
553	mesh = mstate->s.env->mesh;
554	/* drop unsent replies */
555	if(!mstate->replies_sent) {
556		struct mesh_reply* rep;
557		struct mesh_cb* cb;
558		for(rep=mstate->reply_list; rep; rep=rep->next) {
559			comm_point_drop_reply(&rep->query_reply);
560			mesh->num_reply_addrs--;
561		}
562		for(cb=mstate->cb_list; cb; cb=cb->next) {
563			fptr_ok(fptr_whitelist_mesh_cb(cb->cb));
564			(*cb->cb)(cb->cb_arg, LDNS_RCODE_SERVFAIL, NULL,
565				sec_status_unchecked, NULL);
566			mesh->num_reply_addrs--;
567		}
568	}
569
570	/* de-init modules */
571	for(i=0; i<mesh->mods.num; i++) {
572		fptr_ok(fptr_whitelist_mod_clear(mesh->mods.mod[i]->clear));
573		(*mesh->mods.mod[i]->clear)(&mstate->s, i);
574		mstate->s.minfo[i] = NULL;
575		mstate->s.ext_state[i] = module_finished;
576	}
577	alloc_reg_release(mstate->s.env->alloc, mstate->s.region);
578}
579
/**
 * Remove a mesh state from the mesh and free it.
 * Detaches its subqueries, takes it off the forever or jostle list,
 * adjusts num_forever_states, num_detached_states and num_reply_states,
 * removes it from the sub_set of each of its supers and from the run and
 * all trees, and finally calls mesh_state_cleanup (which drops replies
 * that were not sent and releases the region).
 * @param qstate: module query state of the mesh state; NULL is ignored.
 */
void
mesh_state_delete(struct module_qstate* qstate)
{
	struct mesh_area* mesh;
	struct mesh_state_ref* super, ref;
	struct mesh_state* mstate;
	if(!qstate)
		return;
	mstate = qstate->mesh_info;
	mesh = mstate->s.env->mesh;
	mesh_detach_subs(&mstate->s);
	/* only the forever list has a counter */
	if(mstate->list_select == mesh_forever_list) {
		mesh->num_forever_states --;
		mesh_list_remove(mstate, &mesh->forever_first,
			&mesh->forever_last);
	} else if(mstate->list_select == mesh_jostle_list) {
		mesh_list_remove(mstate, &mesh->jostle_first,
			&mesh->jostle_last);
	}
	/* undo the accounting for the category this state was in */
	if(!mstate->reply_list && !mstate->cb_list
		&& mstate->super_set.count == 0) {
		log_assert(mesh->num_detached_states > 0);
		mesh->num_detached_states--;
	}
	if(mstate->reply_list || mstate->cb_list) {
		log_assert(mesh->num_reply_states > 0);
		mesh->num_reply_states--;
	}
	/* lookup key to find this state in each super's sub_set */
	ref.node.key = &ref;
	ref.s = mstate;
	RBTREE_FOR(super, struct mesh_state_ref*, &mstate->super_set) {
		(void)rbtree_delete(&super->s->sub_set, &ref);
	}
	(void)rbtree_delete(&mesh->run, mstate);
	(void)rbtree_delete(&mesh->all, mstate);
	mesh_state_cleanup(mstate);
}
617
618/** helper recursive rbtree find routine */
619static int
620find_in_subsub(struct mesh_state* m, struct mesh_state* tofind, size_t *c)
621{
622	struct mesh_state_ref* r;
623	if((*c)++ > MESH_MAX_SUBSUB)
624		return 1;
625	RBTREE_FOR(r, struct mesh_state_ref*, &m->sub_set) {
626		if(r->s == tofind || find_in_subsub(r->s, tofind, c))
627			return 1;
628	}
629	return 0;
630}
631
632/** find cycle for already looked up mesh_state */
633static int
634mesh_detect_cycle_found(struct module_qstate* qstate, struct mesh_state* dep_m)
635{
636	struct mesh_state* cyc_m = qstate->mesh_info;
637	size_t counter = 0;
638	if(!dep_m)
639		return 0;
640	if(dep_m == cyc_m || find_in_subsub(dep_m, cyc_m, &counter)) {
641		if(counter > MESH_MAX_SUBSUB)
642			return 2;
643		return 1;
644	}
645	return 0;
646}
647
648void mesh_detach_subs(struct module_qstate* qstate)
649{
650	struct mesh_area* mesh = qstate->env->mesh;
651	struct mesh_state_ref* ref, lookup;
652#ifdef UNBOUND_DEBUG
653	struct rbnode_t* n;
654#endif
655	lookup.node.key = &lookup;
656	lookup.s = qstate->mesh_info;
657	RBTREE_FOR(ref, struct mesh_state_ref*, &qstate->mesh_info->sub_set) {
658#ifdef UNBOUND_DEBUG
659		n =
660#endif
661		rbtree_delete(&ref->s->super_set, &lookup);
662		log_assert(n != NULL); /* must have been present */
663		if(!ref->s->reply_list && !ref->s->cb_list
664			&& ref->s->super_set.count == 0) {
665			mesh->num_detached_states++;
666			log_assert(mesh->num_detached_states +
667				mesh->num_reply_states <= mesh->all.count);
668		}
669	}
670	rbtree_init(&qstate->mesh_info->sub_set, &mesh_state_ref_compare);
671}
672
/**
 * Attach a subquery to qstate's mesh state, creating the sub if it does
 * not exist yet. A newly created sub is inserted in the all and run trees
 * and returned in *newq; the caller's module initializes it, and it runs
 * after the current module stops.
 * NOTE(review): qflags is passed unmasked to mesh_area_find, while
 * mesh_state_create keeps only BIT_RD|BIT_CD; presumably callers pass only
 * those bits — verify, otherwise the lookup could miss an existing state.
 * NOTE(review): if mesh_state_attachment fails for a newly created sub,
 * that sub stays in the all and run trees (detached) and *newq points to
 * it although 0 is returned.
 * @param qstate: the super query state.
 * @param qinfo: question of the subquery.
 * @param qflags: flags of the subquery.
 * @param prime: nonzero for a priming subquery.
 * @param newq: out; the new state if one was created, else NULL.
 * @return 1 on success; 0 if a cycle was detected (no *newq set) or on
 *	allocation failure.
 */
int mesh_attach_sub(struct module_qstate* qstate, struct query_info* qinfo,
        uint16_t qflags, int prime, struct module_qstate** newq)
{
	/* find it, if not, create it */
	struct mesh_area* mesh = qstate->env->mesh;
	struct mesh_state* sub = mesh_area_find(mesh, qinfo, qflags, prime);
	int was_detached;
	if(mesh_detect_cycle_found(qstate, sub)) {
		verbose(VERB_ALGO, "attach failed, cycle detected");
		return 0;
	}
	if(!sub) {
#ifdef UNBOUND_DEBUG
		struct rbnode_t* n;
#endif
		/* create a new one */
		sub = mesh_state_create(qstate->env, qinfo, qflags, prime);
		if(!sub) {
			log_err("mesh_attach_sub: out of memory");
			return 0;
		}
#ifdef UNBOUND_DEBUG
		n =
#endif
		rbtree_insert(&mesh->all, &sub->node);
		log_assert(n != NULL);
		/* set detached (it is now) */
		mesh->num_detached_states++;
		/* set new query state to run */
#ifdef UNBOUND_DEBUG
		n =
#endif
		rbtree_insert(&mesh->run, &sub->run_node);
		log_assert(n != NULL);
		*newq = &sub->s;
	} else
		*newq = NULL;
	/* remember whether the sub had no supers before this attachment */
	was_detached = (sub->super_set.count == 0);
	if(!mesh_state_attachment(qstate->mesh_info, sub))
		return 0;
	/* if it was a duplicate  attachment, the count was not zero before */
	if(!sub->reply_list && !sub->cb_list && was_detached &&
		sub->super_set.count == 1) {
		/* it used to be detached, before this one got added */
		log_assert(mesh->num_detached_states > 0);
		mesh->num_detached_states--;
	}
	/* *newq will be run when inited after the current module stops */
	return 1;
}
723
724int mesh_state_attachment(struct mesh_state* super, struct mesh_state* sub)
725{
726#ifdef UNBOUND_DEBUG
727	struct rbnode_t* n;
728#endif
729	struct mesh_state_ref* subref; /* points to sub, inserted in super */
730	struct mesh_state_ref* superref; /* points to super, inserted in sub */
731	if( !(subref = regional_alloc(super->s.region,
732		sizeof(struct mesh_state_ref))) ||
733		!(superref = regional_alloc(sub->s.region,
734		sizeof(struct mesh_state_ref))) ) {
735		log_err("mesh_state_attachment: out of memory");
736		return 0;
737	}
738	superref->node.key = superref;
739	superref->s = super;
740	subref->node.key = subref;
741	subref->s = sub;
742	if(!rbtree_insert(&sub->super_set, &superref->node)) {
743		/* this should not happen, iterator and validator do not
744		 * attach subqueries that are identical. */
745		/* already attached, we are done, nothing todo.
746		 * since superref and subref already allocated in region,
747		 * we cannot free them */
748		return 1;
749	}
750#ifdef UNBOUND_DEBUG
751	n =
752#endif
753	rbtree_insert(&super->sub_set, &subref->node);
754	log_assert(n != NULL); /* we checked above if statement, the reverse
755	  administration should not fail now, unless they are out of sync */
756	return 1;
757}
758
/**
 * callback results to mesh cb entry
 * Bogus answers are not made into SERVFAIL: the answer is encoded into
 * r->buf and passed with its security status plus a reason string from
 * errinf_to_str (freed after the callback returns). A missing reply with
 * a NOERROR rcode, or failure to create the reason string, becomes
 * SERVFAIL. With an error rcode the callback receives r->buf without
 * anything being encoded into it here. If encoding the answer fails, the
 * callback gets SERVFAIL. Always decrements the mesh's num_reply_addrs.
 * @param m: mesh state to send it for.
 * @param rcode: if not 0, error code.
 * @param rep: reply to send (or NULL if rcode is set).
 * @param r: callback entry
 */
static void
mesh_do_callback(struct mesh_state* m, int rcode, struct reply_info* rep,
	struct mesh_cb* r)
{
	int secure;
	char* reason = NULL;
	/* bogus messages are not made into servfail, sec_status passed
	 * to the callback function */
	if(rep && rep->security == sec_status_secure)
		secure = 1;
	else	secure = 0;
	if(!rep && rcode == LDNS_RCODE_NOERROR)
		rcode = LDNS_RCODE_SERVFAIL;
	/* from here on, rcode == 0 implies rep != NULL */
	if(!rcode && rep->security == sec_status_bogus) {
		if(!(reason = errinf_to_str(&m->s)))
			rcode = LDNS_RCODE_SERVFAIL;
	}
	/* send the reply */
	if(rcode) {
		fptr_ok(fptr_whitelist_mesh_cb(r->cb));
		(*r->cb)(r->cb_arg, rcode, r->buf, sec_status_unchecked, NULL);
	} else {
		/* the client's udp size limits the encoding; the edns we
		 * answer with advertises our own version and size */
		size_t udp_size = r->edns.udp_size;
		ldns_buffer_clear(r->buf);
		r->edns.edns_version = EDNS_ADVERTISED_VERSION;
		r->edns.udp_size = EDNS_ADVERTISED_SIZE;
		r->edns.ext_rcode = 0;
		r->edns.bits &= EDNS_DO;
		if(!reply_info_answer_encode(&m->s.qinfo, rep, r->qid,
			r->qflags, r->buf, 0, 1,
			m->s.env->scratch, udp_size, &r->edns,
			(int)(r->edns.bits & EDNS_DO), secure))
		{
			fptr_ok(fptr_whitelist_mesh_cb(r->cb));
			(*r->cb)(r->cb_arg, LDNS_RCODE_SERVFAIL, r->buf,
				sec_status_unchecked, NULL);
		} else {
			fptr_ok(fptr_whitelist_mesh_cb(r->cb));
			(*r->cb)(r->cb_arg, LDNS_RCODE_NOERROR, r->buf,
				rep->security, reason);
		}
	}
	free(reason);
	m->s.env->mesh->num_reply_addrs--;
}
811
/**
 * Send reply to mesh reply entry
 * When validation is needed and the client did not set CD (or ignore_cd
 * is configured), an answer whose security is bogus or worse becomes
 * SERVFAIL. A missing reply with NOERROR also becomes SERVFAIL. If prev
 * has the same qflags and edns presence, bits and udp_size, its encoded
 * answer is reused and only the ID and the client's qname spelling are
 * patched in. Afterwards the reply counters, the total wait time and the
 * histogram are updated, plus extended statistics when enabled.
 * NOTE(review): prev->edns.udp_size and bits were already overwritten
 * (EDNS_ADVERTISED_SIZE, bits & EDNS_DO) if prev went through the encode
 * branch below, so the reuse check compares r's request values against
 * prev's rewritten ones — confirm this is intended.
 * @param m: mesh state to send it for.
 * @param rcode: if not 0, error code.
 * @param rep: reply to send (or NULL if rcode is set).
 * @param r: reply entry
 * @param prev: previous reply, already has its answer encoded in buffer.
 */
static void
mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
	struct mesh_reply* r, struct mesh_reply* prev)
{
	struct timeval end_time;
	struct timeval duration;
	int secure;
	/* examine security status */
	if(m->s.env->need_to_validate && (!(r->qflags&BIT_CD) ||
		m->s.env->cfg->ignore_cd) && rep &&
		rep->security <= sec_status_bogus) {
		rcode = LDNS_RCODE_SERVFAIL;
		if(m->s.env->cfg->stat_extended)
			m->s.env->mesh->ans_bogus++;
	}
	if(rep && rep->security == sec_status_secure)
		secure = 1;
	else	secure = 0;
	if(!rep && rcode == LDNS_RCODE_NOERROR)
		rcode = LDNS_RCODE_SERVFAIL;
	/* send the reply */
	if(prev && prev->qflags == r->qflags &&
		prev->edns.edns_present == r->edns.edns_present &&
		prev->edns.bits == r->edns.bits &&
		prev->edns.udp_size == r->edns.udp_size) {
		/* if the previous reply is identical to this one, fix ID */
		if(prev->query_reply.c->buffer != r->query_reply.c->buffer)
			ldns_buffer_copy(r->query_reply.c->buffer,
				prev->query_reply.c->buffer);
		/* ID is at offset 0; the question name starts at offset 12 */
		ldns_buffer_write_at(r->query_reply.c->buffer, 0,
			&r->qid, sizeof(uint16_t));
		ldns_buffer_write_at(r->query_reply.c->buffer, 12,
			r->qname, m->s.qinfo.qname_len);
		comm_point_send_reply(&r->query_reply);
	} else if(rcode) {
		m->s.qinfo.qname = r->qname;
		error_encode(r->query_reply.c->buffer, rcode, &m->s.qinfo,
			r->qid, r->qflags, &r->edns);
		comm_point_send_reply(&r->query_reply);
	} else {
		/* the client's udp size limits the encoding; the edns we
		 * answer with advertises our own version and size */
		size_t udp_size = r->edns.udp_size;
		r->edns.edns_version = EDNS_ADVERTISED_VERSION;
		r->edns.udp_size = EDNS_ADVERTISED_SIZE;
		r->edns.ext_rcode = 0;
		r->edns.bits &= EDNS_DO;
		m->s.qinfo.qname = r->qname;
		if(!reply_info_answer_encode(&m->s.qinfo, rep, r->qid,
			r->qflags, r->query_reply.c->buffer, 0, 1,
			m->s.env->scratch, udp_size, &r->edns,
			(int)(r->edns.bits & EDNS_DO), secure))
		{
			error_encode(r->query_reply.c->buffer,
				LDNS_RCODE_SERVFAIL, &m->s.qinfo, r->qid,
				r->qflags, &r->edns);
		}
		comm_point_send_reply(&r->query_reply);
	}
	/* account */
	m->s.env->mesh->num_reply_addrs--;
	end_time = *m->s.env->now_tv;
	timeval_subtract(&duration, &end_time, &r->start_time);
	verbose(VERB_ALGO, "query took %d.%6.6d sec",
		(int)duration.tv_sec, (int)duration.tv_usec);
	m->s.env->mesh->replies_sent++;
	timeval_add(&m->s.env->mesh->replies_sum_wait, &duration);
	timehist_insert(m->s.env->mesh->histogram, &duration);
	if(m->s.env->cfg->stat_extended) {
		/* rcode is read back from the flags of the sent answer */
		uint16_t rc = FLAGS_GET_RCODE(ldns_buffer_read_u16_at(r->
			query_reply.c->buffer, 2));
		if(secure) m->s.env->mesh->ans_secure++;
		m->s.env->mesh->ans_rcode[ rc ] ++;
		if(rc == 0 && LDNS_ANCOUNT(ldns_buffer_begin(r->
			query_reply.c->buffer)) == 0)
			m->s.env->mesh->ans_nodata++;
	}
}
896
897void mesh_query_done(struct mesh_state* mstate)
898{
899	struct mesh_reply* r;
900	struct mesh_reply* prev = NULL;
901	struct mesh_cb* c;
902	struct reply_info* rep = (mstate->s.return_msg?
903		mstate->s.return_msg->rep:NULL);
904	for(r = mstate->reply_list; r; r = r->next) {
905		mesh_send_reply(mstate, mstate->s.return_rcode, rep, r, prev);
906		prev = r;
907	}
908	mstate->replies_sent = 1;
909	for(c = mstate->cb_list; c; c = c->next) {
910		mesh_do_callback(mstate, mstate->s.return_rcode, rep, c);
911	}
912}
913
914void mesh_walk_supers(struct mesh_area* mesh, struct mesh_state* mstate)
915{
916	struct mesh_state_ref* ref;
917	RBTREE_FOR(ref, struct mesh_state_ref*, &mstate->super_set)
918	{
919		/* make super runnable */
920		(void)rbtree_insert(&mesh->run, &ref->s->run_node);
921		/* callback the function to inform super of result */
922		fptr_ok(fptr_whitelist_mod_inform_super(
923			mesh->mods.mod[ref->s->s.curmod]->inform_super));
924		(*mesh->mods.mod[ref->s->s.curmod]->inform_super)(&mstate->s,
925			ref->s->s.curmod, &ref->s->s);
926	}
927}
928
929struct mesh_state* mesh_area_find(struct mesh_area* mesh,
930	struct query_info* qinfo, uint16_t qflags, int prime)
931{
932	struct mesh_state key;
933	struct mesh_state* result;
934
935	key.node.key = &key;
936	key.s.is_priming = prime;
937	key.s.qinfo = *qinfo;
938	key.s.query_flags = qflags;
939
940	result = (struct mesh_state*)rbtree_search(&mesh->all, &key);
941	return result;
942}
943
944int mesh_state_add_cb(struct mesh_state* s, struct edns_data* edns,
945        ldns_buffer* buf, mesh_cb_func_t cb, void* cb_arg,
946	uint16_t qid, uint16_t qflags)
947{
948	struct mesh_cb* r = regional_alloc(s->s.region,
949		sizeof(struct mesh_cb));
950	if(!r)
951		return 0;
952	r->buf = buf;
953	log_assert(fptr_whitelist_mesh_cb(cb)); /* early failure ifmissing*/
954	r->cb = cb;
955	r->cb_arg = cb_arg;
956	r->edns = *edns;
957	r->qid = qid;
958	r->qflags = qflags;
959	r->next = s->cb_list;
960	s->cb_list = r;
961	return 1;
962
963}
964
965int mesh_state_add_reply(struct mesh_state* s, struct edns_data* edns,
966        struct comm_reply* rep, uint16_t qid, uint16_t qflags, uint8_t* qname)
967{
968	struct mesh_reply* r = regional_alloc(s->s.region,
969		sizeof(struct mesh_reply));
970	if(!r)
971		return 0;
972	r->query_reply = *rep;
973	r->edns = *edns;
974	r->qid = qid;
975	r->qflags = qflags;
976	r->start_time = *s->s.env->now_tv;
977	r->next = s->reply_list;
978	r->qname = regional_alloc_init(s->s.region, qname,
979		s->s.qinfo.qname_len);
980	if(!r->qname)
981		return 0;
982	s->reply_list = r;
983	return 1;
984
985}
986
987/**
988 * Continue processing the mesh state at another module.
 * Handles module-to-module transfer of control.
990 * Handles module finished.
991 * @param mesh: the mesh area.
992 * @param mstate: currently active mesh state.
993 * 	Deleted if finished, calls _done and _supers to
994 * 	send replies to clients and inform other mesh states.
995 * 	This in turn may create additional runnable mesh states.
996 * @param s: state at which the current module exited.
997 * @param ev: the event sent to the module.
998 * 	returned is the event to send to the next module.
999 * @return true if continue processing at the new module.
1000 * 	false if not continued processing is needed.
1001 */
1002static int
1003mesh_continue(struct mesh_area* mesh, struct mesh_state* mstate,
1004	enum module_ext_state s, enum module_ev* ev)
1005{
1006	mstate->num_activated++;
1007	if(mstate->num_activated > MESH_MAX_ACTIVATION) {
1008		/* module is looping. Stop it. */
1009		log_err("internal error: looping module stopped");
1010		log_query_info(VERB_QUERY, "pass error for qstate",
1011			&mstate->s.qinfo);
1012		s = module_error;
1013	}
1014	if(s == module_wait_module || s == module_restart_next) {
1015		/* start next module */
1016		mstate->s.curmod++;
1017		if(mesh->mods.num == mstate->s.curmod) {
1018			log_err("Cannot pass to next module; at last module");
1019			log_query_info(VERB_QUERY, "pass error for qstate",
1020				&mstate->s.qinfo);
1021			mstate->s.curmod--;
1022			return mesh_continue(mesh, mstate, module_error, ev);
1023		}
1024		if(s == module_restart_next) {
1025			fptr_ok(fptr_whitelist_mod_clear(
1026				mesh->mods.mod[mstate->s.curmod]->clear));
1027			(*mesh->mods.mod[mstate->s.curmod]->clear)
1028				(&mstate->s, mstate->s.curmod);
1029			mstate->s.minfo[mstate->s.curmod] = NULL;
1030		}
1031		*ev = module_event_pass;
1032		return 1;
1033	}
1034	if(s == module_error && mstate->s.return_rcode == LDNS_RCODE_NOERROR) {
1035		/* error is bad, handle pass back up below */
1036		mstate->s.return_rcode = LDNS_RCODE_SERVFAIL;
1037	}
1038	if(s == module_error || s == module_finished) {
1039		if(mstate->s.curmod == 0) {
1040			mesh_query_done(mstate);
1041			mesh_walk_supers(mesh, mstate);
1042			mesh_state_delete(&mstate->s);
1043			return 0;
1044		}
1045		/* pass along the locus of control */
1046		mstate->s.curmod --;
1047		*ev = module_event_moddone;
1048		return 1;
1049	}
1050	return 0;
1051}
1052
/**
 * Run the mesh, starting with mstate.
 * The current module of mstate is operated with event ev and outbound
 * entry e. After each module run, the qstate's reply pointer is cleared,
 * the scratch region is freed, and mesh_continue decides whether the same
 * state continues at another module. When it does not, the next state is
 * taken from the root of mesh->run with event module_event_pass, until
 * mesh->run is empty. At VERB_ALGO the mesh statistics and the list of
 * states are logged at the end.
 * @param mesh: the mesh area.
 * @param mstate: mesh state to run first.
 * @param ev: event to pass to the current module of mstate.
 * @param e: outbound entry; only passed to the first module run, it is
 * 	reset to NULL afterwards.
 */
void mesh_run(struct mesh_area* mesh, struct mesh_state* mstate,
	enum module_ev ev, struct outbound_entry* e)
{
	enum module_ext_state s;
	verbose(VERB_ALGO, "mesh_run: start");
	while(mstate) {
		/* run the module */
		fptr_ok(fptr_whitelist_mod_operate(
			mesh->mods.mod[mstate->s.curmod]->operate));
		(*mesh->mods.mod[mstate->s.curmod]->operate)
			(&mstate->s, ev, mstate->s.curmod, e);

		/* examine results */
		mstate->s.reply = NULL;
		regional_free_all(mstate->s.env->scratch);
		s = mstate->s.ext_state[mstate->s.curmod];
		verbose(VERB_ALGO, "mesh_run: %s module exit state is %s",
			mesh->mods.mod[mstate->s.curmod]->name, strextstate(s));
		/* the outbound entry is only for the first operate call */
		e = NULL;
		if(mesh_continue(mesh, mstate, s, &ev))
			continue;

		/* run more modules */
		ev = module_event_pass;
		if(mesh->run.count > 0) {
			/* pop an element (the tree root) off the runnable tree */
			mstate = (struct mesh_state*)mesh->run.root->key;
			(void)rbtree_delete(&mesh->run, mstate);
		} else mstate = NULL;
	}
	if(verbosity >= VERB_ALGO) {
		mesh_stats(mesh, "mesh_run: end");
		mesh_log_list(mesh);
	}
}
1088
1089void
1090mesh_log_list(struct mesh_area* mesh)
1091{
1092	char buf[30];
1093	struct mesh_state* m;
1094	int num = 0;
1095	RBTREE_FOR(m, struct mesh_state*, &mesh->all) {
1096		snprintf(buf, sizeof(buf), "%d%s%s%s%s%s mod%d %s%s",
1097			num++, (m->s.is_priming)?"p":"",  /* prime */
1098			(m->s.query_flags&BIT_RD)?"RD":"",
1099			(m->s.query_flags&BIT_CD)?"CD":"",
1100			(m->super_set.count==0)?"d":"", /* detached */
1101			(m->sub_set.count!=0)?"c":"",  /* children */
1102			m->s.curmod, (m->reply_list)?"rep":"", /*hasreply*/
1103			(m->cb_list)?"cb":"" /* callbacks */
1104			);
1105		log_query_info(VERB_ALGO, buf, &m->s.qinfo);
1106	}
1107}
1108
1109void
1110mesh_stats(struct mesh_area* mesh, const char* str)
1111{
1112	verbose(VERB_DETAIL, "%s %u recursion states (%u with reply, "
1113		"%u detached), %u waiting replies, %u recursion replies "
1114		"sent, %d replies dropped, %d states jostled out",
1115		str, (unsigned)mesh->all.count,
1116		(unsigned)mesh->num_reply_states,
1117		(unsigned)mesh->num_detached_states,
1118		(unsigned)mesh->num_reply_addrs,
1119		(unsigned)mesh->replies_sent,
1120		(unsigned)mesh->stats_dropped,
1121		(unsigned)mesh->stats_jostled);
1122	if(mesh->replies_sent > 0) {
1123		struct timeval avg;
1124		timeval_divide(&avg, &mesh->replies_sum_wait,
1125			mesh->replies_sent);
1126		log_info("average recursion processing time "
1127			"%d.%6.6d sec", (int)avg.tv_sec, (int)avg.tv_usec);
1128		log_info("histogram of recursion processing times");
1129		timehist_log(mesh->histogram, "recursions");
1130	}
1131}
1132
1133void
1134mesh_stats_clear(struct mesh_area* mesh)
1135{
1136	if(!mesh)
1137		return;
1138	mesh->replies_sent = 0;
1139	mesh->replies_sum_wait.tv_sec = 0;
1140	mesh->replies_sum_wait.tv_usec = 0;
1141	mesh->stats_jostled = 0;
1142	mesh->stats_dropped = 0;
1143	timehist_clear(mesh->histogram);
1144	mesh->ans_secure = 0;
1145	mesh->ans_bogus = 0;
1146	memset(&mesh->ans_rcode[0], 0, sizeof(size_t)*16);
1147	mesh->ans_nodata = 0;
1148}
1149
1150size_t
1151mesh_get_mem(struct mesh_area* mesh)
1152{
1153	struct mesh_state* m;
1154	size_t s = sizeof(*mesh) + sizeof(struct timehist) +
1155		sizeof(struct th_buck)*mesh->histogram->num +
1156		sizeof(ldns_buffer) + ldns_buffer_capacity(mesh->qbuf_bak);
1157	RBTREE_FOR(m, struct mesh_state*, &mesh->all) {
1158		/* all, including m itself allocated in qstate region */
1159		s += regional_get_mem(m->s.region);
1160	}
1161	return s;
1162}
1163
1164int
1165mesh_detect_cycle(struct module_qstate* qstate, struct query_info* qinfo,
1166	uint16_t flags, int prime)
1167{
1168	struct mesh_area* mesh = qstate->env->mesh;
1169	struct mesh_state* dep_m = mesh_area_find(mesh, qinfo, flags, prime);
1170	return mesh_detect_cycle_found(qstate, dep_m);
1171}
1172
1173void mesh_list_insert(struct mesh_state* m, struct mesh_state** fp,
1174        struct mesh_state** lp)
1175{
1176	/* insert as last element */
1177	m->prev = *lp;
1178	m->next = NULL;
1179	if(*lp)
1180		(*lp)->next = m;
1181	else	*fp = m;
1182	*lp = m;
1183}
1184
1185void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp,
1186        struct mesh_state** lp)
1187{
1188	if(m->next)
1189		m->next->prev = m->prev;
1190	else	*lp = m->prev;
1191	if(m->prev)
1192		m->prev->next = m->next;
1193	else	*fp = m->next;
1194}
1195